diff options
| author | Martin Ridgers <[email protected]> | 2021-09-21 11:06:13 +0200 |
|---|---|---|
| committer | Martin Ridgers <[email protected]> | 2021-09-21 11:06:13 +0200 |
| commit | 68c951e0f440ffd483795dced737e88152c1a581 (patch) | |
| tree | 5c0910ca2a85b45fb05dba3ce457b7d156213894 /zenserver | |
| parent | Merge main into linux-mac (diff) | |
| parent | Trigger storage scrubbing pass at startup (diff) | |
| download | zen-68c951e0f440ffd483795dced737e88152c1a581.tar.xz zen-68c951e0f440ffd483795dced737e88152c1a581.zip | |
Merged main into linux-mac
Diffstat (limited to 'zenserver')
25 files changed, 1455 insertions, 334 deletions
diff --git a/zenserver/admin/admin.h b/zenserver/admin/admin.h index f90ad4537..3554b1005 100644 --- a/zenserver/admin/admin.h +++ b/zenserver/admin/admin.h @@ -4,6 +4,8 @@ #include <zenhttp/httpserver.h> +namespace zen { + class HttpAdminService : public zen::HttpService { public: @@ -16,3 +18,5 @@ public: private: }; + +} // namespace zen diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index cf7deaa93..7f1fe7b44 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -9,6 +9,7 @@ #include <zencore/stream.h> #include <zencore/timer.h> #include <zenhttp/httpserver.h> +#include <zenstore/CAS.h> #include "structuredcache.h" #include "structuredcachestore.h" @@ -25,17 +26,130 @@ #include <queue> #include <thread> +#include <gsl/gsl-lite.hpp> + namespace zen { using namespace std::literals; ////////////////////////////////////////////////////////////////////////// -HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InCacheStore, - zen::CasStore& InStore, - zen::CidStore& InCidStore, +namespace detail { namespace cacheopt { + constexpr std::string_view Local = "local"sv; + constexpr std::string_view Remote = "remote"sv; + constexpr std::string_view Data = "data"sv; + constexpr std::string_view Meta = "meta"sv; + constexpr std::string_view Value = "value"sv; + constexpr std::string_view Attachments = "attachments"sv; +}} // namespace detail::cacheopt + +////////////////////////////////////////////////////////////////////////// + +enum class CachePolicy : uint8_t +{ + None = 0, + QueryLocal = 1 << 0, + QueryRemote = 1 << 1, + Query = QueryLocal | QueryRemote, + StoreLocal = 1 << 2, + StoreRemote = 1 << 3, + Store = StoreLocal | StoreRemote, + SkipMeta = 1 << 4, + SkipValue = 1 << 5, + SkipAttachments = 1 << 6, + SkipData = SkipMeta | SkipValue | SkipAttachments, + SkipLocalCopy = 1 << 7, + Local = QueryLocal | StoreLocal, + Remote = QueryRemote | StoreRemote, + Default = Query | Store, + Disable = None, +}; + +gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy); + +CachePolicy +ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) +{ + CachePolicy QueryPolicy = CachePolicy::Query; + + { + std::string_view Opts = QueryParams.GetValue("query"sv); + if (!Opts.empty()) + { + QueryPolicy = CachePolicy::None; + ForEachStrTok(Opts, ',', [&QueryPolicy](const std::string_view& Opt) { + if (Opt == detail::cacheopt::Local) + { + QueryPolicy |= CachePolicy::QueryLocal; + } + if (Opt == detail::cacheopt::Remote) + { + QueryPolicy |= CachePolicy::QueryRemote; + } + return true; + }); + } + } + + CachePolicy StorePolicy = CachePolicy::Store; + + { + std::string_view Opts = QueryParams.GetValue("store"sv); + if (!Opts.empty()) + { + StorePolicy = CachePolicy::None; + ForEachStrTok(Opts, ',', [&StorePolicy](const std::string_view& Opt) { + if (Opt == detail::cacheopt::Local) + { + StorePolicy |= CachePolicy::StoreLocal; + } + if (Opt == detail::cacheopt::Remote) + { + StorePolicy |= CachePolicy::StoreRemote; + } + return true; + }); + } + } + + CachePolicy SkipPolicy = CachePolicy::None; + + { + std::string_view Opts = QueryParams.GetValue("skip"sv); + if (!Opts.empty()) + { + ForEachStrTok(Opts, ',', [&SkipPolicy](const std::string_view& Opt) { + if (Opt == detail::cacheopt::Meta) + { + SkipPolicy |= CachePolicy::SkipMeta; + } + if (Opt == detail::cacheopt::Value) + { + SkipPolicy |= CachePolicy::SkipValue; + } + if (Opt == detail::cacheopt::Attachments) + { + SkipPolicy |= CachePolicy::SkipAttachments; + } + if (Opt == detail::cacheopt::Data) + { + SkipPolicy |= CachePolicy::SkipData; + } + return true; + }); + } + } + + return QueryPolicy | StorePolicy | SkipPolicy; +} + +////////////////////////////////////////////////////////////////////////// + +HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, + CasStore& InStore, + CidStore& InCidStore, std::unique_ptr<UpstreamCache> UpstreamCache) -: m_Log(zen::logging::Get("cache")) +: m_Log(logging::Get("cache")) , m_CacheStore(InCacheStore) , m_CasStore(InStore) , m_CidStore(InCidStore) @@ -59,8 +173,14 @@ HttpStructuredCacheService::Flush() { } +void +HttpStructuredCacheService::Scrub(ScrubContext& Ctx) +{ + ZEN_UNUSED(Ctx); +} + void -HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) +HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { CacheRef Ref; @@ -75,28 +195,31 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) return HandleCacheBucketRequest(Request, Key); } - return Request.WriteResponse(zen::HttpResponseCode::BadRequest); // invalid URL + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL } + const auto QueryParams = Request.GetQueryParams(); + CachePolicy Policy = ParseCachePolicy(QueryParams); + if (Ref.PayloadId == IoHash::Zero) { - return HandleCacheRecordRequest(Request, Ref); + return HandleCacheRecordRequest(Request, Ref, Policy); } else { - return HandleCachePayloadRequest(Request, Ref); + return HandleCachePayloadRequest(Request, Ref, Policy); } return; } void -HttpStructuredCacheService::HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket) +HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Bucket) { ZEN_UNUSED(Request, Bucket); switch (auto Verb = Request.RequestVerb()) { - using enum zen::HttpVerb; + using enum HttpVerb; case kHead: case kGet: @@ -110,22 +233,22 @@ HttpStructuredCacheService::HandleCacheBucketRequest(zen::HttpServerRequest& Req if (m_CacheStore.DropBucket(Bucket)) { - return Request.WriteResponse(zen::HttpResponseCode::OK); + return Request.WriteResponse(HttpResponseCode::OK); } else { - return Request.WriteResponse(zen::HttpResponseCode::NotFound); + return Request.WriteResponse(HttpResponseCode::NotFound); } break; } } void -HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref) +HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy) { switch (auto Verb = Request.RequestVerb()) { - using enum zen::HttpVerb; + using enum HttpVerb; case kHead: case kGet: @@ -136,7 +259,9 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); bool InUpstreamCache = false; - if (!Success && m_UpstreamCache) + const bool QueryUpstream = !Success && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote)); + + if (QueryUpstream) { const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage @@ -153,14 +278,13 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req { if (CacheRecordType == ZenContentType::kCbObject) { - const zen::CbValidateError ValidationResult = - zen::ValidateCompactBinary(UpstreamResult.Value, zen::CbValidateMode::All); + const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); if (ValidationResult == CbValidateError::None) { - zen::CbObjectView CacheRecord(UpstreamResult.Value.Data()); + CbObjectView CacheRecord(UpstreamResult.Value.Data()); - zen::CbObjectWriter IndexData; + CbObjectWriter IndexData; IndexData.BeginArray("references"); CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); }); IndexData.EndArray(); @@ -214,6 +338,18 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req if (FoundCount == AttachmentCount) { m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); + + if (zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments)) + { + CbPackage PackageWithoutAttachments; + PackageWithoutAttachments.SetObject(CacheRecord); + + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + PackageWithoutAttachments.Save(Writer); + + Value.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + } } else { @@ -236,7 +372,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req { ZEN_DEBUG("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey); - return Request.WriteResponse(zen::HttpResponseCode::NotFound); + return Request.WriteResponse(HttpResponseCode::NotFound); } if (Verb == kHead) @@ -248,41 +384,45 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req { CbObjectView CacheRecord(Value.Value.Data()); - const zen::CbValidateError ValidationResult = zen::ValidateCompactBinary(Value.Value, zen::CbValidateMode::All); + const CbValidateError ValidationResult = ValidateCompactBinary(Value.Value, CbValidateMode::All); if (ValidationResult != CbValidateError::None) { ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey); - return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv); + return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv); } - uint32_t AttachmentCount = 0; - uint32_t FoundCount = 0; - uint64_t AttachmentBytes = 0ull; + const bool SkipAttachments = zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments); + uint32_t AttachmentCount = 0; + uint32_t FoundCount = 0; + uint64_t AttachmentBytes = 0ull; CbPackage Package; - CacheRecord.IterateAttachments( - [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) - { - Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); - AttachmentBytes += Chunk.Size(); - FoundCount++; - } - AttachmentCount++; - }); - - if (FoundCount != AttachmentCount) + if (!SkipAttachments) { - ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments", - Ref.BucketSegment, - Ref.HashKey, - FoundCount, - AttachmentCount); + CacheRecord.IterateAttachments( + [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + AttachmentBytes += Chunk.Size(); + FoundCount++; + } + AttachmentCount++; + }); + + if (FoundCount != AttachmentCount) + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments", + Ref.BucketSegment, + Ref.HashKey, + FoundCount, + AttachmentCount); - return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); + return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); + } } Package.SetObject(LoadCompactBinaryObject(Value.Value)); @@ -300,7 +440,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - return Request.WriteResponse(zen::HttpResponseCode::OK, HttpContentType::kCbPackage, Response); + return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response); } else { @@ -310,41 +450,43 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req NiceBytes(Value.Value.Size()), InUpstreamCache ? "UPSTREAM" : "LOCAL"); - return Request.WriteResponse(zen::HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); + return Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); } } break; case kPut: { - zen::IoBuffer Body = Request.ReadPayload(); + IoBuffer Body = Request.ReadPayload(); if (!Body || Body.Size() == 0) { - return Request.WriteResponse(zen::HttpResponseCode::BadRequest); + return Request.WriteResponse(HttpResponseCode::BadRequest); } const HttpContentType ContentType = Request.RequestContentType(); + const bool StoreUpstream = m_UpstreamCache && (CachePolicy::StoreRemote == (Policy & CachePolicy::StoreRemote)); + if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType) { // TODO: create a cache record and put value in CAS? m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size())); - if (m_UpstreamCache) + if (StoreUpstream) { auto Result = m_UpstreamCache->EnqueueUpstream( {.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}}); } - return Request.WriteResponse(zen::HttpResponseCode::Created); + return Request.WriteResponse(HttpResponseCode::Created); } else if (ContentType == HttpContentType::kCbObject) { // Validate payload before accessing it - const zen::CbValidateError ValidationResult = - zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); + const CbValidateError ValidationResult = + ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), CbValidateMode::All); if (ValidationResult != CbValidateError::None) { @@ -360,7 +502,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req } // Extract referenced payload hashes - zen::CbObjectView Cbo(Body.Data()); + CbObjectView Cbo(Body.Data()); std::vector<IoHash> References; std::vector<IoHash> MissingRefs; @@ -371,7 +513,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req if (!References.empty()) { - zen::CbObjectWriter Idx; + CbObjectWriter Idx; Idx.BeginArray("references"); for (const IoHash& Hash : References) @@ -393,26 +535,23 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} attachments missing", Ref.BucketSegment, Ref.HashKey, - zen::NiceBytes(CacheValue.Value.Size()), + NiceBytes(CacheValue.Value.Size()), MissingRefs.size(), References.size()); - if (MissingRefs.empty()) + if (MissingRefs.empty() && StoreUpstream) { - // Only enqueue valid cache records, i.e. all referenced payloads exists - if (m_UpstreamCache) - { - auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, - .CacheKey = {Ref.BucketSegment, Ref.HashKey}, - .PayloadIds = std::move(References)}); - } + ZEN_ASSERT(m_UpstreamCache); + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + .CacheKey = {Ref.BucketSegment, Ref.HashKey}, + .PayloadIds = std::move(References)}); - return Request.WriteResponse(zen::HttpResponseCode::Created); + return Request.WriteResponse(HttpResponseCode::Created); } else { // TODO: Binary attachments? - zen::CbObjectWriter Response; + CbObjectWriter Response; Response.BeginArray("needs"); for (const IoHash& MissingRef : MissingRefs) { @@ -422,7 +561,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Response.EndArray(); // Return Created | BadRequest? - return Request.WriteResponse(zen::HttpResponseCode::Created, Response.Save()); + return Request.WriteResponse(HttpResponseCode::Created, Response.Save()); } } else if (ContentType == HttpContentType::kCbPackage) @@ -437,26 +576,22 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req CbObject CacheRecord = Package.GetObject(); - int32_t AttachmentCount = 0; - int32_t NewAttachmentCount = 0; - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalNewBytes = 0; - bool AttachmentsOk = true; - + struct AttachmentInsertResult + { + int32_t Count = 0; + int32_t NewCount = 0; + uint64_t Bytes = 0; + uint64_t NewBytes = 0; + bool Ok = false; + }; + + AttachmentInsertResult AttachmentResult{.Ok = true}; std::span<const CbAttachment> Attachments = Package.GetAttachments(); + std::vector<IoHash> PayloadIds; - std::vector<IoHash> PayloadIds; PayloadIds.reserve(Attachments.size()); - CacheRecord.IterateAttachments([this, - &Ref, - &Package, - &AttachmentsOk, - &AttachmentCount, - &TotalAttachmentBytes, - &TotalNewBytes, - &NewAttachmentCount, - &PayloadIds](CbFieldView AttachmentHash) { + CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentResult, &PayloadIds](CbFieldView AttachmentHash) { if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) { if (Attachment->IsCompressedBinary()) @@ -469,12 +604,12 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req if (InsertResult.New) { - TotalNewBytes += ChunkSize; - ++NewAttachmentCount; + AttachmentResult.NewBytes += ChunkSize; + AttachmentResult.NewCount++; } - TotalAttachmentBytes += ChunkSize; - AttachmentCount++; + AttachmentResult.Bytes += ChunkSize; + AttachmentResult.Count++; } else { @@ -482,7 +617,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Ref.BucketSegment, Ref.HashKey, AttachmentHash.AsHash()); - AttachmentsOk = false; + AttachmentResult.Ok = false; } } else @@ -491,23 +626,24 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Ref.BucketSegment, Ref.HashKey, AttachmentHash.AsHash()); - AttachmentsOk = false; + AttachmentResult.Ok = false; } }); - if (!AttachmentsOk) + if (!AttachmentResult.Ok) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"); } IoBuffer CacheRecordChunk = CacheRecord.GetBuffer().AsIoBuffer(); - const uint64_t TotalPackageBytes = TotalAttachmentBytes + CacheRecordChunk.Size(); + const uint64_t TotalPackageBytes = AttachmentResult.Bytes + CacheRecordChunk.Size(); ZenCacheValue CacheValue{.Value = CacheRecordChunk}; m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); - if (m_UpstreamCache) + if (StoreUpstream) { + ZEN_ASSERT(m_UpstreamCache); auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage, .CacheKey = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(PayloadIds)}); @@ -516,17 +652,17 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} ({}/{}) new attachments", Ref.BucketSegment, Ref.HashKey, - zen::NiceBytes(TotalPackageBytes), - NewAttachmentCount, - AttachmentCount, - zen::NiceBytes(TotalNewBytes), - zen::NiceBytes(TotalAttachmentBytes)); + NiceBytes(TotalPackageBytes), + AttachmentResult.NewCount, + AttachmentResult.Count, + NiceBytes(AttachmentResult.NewBytes), + NiceBytes(AttachmentResult.Bytes)); - return Request.WriteResponse(zen::HttpResponseCode::Created); + return Request.WriteResponse(HttpResponseCode::Created); } else { - return Request.WriteResponse(zen::HttpResponseCode::BadRequest); + return Request.WriteResponse(HttpResponseCode::BadRequest); } } break; @@ -540,7 +676,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req } void -HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref) +HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy) { // Note: the URL references the uncompressed payload hash - so this maintains the mapping // from uncompressed CAS identity (aka CID/Content ID) to the stored payload hash @@ -548,27 +684,29 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re // this is a PITA but a consequence of the fact that the client side code is not able to // address data by compressed hash + ZEN_UNUSED(Policy); + switch (auto Verb = Request.RequestVerb()) { - using enum zen::HttpVerb; + using enum HttpVerb; case kHead: case kGet: { - zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); - bool InUpstreamCache = false; + IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); + bool InUpstreamCache = false; if (!Payload && m_UpstreamCache) { if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); UpstreamResult.Success) { - if (zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { - Payload = UpstreamResult.Value; - zen::IoHash ChunkHash = zen::IoHash::HashBuffer(Payload); - zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); - InUpstreamCache = true; + Payload = UpstreamResult.Value; + IoHash ChunkHash = IoHash::HashBuffer(Payload); + CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); + InUpstreamCache = true; m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); } @@ -582,7 +720,7 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re if (!Payload) { ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId); - return Request.WriteResponse(zen::HttpResponseCode::NotFound); + return Request.WriteResponse(HttpResponseCode::NotFound); } ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})", @@ -598,29 +736,27 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re Request.SetSuppressResponseBody(); } - return Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, Payload); + return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); } break; case kPut: { - if (zen::IoBuffer Body = Request.ReadPayload()) + if (IoBuffer Body = Request.ReadPayload()) { if (Body.Size() == 0) { - return Request.WriteResponse(zen::HttpResponseCode::BadRequest, - HttpContentType::kText, - "Empty payload not permitted"); + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Empty payload not permitted"); } - zen::IoHash ChunkHash = zen::IoHash::HashBuffer(Body); + IoHash ChunkHash = IoHash::HashBuffer(Body); - zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(Body)); + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body)); if (!Compressed) { // All attachment payloads need to be in compressed buffer format - return Request.WriteResponse(zen::HttpResponseCode::BadRequest, + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"); } @@ -632,7 +768,7 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re return Request.WriteResponse(HttpResponseCode::BadRequest); } - zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); + CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); @@ -646,11 +782,11 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re if (Result.New) { - return Request.WriteResponse(zen::HttpResponseCode::Created); + return Request.WriteResponse(HttpResponseCode::Created); } else { - return Request.WriteResponse(zen::HttpResponseCode::OK); + return Request.WriteResponse(HttpResponseCode::OK); } } } @@ -666,7 +802,7 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re } bool -HttpStructuredCacheService::ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef) +HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& OutRef) { std::string_view Key = Request.RelativeUri(); std::string_view::size_type BucketSplitOffset = Key.find_first_of('/'); @@ -702,14 +838,14 @@ HttpStructuredCacheService::ValidateKeyUri(zen::HttpServerRequest& Request, Cach PayloadSegment = Key.substr(PayloadSplitOffset + 1); } - if (HashSegment.size() != zen::IoHash::StringLength) + if (HashSegment.size() != IoHash::StringLength) { return false; } - if (!PayloadSegment.empty() && PayloadSegment.size() == zen::IoHash::StringLength) + if (!PayloadSegment.empty() && PayloadSegment.size() == IoHash::StringLength) { - const bool IsOk = zen::ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash); + const bool IsOk = ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash); if (!IsOk) { @@ -718,10 +854,10 @@ HttpStructuredCacheService::ValidateKeyUri(zen::HttpServerRequest& Request, Cach } else { - OutRef.PayloadId = zen::IoHash::Zero; + OutRef.PayloadId = IoHash::Zero; } - const bool IsOk = zen::ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); + const bool IsOk = ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); if (!IsOk) { diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 8289fd700..bd163dd1d 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -10,13 +10,13 @@ namespace spdlog { class logger; } -class ZenCacheStore; - namespace zen { class CasStore; class CidStore; class UpstreamCache; +class ZenCacheStore; +enum class CachePolicy : uint8_t; /** * Structured cache service. Imposes constraints on keys, supports blobs and @@ -60,6 +60,7 @@ public: virtual void HandleRequest(zen::HttpServerRequest& Request) override; void Flush(); + void Scrub(ScrubContext& Ctx); private: struct CacheRef @@ -70,13 +71,13 @@ private: }; [[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef); - void HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref); - void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref); + void HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy); + void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; - ZenCacheStore& m_CacheStore; + zen::ZenCacheStore& m_CacheStore; zen::CasStore& m_CasStore; zen::CidStore& m_CidStore; std::unique_ptr<UpstreamCache> m_UpstreamCache; diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 018955e65..502ca6605 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -24,15 +24,16 @@ #include <atlfile.h> -using namespace zen; -using namespace fmt::literals; - ////////////////////////////////////////////////////////////////////////// -ZenCacheStore::ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir} +namespace zen { + +using namespace fmt::literals; + +ZenCacheStore::ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir} { ZEN_INFO("initializing structured cache at '{}'", RootDir); - zen::CreateDirectories(RootDir); + CreateDirectories(RootDir); } ZenCacheStore::~ZenCacheStore() @@ -40,7 +41,7 @@ ZenCacheStore::~ZenCacheStore() } bool -ZenCacheStore::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheStore::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue); @@ -68,7 +69,7 @@ ZenCacheStore::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCac } void -ZenCacheStore::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheStore::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { // Store value and index @@ -104,6 +105,12 @@ ZenCacheStore::Flush() m_DiskLayer.Flush(); } +void +ZenCacheStore::Scrub(ScrubContext& Ctx) +{ + m_DiskLayer.Scrub(Ctx); + m_MemLayer.Scrub(Ctx); +} ////////////////////////////////////////////////////////////////////////// ZenCacheMemoryLayer::ZenCacheMemoryLayer() @@ -115,7 +122,7 @@ ZenCacheMemoryLayer::~ZenCacheMemoryLayer() } bool -ZenCacheMemoryLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { CacheBucket* Bucket = nullptr; @@ -139,7 +146,7 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, } void -ZenCacheMemoryLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { CacheBucket* Bucket = nullptr; @@ -178,8 +185,14 @@ ZenCacheMemoryLayer::DropBucket(std::string_view Bucket) return !!m_Buckets.erase(std::string(Bucket)); } +void +ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx) +{ + ZEN_UNUSED(Ctx); +} + bool -ZenCacheMemoryLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { RwLock::SharedLockScope _(m_bucketLock); @@ -196,7 +209,7 @@ ZenCacheMemoryLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& } void -ZenCacheMemoryLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { RwLock::ExclusiveLockScope _(m_bucketLock); @@ -227,7 +240,7 @@ struct DiskLocation struct DiskIndexEntry { - zen::IoHash Key; + IoHash Key; DiskLocation Location; }; @@ -243,8 +256,8 @@ struct ZenCacheDiskLayer::CacheBucket void OpenOrCreate(std::filesystem::path BucketDir); static bool Delete(std::filesystem::path BucketDir); - bool Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const zen::IoHash& HashKey, const ZenCacheValue& Value); + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value); void Drop(); void Flush(); @@ -260,12 +273,12 @@ private: BasicFile m_SobsFile; TCasLogFile<DiskIndexEntry> m_SlogFile; - void BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey); - void PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value); + void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey); + void PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value); - RwLock m_IndexLock; - tsl::robin_map<zen::IoHash, DiskLocation, zen::IoHash::Hasher> m_Index; - uint64_t m_WriteCursor = 0; + RwLock m_IndexLock; + tsl::robin_map<IoHash, DiskLocation, IoHash::Hasher> m_Index; + uint64_t m_WriteCursor = 0; }; ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas) @@ -281,7 +294,7 @@ ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir) { if (std::filesystem::exists(BucketDir)) { - zen::DeleteDirectories(BucketDir); + DeleteDirectories(BucketDir); return true; } @@ -292,7 +305,7 @@ ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir) void ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) { - zen::CreateDirectories(BucketDir); + CreateDirectories(BucketDir); m_BucketDir = BucketDir; @@ -357,7 +370,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) uint64_t MaxFileOffset = 0; - if (zen::RwLock::ExclusiveLockScope _(m_IndexLock); m_Index.empty()) + if (RwLock::ExclusiveLockScope _(m_IndexLock); m_Index.empty()) { m_SlogFile.Replay([&](const DiskIndexEntry& Record) { m_Index[Record.Key] = Record.Location; @@ -372,25 +385,29 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) } void -ZenCacheDiskLayer::CacheBucket::BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey) +ZenCacheDiskLayer::CacheBucket::BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey) { - char hex[sizeof(HashKey.Hash) * 2]; - ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, hex); + char HexString[sizeof(HashKey.Hash) * 2]; + ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString); Path.Append(m_BucketDir.c_str()); + Path.Append(L"/blob/"); + Path.AppendAsciiRange(HexString, HexString + 3); + Path.Append(L"/"); + Path.AppendAsciiRange(HexString + 3, HexString + 5); Path.Append(L"/"); - Path.AppendAsciiRange(hex, hex + sizeof(hex)); + Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString)); } bool -ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { if (!m_Ok) { return false; } - zen::RwLock::SharedLockScope _(m_IndexLock); + RwLock::SharedLockScope _(m_IndexLock); if (auto it = m_Index.find(HashKey); it != m_Index.end()) { @@ -417,7 +434,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& O WideStringBuilder<128> DataFilePath; BuildPath(DataFilePath, HashKey); - if (zen::IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str())) + if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str())) { OutValue.Value = Data; OutValue.Value.SetContentType(ContentType); @@ -431,7 +448,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& O } void -ZenCacheDiskLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { if (!m_Ok) { @@ -453,12 +470,12 @@ ZenCacheDiskLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheVa EntryFlags |= DiskLocation::kStructured; } - zen::RwLock::ExclusiveLockScope _(m_IndexLock); + RwLock::ExclusiveLockScope _(m_IndexLock); DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(m_WriteCursor, EntryFlags), .Size = gsl::narrow<uint32_t>(Value.Value.Size())}; - m_WriteCursor = zen::RoundUp(m_WriteCursor + Loc.Size, 16); + m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size, 16); if (auto it = m_Index.find(HashKey); it == m_Index.end()) { @@ -483,7 +500,7 @@ ZenCacheDiskLayer::CacheBucket::Drop() m_SobsFile.Close(); m_SlogFile.Close(); - zen::DeleteDirectories(m_BucketDir); + DeleteDirectories(m_BucketDir); } void @@ -494,12 +511,22 @@ ZenCacheDiskLayer::CacheBucket::Flush() } void -ZenCacheDiskLayer::CacheBucket::PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) { - zen::WideStringBuilder<128> DataFilePath; + ZEN_UNUSED(Ctx); +} + +void +ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value) +{ + WideStringBuilder<128> DataFilePath; BuildPath(DataFilePath, HashKey); - // TODO: replace this with a more efficient implementation with proper atomic rename + // TODO: replace this process with a more efficient implementation with proper atomic rename + // and also avoid creating directories if we can + + std::filesystem::path ParentPath = std::filesystem::path(DataFilePath.c_str()).parent_path(); + CreateDirectories(ParentPath); CAtlTemporaryFile DataFile; @@ -507,21 +534,23 @@ ZenCacheDiskLayer::CacheBucket::PutLargeObject(const zen::IoHash& HashKey, const if (FAILED(hRes)) { - zen::ThrowSystemException(hRes, "Failed to open temporary file for put at '{}'"_format(m_BucketDir)); + ThrowSystemException(hRes, "Failed to open temporary file for put at '{}'"_format(m_BucketDir)); } hRes = DataFile.Write(Value.Value.Data(), gsl::narrow<DWORD>(Value.Value.Size())); if (FAILED(hRes)) { - zen::ThrowSystemException(hRes, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size()))); + ThrowSystemException(hRes, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size()))); } + // Move file into place (note: not fully atomic!) + hRes = DataFile.Close(DataFilePath.c_str()); if (FAILED(hRes)) { - zen::ThrowSystemException(hRes, "Failed to finalize file '{}'"_format(zen::WideToUtf8(DataFilePath))); + ThrowSystemException(hRes, "Failed to finalize file '{}'"_format(WideToUtf8(DataFilePath))); } // Update index @@ -533,7 +562,7 @@ ZenCacheDiskLayer::CacheBucket::PutLargeObject(const zen::IoHash& HashKey, const EntryFlags |= DiskLocation::kStructured; } - zen::RwLock::ExclusiveLockScope _(m_IndexLock); + RwLock::ExclusiveLockScope _(m_IndexLock); DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(0, EntryFlags), .Size = 0}; @@ -560,12 +589,12 @@ ZenCacheDiskLayer::ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& ZenCacheDiskLayer::~ZenCacheDiskLayer() = default; bool -ZenCacheDiskLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { CacheBucket* Bucket = nullptr; { - zen::RwLock::SharedLockScope _(m_Lock); + RwLock::SharedLockScope _(m_Lock); auto it = m_Buckets.find(std::string(InBucket)); @@ -579,7 +608,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, Ze { // Bucket needs to be opened/created - zen::RwLock::ExclusiveLockScope _(m_Lock); + RwLock::ExclusiveLockScope _(m_Lock); if (auto it = m_Buckets.find(std::string(InBucket)); it != m_Buckets.end()) { @@ -603,12 +632,12 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, Ze } void -ZenCacheDiskLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { CacheBucket* Bucket = nullptr; { - zen::RwLock::SharedLockScope _(m_Lock); + RwLock::SharedLockScope _(m_Lock); auto it = m_Buckets.find(std::string(InBucket)); @@ -622,7 +651,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, co { // New bucket needs to be created - zen::RwLock::ExclusiveLockScope _(m_Lock); + RwLock::ExclusiveLockScope _(m_Lock); if (auto it = m_Buckets.find(std::string(InBucket)); it != m_Buckets.end()) { @@ -651,7 +680,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, co bool ZenCacheDiskLayer::DropBucket(std::string_view InBucket) { - zen::RwLock::ExclusiveLockScope _(m_Lock); + RwLock::ExclusiveLockScope _(m_Lock); auto it = m_Buckets.find(std::string(InBucket)); @@ -679,7 +708,7 @@ ZenCacheDiskLayer::Flush() Buckets.reserve(m_Buckets.size()); { - zen::RwLock::SharedLockScope _(m_Lock); + RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { @@ -705,7 +734,7 @@ ZenCacheTracker::~ZenCacheTracker() } void -ZenCacheTracker::TrackAccess(std::string_view Bucket, const zen::IoHash& HashKey) +ZenCacheTracker::TrackAccess(std::string_view Bucket, const IoHash& HashKey) { ZEN_UNUSED(Bucket); ZEN_UNUSED(HashKey); @@ -715,3 +744,5 @@ void ZenCacheTracker::Flush() { } + +} // namespace zen diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index 48c3cfde9..fdf4a8cfe 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -23,8 +23,6 @@ namespace zen { class WideStringBuilderBase; class CasStore; -} // namespace zen - /****************************************************************************** /$$$$$$$$ /$$$$$$ /$$ @@ -44,8 +42,8 @@ class CasStore; struct ZenCacheValue { - zen::IoBuffer Value; - zen::CbObject IndexData; + IoBuffer Value; + CbObject IndexData; }; class ZenCacheMemoryLayer @@ -54,34 +52,36 @@ public: ZenCacheMemoryLayer(); ~ZenCacheMemoryLayer(); - bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value); + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); bool DropBucket(std::string_view Bucket); + void Scrub(ScrubContext& Ctx); private: struct CacheBucket { - zen::RwLock m_bucketLock; - tsl::robin_map<zen::IoHash, zen::IoBuffer> m_cacheMap; + RwLock m_bucketLock; + tsl::robin_map<IoHash, IoBuffer> m_cacheMap; - bool Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const zen::IoHash& HashKey, const ZenCacheValue& Value); + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value); }; - zen::RwLock m_Lock; + RwLock m_Lock; std::unordered_map<std::string, CacheBucket> m_Buckets; }; class ZenCacheDiskLayer { public: - ZenCacheDiskLayer(zen::CasStore& Cas, const std::filesystem::path& RootDir); + ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir); ~ZenCacheDiskLayer(); - bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value); + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); bool DropBucket(std::string_view Bucket); void Flush(); + void Scrub(ScrubContext& Ctx); private: /** A cache bucket manages a single directory containing @@ -89,22 +89,23 @@ private: */ struct CacheBucket; - zen::CasStore& m_CasStore; + CasStore& m_CasStore; std::filesystem::path m_RootDir; - zen::RwLock m_Lock; + RwLock m_Lock; std::unordered_map<std::string, CacheBucket> m_Buckets; // TODO: make this case insensitive }; class ZenCacheStore { public: - ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir); + ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir); ~ZenCacheStore(); - bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value); + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); bool DropBucket(std::string_view Bucket); void Flush(); + void Scrub(ScrubContext& Ctx); private: std::filesystem::path m_RootDir; @@ -121,8 +122,10 @@ public: ZenCacheTracker(ZenCacheStore& CacheStore); ~ZenCacheTracker(); - void TrackAccess(std::string_view Bucket, const zen::IoHash& HashKey); + void TrackAccess(std::string_view Bucket, const IoHash& HashKey); void Flush(); private: }; + +} // namespace zen diff --git a/zenserver/config.cpp b/zenserver/config.cpp index 578a3a202..164d2a792 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -55,6 +55,27 @@ PickDefaultStateDirectory() #endif +UpstreamCachePolicy +ParseUpstreamCachePolicy(std::string_view Options) +{ + if (Options == "readonly") + { + return UpstreamCachePolicy::Read; + } + else if (Options == "writeonly") + { + return UpstreamCachePolicy::Write; + } + else if (Options == "disabled") + { + return UpstreamCachePolicy::Disabled; + } + else + { + return UpstreamCachePolicy::ReadWrite; + } +} + void ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, ZenServiceConfig& ServiceConfig) { @@ -77,6 +98,21 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z cxxopts::value<std::string>(GlobalOptions.ChildId), "<identifier>"); +#if ZEN_PLATFORM_WINDOWS + options.add_option("lifetime", + "", + "install", + "Install zenserver as a Windows service", + cxxopts::value<bool>(GlobalOptions.InstallService), + ""); + options.add_option("lifetime", + "", + "uninstall", + "Uninstall zenserver as a Windows service", + cxxopts::value<bool>(GlobalOptions.UninstallService), + ""); +#endif + options.add_option("network", "p", "port", @@ -98,6 +134,14 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z cxxopts::value<bool>(ServiceConfig.ShouldCrash)->default_value("false"), ""); + std::string UpstreamCachePolicyOptions; + options.add_option("cache", + "", + "upstream-cache-policy", + "", + cxxopts::value<std::string>(UpstreamCachePolicyOptions)->default_value(""), + "Upstream cache policy (readwrite|readonly|writeonly|disabled)"); + options.add_option("cache", "", "upstream-jupiter-url", @@ -163,13 +207,6 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z options.add_option("cache", "", - "upstream-enabled", - "Whether upstream caching is disabled", - cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.Enabled)->default_value("true"), - ""); - - options.add_option("cache", - "", "upstream-thread-count", "Number of threads used for upstream procsssing", cxxopts::value<int>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"), @@ -185,6 +222,8 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z exit(0); } + + ServiceConfig.UpstreamCacheConfig.CachePolicy = ParseUpstreamCachePolicy(UpstreamCachePolicyOptions); } catch (cxxopts::OptionParseException& e) { @@ -261,7 +300,8 @@ ParseServiceConfig(const std::filesystem::path& DataRoot, ZenServiceConfig& Serv if (auto UpstreamConfig = StructuredCacheConfig->get<sol::optional<sol::table>>("upstream")) { - ServiceConfig.UpstreamCacheConfig.Enabled = UpstreamConfig->get_or("enable", ServiceConfig.UpstreamCacheConfig.Enabled); + std::string Policy = UpstreamConfig->get_or("policy", std::string()); + ServiceConfig.UpstreamCacheConfig.CachePolicy = ParseUpstreamCachePolicy(Policy); ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount = UpstreamConfig->get_or("upstreamthreadcount", 4); if (auto JupiterConfig = UpstreamConfig->get<sol::optional<sol::table>>("jupiter")) diff --git a/zenserver/config.h b/zenserver/config.h index 80ec86905..6ade1b401 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -9,12 +9,14 @@ struct ZenServerOptions { bool IsDebug = false; bool IsTest = false; - bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements - int BasePort = 1337; // Service listen port (used for both UDP and TCP) - int OwnerPid = 0; // Parent process id (zero for standalone) - std::string ChildId; // Id assigned by parent process (used for lifetime management) - std::string LogId; // Id for tagging log output - std::filesystem::path DataDir; // Root directory for state (used for testing) + bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements + int BasePort = 1337; // Service listen port (used for both UDP and TCP) + int OwnerPid = 0; // Parent process id (zero for standalone) + std::string ChildId; // Id assigned by parent process (used for lifetime management) + bool InstallService = false; // Flag used to initiate service install (temporary) + bool UninstallService = false; // Flag used to initiate service uninstall (temporary) + std::string LogId; // Id for tagging log output + std::filesystem::path DataDir; // Root directory for state (used for testing) }; struct ZenUpstreamJupiterConfig @@ -34,12 +36,20 @@ struct ZenUpstreamZenConfig std::string Url; }; +enum class UpstreamCachePolicy : uint8_t +{ + Disabled = 0, + Read = 1 << 0, + Write = 1 << 1, + ReadWrite = Read | Write +}; + struct ZenUpstreamCacheConfig { ZenUpstreamJupiterConfig JupiterConfig; ZenUpstreamZenConfig ZenConfig; int UpstreamThreadCount = 4; - bool Enabled = false; + UpstreamCachePolicy CachePolicy = UpstreamCachePolicy::ReadWrite; }; struct ZenServiceConfig diff --git a/zenserver/diag/diagsvcs.h b/zenserver/diag/diagsvcs.h index 51ee98f67..61703e393 100644 --- a/zenserver/diag/diagsvcs.h +++ b/zenserver/diag/diagsvcs.h @@ -7,7 +7,9 @@ ////////////////////////////////////////////////////////////////////////// -class HttpTestService : public zen::HttpService +namespace zen { + +class HttpTestService : public HttpService { uint32_t LogPoint = 0; @@ -17,7 +19,7 @@ public: virtual const char* BaseUri() const override { return "/test/"; } - virtual void HandleRequest(zen::HttpServerRequest& Request) override + virtual void HandleRequest(HttpServerRequest& Request) override { using namespace std::literals; @@ -25,21 +27,21 @@ public: if (Uri == "hello"sv) { - Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kText, u8"hello world!"sv); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, u8"hello world!"sv); // OutputLogMessageInternal(&LogPoint, 0, 0); } else if (Uri == "1K"sv) { - Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, m_1k); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, m_1k); } else if (Uri == "1M"sv) { - Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, m_1m); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, m_1m); } else if (Uri == "1M_1k"sv) { - std::vector<zen::IoBuffer> Buffers; + std::vector<IoBuffer> Buffers; Buffers.reserve(1024); for (int i = 0; i < 1024; ++i) @@ -47,11 +49,11 @@ public: Buffers.push_back(m_1k); } - Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, Buffers); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Buffers); } else if (Uri == "1G"sv) { - std::vector<zen::IoBuffer> Buffers; + std::vector<IoBuffer> Buffers; Buffers.reserve(1024); for (int i = 0; i < 1024; ++i) @@ -59,11 +61,11 @@ public: Buffers.push_back(m_1m); } - Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, Buffers); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Buffers); } else if (Uri == "1G_1k"sv) { - std::vector<zen::IoBuffer> Buffers; + std::vector<IoBuffer> Buffers; Buffers.reserve(1024 * 1024); for (int i = 0; i < 1024 * 1024; ++i) @@ -71,16 +73,16 @@ public: Buffers.push_back(m_1k); } - Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, Buffers); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Buffers); } } private: - zen::IoBuffer m_1m{1024 * 1024}; - zen::IoBuffer m_1k{m_1m, 0u, 1024}; + IoBuffer m_1m{1024 * 1024}; + IoBuffer m_1k{m_1m, 0u, 1024}; }; -class HttpHealthService : public zen::HttpService +class HttpHealthService : public HttpService { public: HttpHealthService() = default; @@ -88,16 +90,18 @@ public: virtual const char* BaseUri() const override { return "/health/"; } - virtual void HandleRequest(zen::HttpServerRequest& Request) override + virtual void HandleRequest(HttpServerRequest& Request) override { using namespace std::literals; switch (Request.RequestVerb()) { - case zen::HttpVerb::kGet: - return Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kText, u8"OK!"sv); + case HttpVerb::kGet: + return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, u8"OK!"sv); } } private: }; + +} // namespace zen diff --git a/zenserver/diag/logging.cpp b/zenserver/diag/logging.cpp index 48eda7512..41b140f90 100644 --- a/zenserver/diag/logging.cpp +++ b/zenserver/diag/logging.cpp @@ -9,6 +9,9 @@ #include <spdlog/pattern_formatter.h> #include <spdlog/sinks/ansicolor_sink.h> #include <spdlog/sinks/basic_file_sink.h> +#include <spdlog/sinks/daily_file_sink.h> +#include <spdlog/sinks/msvc_sink.h> +#include <spdlog/sinks/rotating_file_sink.h> #include <spdlog/sinks/stdout_color_sinks.h> #include <spdlog/spdlog.h> #include <zencore/string.h> @@ -204,6 +207,12 @@ InitializeLogging(const ZenServerOptions& GlobalOptions) IsAsync = false; } + if (GlobalOptions.IsTest) + { + LogLevel = spdlog::level::trace; + IsAsync = false; + } + if (IsAsync) { const int QueueSize = 8192; @@ -217,7 +226,19 @@ InitializeLogging(const ZenServerOptions& GlobalOptions) // Sinks auto ConsoleSink = std::make_shared<spdlog::sinks::ansicolor_stdout_sink_mt>(); - auto FileSink = std::make_shared<spdlog::sinks::basic_file_sink_mt>(zen::WideToUtf8(LogPath.c_str()), /* truncate */ true); + +#if 0 + auto FileSink = std::make_shared<spdlog::sinks::daily_file_sink_mt>(zen::WideToUtf8(LogPath.c_str()), + 0, + 0, + /* truncate */ false, + uint16_t(/* max files */ 14)); +#else + auto FileSink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(zen::WideToUtf8(LogPath.c_str()), + /* max size */ 128 * 1024 * 1024, + /* max files */ 16, + /* rotate on open */ true); +#endif // Default @@ -228,20 +249,30 @@ InitializeLogging(const ZenServerOptions& GlobalOptions) Sinks.push_back(ConsoleSink); Sinks.push_back(FileSink); +#if ZEN_PLATFORM_WINDOWS + if (zen::IsDebuggerPresent()) + { + auto DebugSink = std::make_shared<spdlog::sinks::msvc_sink_mt>(); + DebugSink->set_level(spdlog::level::debug); + Sinks.push_back(DebugSink); + } +#endif + // Jupiter - only log HTTP traffic to file auto JupiterLogger = std::make_shared<spdlog::logger>("jupiter", FileSink); spdlog::register_logger(JupiterLogger); - JupiterLogger->set_level(LogLevel); + + // Zen - only log HTTP traffic to file auto ZenClientLogger = std::make_shared<spdlog::logger>("zenclient", FileSink); spdlog::register_logger(ZenClientLogger); - ZenClientLogger->set_level(LogLevel); // Configure all registered loggers according to settings spdlog::set_level(LogLevel); spdlog::flush_on(spdlog::level::err); + spdlog::flush_every(std::chrono::seconds{2}); spdlog::set_formatter(std::make_unique<logging::full_formatter>(GlobalOptions.LogId, std::chrono::system_clock::now())); } diff --git a/zenserver/experimental/usnjournal.cpp b/zenserver/experimental/usnjournal.cpp index ab83b8a1c..1e765fbe5 100644 --- a/zenserver/experimental/usnjournal.cpp +++ b/zenserver/experimental/usnjournal.cpp @@ -34,14 +34,14 @@ UsnJournalReader::Initialize(std::filesystem::path VolumePath) if (!Success) { - zen::ThrowSystemException("GetVolumePathName failed"); + zen::ThrowLastError("GetVolumePathName failed"); } Success = GetVolumeNameForVolumeMountPoint(VolumePathName, VolumeName, ZEN_ARRAY_COUNT(VolumeName)); if (!Success) { - zen::ThrowSystemException("GetVolumeNameForVolumeMountPoint failed"); + zen::ThrowLastError("GetVolumeNameForVolumeMountPoint failed"); } // Chop off trailing slash since we want to open a volume handle, not a handle to the volume root directory @@ -64,7 +64,7 @@ UsnJournalReader::Initialize(std::filesystem::path VolumePath) if (m_VolumeHandle == INVALID_HANDLE_VALUE) { - ThrowSystemException("Volume handle open failed"); + ThrowLastError("Volume handle open failed"); } // Figure out which file system is in use for volume @@ -86,7 +86,7 @@ UsnJournalReader::Initialize(std::filesystem::path VolumePath) if (!Success) { - ThrowSystemException("Failed to get volume information"); + ThrowLastError("Failed to get volume information"); } ZEN_DEBUG("File system type is {}", WideToUtf8(FileSystemName)); @@ -173,7 +173,7 @@ UsnJournalReader::Initialize(std::filesystem::path VolumePath) if (!Success) { - ThrowSystemException("GetFileInformationByHandleEx failed"); + ThrowLastError("GetFileInformationByHandleEx failed"); } const Frn VolumeRootFrn = FileInformation.FileId; diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index 404484edf..1f4239b23 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -780,6 +780,12 @@ ProjectStore::Project::Flush() // TODO } +void +ProjectStore::Project::Scrub(ScrubContext& Ctx) +{ + ZEN_UNUSED(Ctx); +} + ////////////////////////////////////////////////////////////////////////// ProjectStore::ProjectStore(CasStore& Store, std::filesystem::path BasePath) @@ -815,6 +821,17 @@ ProjectStore::Flush() } } +void +ProjectStore::Scrub(ScrubContext& Ctx) +{ + RwLock::SharedLockScope _(m_ProjectsLock); + + for (auto& Kv : m_Projects) + { + Kv.second.Scrub(Ctx); + } +} + ProjectStore::Project* ProjectStore::OpenProject(std::string_view ProjectId) { diff --git a/zenserver/projectstore.h b/zenserver/projectstore.h index 3d2247305..e545d78b9 100644 --- a/zenserver/projectstore.h +++ b/zenserver/projectstore.h @@ -101,6 +101,7 @@ public: spdlog::logger& Log() { return m_OuterProject->Log(); } void Flush(); + void Scrub(ScrubContext& Ctx); std::size_t OplogCount() const { return m_LatestOpMap.size(); } @@ -154,6 +155,7 @@ public: void Write(); [[nodiscard]] static bool Exists(std::filesystem::path BasePath); void Flush(); + void Scrub(ScrubContext& Ctx); spdlog::logger& Log(); private: @@ -177,6 +179,7 @@ public: void DeleteProject(std::string_view ProjectId); bool Exists(std::string_view ProjectId); void Flush(); + void Scrub(ScrubContext& Ctx); spdlog::logger& Log() { return m_Log; } const std::filesystem::path& BasePath() const { return m_ProjectBasePath; } @@ -193,13 +196,13 @@ private: ////////////////////////////////////////////////////////////////////////// // -// {ns} a root namespace, should be associated with the project which owns it +// {project} a project identifier // {target} a variation of the project, typically a build target // {lsn} oplog entry sequence number // -// /prj/{ns} -// /prj/{ns}/oplog/{target} -// /prj/{ns}/oplog/{target}/{lsn} +// /prj/{project} +// /prj/{project}/oplog/{target} +// /prj/{project}/oplog/{target}/{lsn} // // oplog entry // diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 4a5467648..2e74602db 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -66,6 +66,14 @@ CloudCacheSession::~CloudCacheSession() } CloudCacheResult +CloudCacheSession::Authenticate() +{ + std::string Auth; + const bool Success = m_CacheClient->AcquireAccessToken(Auth); + return {.Success = Success}; +} + +CloudCacheResult CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Key) { std::string Auth; @@ -163,7 +171,9 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; + return {.Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Success = (Response.status_code == 200 || Response.status_code == 201)}; } CloudCacheResult @@ -194,7 +204,9 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; + return {.Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Success = (Response.status_code == 200 || Response.status_code == 201)}; } CloudCacheResult @@ -215,7 +227,9 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; + return {.Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Success = (Response.status_code == 200 || Response.status_code == 201)}; } std::vector<IoHash> diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 5535ba000..21217387c 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -61,6 +61,7 @@ public: CloudCacheSession(CloudCacheClient* OuterClient); ~CloudCacheSession(); + CloudCacheResult Authenticate(); CloudCacheResult GetDerivedData(std::string_view BucketId, std::string_view Key); CloudCacheResult GetDerivedData(std::string_view BucketId, const IoHash& Key); CloudCacheResult GetRef(std::string_view BucketId, const IoHash& Key, ZenContentType RefType); diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 38d30a795..d6b6d44be 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -94,7 +94,7 @@ namespace detail { std::atomic_bool m_CompleteAdding{false}; }; - class JupiterUpstreamEndpoint final : public zen::UpstreamEndpoint + class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) : m_UseLegacyDdc(Options.UseLegacyDdc) @@ -108,8 +108,9 @@ namespace detail { virtual bool Initialize() override { - // TODO: Test and authenticate Jupiter client connection - return !m_Client->ServiceUrl().empty(); + CloudCacheSession Session(m_Client); + const CloudCacheResult Result = Session.Authenticate(); + return Result.Success; } virtual std::string_view DisplayName() const override { return m_DisplayName; } @@ -118,8 +119,8 @@ namespace detail { { try { - zen::CloudCacheSession Session(m_Client); - CloudCacheResult Result; + CloudCacheSession Session(m_Client); + CloudCacheResult Result; if (m_UseLegacyDdc && Type == ZenContentType::kBinary) { @@ -134,7 +135,7 @@ namespace detail { { CbPackage Package; - const CbValidateError ValidationResult = zen::ValidateCompactBinary(Result.Response, CbValidateMode::All); + const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All); if (Result.Success = ValidationResult == CbValidateError::None; Result.Success) { CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); @@ -183,7 +184,7 @@ namespace detail { { try { - zen::CloudCacheSession Session(m_Client); + CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); return {.Value = Result.Response, @@ -278,7 +279,7 @@ namespace detail { RefPtr<CloudCacheClient> m_Client; }; - class ZenUpstreamEndpoint final : public zen::UpstreamEndpoint + class ZenUpstreamEndpoint final : public UpstreamEndpoint { public: ZenUpstreamEndpoint(std::string_view ServiceUrl) @@ -292,8 +293,20 @@ namespace detail { virtual bool Initialize() override { - // TODO: Test and authenticate Zen client connection - return !m_Client->ServiceUrl().empty(); + try + { + ZenStructuredCacheSession Session(*m_Client); + ZenCacheResult Result; + for (int32_t Attempt = 0, MaxAttempts = 3; Attempt < MaxAttempts && !Result.Success; ++Attempt) + { + Result = Session.SayHello(); + } + return Result.Success; + } + catch (std::exception&) + { + return false; + } } virtual std::string_view DisplayName() const override { return m_DisplayName; } @@ -344,14 +357,14 @@ namespace detail { try { - zen::ZenStructuredCacheSession Session(*m_Client); - ZenCacheResult Result; - int64_t TotalBytes = 0ull; - double TotalElapsedSeconds = 0.0; + ZenStructuredCacheSession Session(*m_Client); + ZenCacheResult Result; + int64_t TotalBytes = 0ull; + double TotalElapsedSeconds = 0.0; if (CacheRecord.Type == ZenContentType::kCbPackage) { - zen::CbPackage Package; + CbPackage Package; Package.SetObject(CbObject(SharedBuffer(RecordValue))); for (const IoBuffer& Payload : Payloads) @@ -427,8 +440,8 @@ namespace detail { } private: - std::string m_DisplayName; - RefPtr<zen::ZenStructuredCacheClient> m_Client; + std::string m_DisplayName; + RefPtr<ZenStructuredCacheClient> m_Client; }; } // namespace detail @@ -455,7 +468,7 @@ class UpstreamStats final }; public: - UpstreamStats() : m_Log(zen::logging::Get("upstream")) {} + UpstreamStats() : m_Log(logging::Get("upstream")) {} void Add(const UpstreamEndpoint& Endpoint, const GetUpstreamCacheResult& Result) { @@ -523,8 +536,8 @@ private: class DefaultUpstreamCache final : public UpstreamCache { public: - DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) - : m_Log(zen::logging::Get("upstream")) + DefaultUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) + : m_Log(logging::Get("upstream")) , m_Options(Options) , m_CacheStore(CacheStore) , m_CidStore(CidStore) @@ -559,12 +572,15 @@ public: virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override { - for (auto& Endpoint : m_Endpoints) + if (m_Options.ReadUpstream) { - if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + for (auto& Endpoint : m_Endpoints) { - m_Stats.Add(*Endpoint, Result); - return Result; + if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + { + m_Stats.Add(*Endpoint, Result); + return Result; + } } } @@ -573,12 +589,15 @@ public: virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override { - for (auto& Endpoint : m_Endpoints) + if (m_Options.ReadUpstream) { - if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + for (auto& Endpoint : m_Endpoints) { - m_Stats.Add(*Endpoint, Result); - return Result; + if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + { + m_Stats.Add(*Endpoint, Result); + return Result; + } } } @@ -587,7 +606,7 @@ public: virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override { - if (m_IsRunning.load()) + if (m_IsRunning.load() && m_Options.WriteUpstream) { if (!m_UpstreamThreads.empty()) { @@ -697,21 +716,21 @@ private: spdlog::logger& Log() { return m_Log; } - spdlog::logger& m_Log; - UpstreamCacheOptions m_Options; - ::ZenCacheStore& m_CacheStore; - CidStore& m_CidStore; - UpstreamQueue m_UpstreamQueue; - UpstreamStats m_Stats; - std::vector<std::unique_ptr<zen::UpstreamEndpoint>> m_Endpoints; - std::vector<std::thread> m_UpstreamThreads; - std::atomic_bool m_IsRunning{false}; + spdlog::logger& m_Log; + UpstreamCacheOptions m_Options; + ZenCacheStore& m_CacheStore; + CidStore& m_CidStore; + UpstreamQueue m_UpstreamQueue; + UpstreamStats m_Stats; + std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints; + std::vector<std::thread> m_UpstreamThreads; + std::atomic_bool m_IsRunning{false}; }; ////////////////////////////////////////////////////////////////////////// std::unique_ptr<UpstreamCache> -MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) +MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) { return std::make_unique<DefaultUpstreamCache>(Options, CacheStore, CidStore); } diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 327778452..142fe260f 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -8,11 +8,10 @@ #include <memory> -class ZenCacheStore; - namespace zen { class CidStore; +class ZenCacheStore; struct CloudCacheClientOptions; struct UpstreamCacheKey @@ -36,7 +35,9 @@ struct UpstreamCacheRecord struct UpstreamCacheOptions { - uint32_t ThreadCount = 4; + uint32_t ThreadCount = 4; + bool ReadUpstream = true; + bool WriteUpstream = true; }; struct GetUpstreamCacheResult @@ -101,7 +102,7 @@ public: virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; }; -std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore); +std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore); std::unique_ptr<UpstreamEndpoint> MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options); diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index 55ddd310f..7f689d7f3 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -73,7 +73,7 @@ namespace detail { // Note that currently this just implements an UDP echo service for testing purposes -Mesh::Mesh(asio::io_context& IoContext) : m_Log(logging::Get("mesh")), m_IoContext(IoContext), m_SessionId(zen::GetSessionId()) +Mesh::Mesh(asio::io_context& IoContext) : m_Log(logging::Get("mesh")), m_IoContext(IoContext), m_SessionId(GetSessionId()) { } @@ -370,7 +370,7 @@ ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State) using namespace std::literals; ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient) -: m_Log(zen::logging::Get("zenclient"sv)) +: m_Log(logging::Get("zenclient"sv)) , m_Client(OuterClient) { m_SessionState = m_Client.AllocSessionState(); @@ -382,6 +382,19 @@ ZenStructuredCacheSession::~ZenStructuredCacheSession() } ZenCacheResult +ZenStructuredCacheSession::SayHello() +{ + ExtendableStringBuilder<256> Uri; + Uri << m_Client.ServiceUrl() << "/test/hello"; + + cpr::Session& Session = m_SessionState->Session; + Session.SetOption(cpr::Url{Uri.c_str()}); + cpr::Response Response = Session.Get(); + + return {.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; +} + +ZenCacheResult ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type) { ExtendableStringBuilder<256> Uri; diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index ff4a551bf..36cfd1217 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -109,6 +109,7 @@ public: ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient); ~ZenStructuredCacheSession(); + ZenCacheResult SayHello(); ZenCacheResult GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type); ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId); ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type); diff --git a/zenserver/vfs.cpp b/zenserver/vfs.cpp index 86e265b20..fcc9a71f8 100644 --- a/zenserver/vfs.cpp +++ b/zenserver/vfs.cpp @@ -5,7 +5,6 @@ #if ZEN_WITH_VFS # include <zencore/except.h> # include <zencore/filesystem.h> -# include <zencore/snapshot_manifest.h> # include <zencore/stream.h> # include <zencore/windows.h> # include <zencore/logging.h> @@ -532,7 +531,7 @@ retry: } else if (hRes == HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) { - throw zen::WindowsException(hRes, "Failed to initialize root placeholder"); + ThrowSystemException(hRes, "Failed to initialize root placeholder"); } // Ignore error, problems will be reported below anyway diff --git a/zenserver/windows/service.cpp b/zenserver/windows/service.cpp new file mode 100644 index 000000000..017b5f9a7 --- /dev/null +++ b/zenserver/windows/service.cpp @@ -0,0 +1,631 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "service.h" + +#include <zencore/zencore.h> + +#include <stdio.h> +#include <tchar.h> +#include <zencore/windows.h> + +#define SVCNAME L"Zen Store" + +SERVICE_STATUS gSvcStatus; +SERVICE_STATUS_HANDLE gSvcStatusHandle; +HANDLE ghSvcStopEvent = NULL; + +void SvcInstall(void); + +void ReportSvcStatus(DWORD, DWORD, DWORD); +void SvcReportEvent(LPTSTR); + +WindowsService::WindowsService() +{ +} + +WindowsService::~WindowsService() +{ +} + +// +// Purpose: +// Installs a service in the SCM database +// +// Parameters: +// None +// +// Return value: +// None +// +VOID +WindowsService::Install() +{ + SC_HANDLE schSCManager; + SC_HANDLE schService; + TCHAR szPath[MAX_PATH]; + + if (!GetModuleFileName(NULL, szPath, MAX_PATH)) + { + printf("Cannot install service (%d)\n", GetLastError()); + return; + } + + // Get a handle to the SCM database. + + schSCManager = OpenSCManager(NULL, // local computer + NULL, // ServicesActive database + SC_MANAGER_ALL_ACCESS); // full access rights + + if (NULL == schSCManager) + { + printf("OpenSCManager failed (%d)\n", GetLastError()); + return; + } + + // Create the service + + schService = CreateService(schSCManager, // SCM database + SVCNAME, // name of service + SVCNAME, // service name to display + SERVICE_ALL_ACCESS, // desired access + SERVICE_WIN32_OWN_PROCESS, // service type + SERVICE_DEMAND_START, // start type + SERVICE_ERROR_NORMAL, // error control type + szPath, // path to service's binary + NULL, // no load ordering group + NULL, // no tag identifier + NULL, // no dependencies + NULL, // LocalSystem account + NULL); // no password + + if (schService == NULL) + { + printf("CreateService failed (%d)\n", GetLastError()); + CloseServiceHandle(schSCManager); + return; + } + else + printf("Service installed successfully\n"); + + CloseServiceHandle(schService); + CloseServiceHandle(schSCManager); +} + +void +WindowsService::Delete() +{ + SC_HANDLE schSCManager; + SC_HANDLE schService; + + // Get a handle to the SCM database. + + schSCManager = OpenSCManager(NULL, // local computer + NULL, // ServicesActive database + SC_MANAGER_ALL_ACCESS); // full access rights + + if (NULL == schSCManager) + { + printf("OpenSCManager failed (%d)\n", GetLastError()); + return; + } + + // Get a handle to the service. + + schService = OpenService(schSCManager, // SCM database + SVCNAME, // name of service + DELETE); // need delete access + + if (schService == NULL) + { + printf("OpenService failed (%d)\n", GetLastError()); + CloseServiceHandle(schSCManager); + return; + } + + // Delete the service. + + if (!DeleteService(schService)) + { + printf("DeleteService failed (%d)\n", GetLastError()); + } + else + printf("Service deleted successfully\n"); + + CloseServiceHandle(schService); + CloseServiceHandle(schSCManager); +} + +WindowsService* gSvc; + +void WINAPI +CallMain(DWORD, LPSTR*) +{ + gSvc->SvcMain(); +} + +int +WindowsService::ServiceMain() +{ + if (zen::IsInteractiveSession()) + { + // Not actually running as a service + return Run(); + } + else + { + gSvc = this; + + SERVICE_TABLE_ENTRY DispatchTable[] = {{(LPWSTR)SVCNAME, (LPSERVICE_MAIN_FUNCTION)&CallMain}, {NULL, NULL}}; + + // This call returns when the service has stopped. + // The process should simply terminate when the call returns. + + if (!StartServiceCtrlDispatcher(DispatchTable)) + { + SvcReportEvent((LPTSTR)L"StartServiceCtrlDispatcher"); + } + } + + return 0; +} + +int +WindowsService::SvcMain() +{ + // Register the handler function for the service + + gSvcStatusHandle = RegisterServiceCtrlHandler(SVCNAME, SvcCtrlHandler); + + if (!gSvcStatusHandle) + { + SvcReportEvent((LPTSTR)TEXT("RegisterServiceCtrlHandler")); + + return 1; + } + + // These SERVICE_STATUS members remain as set here + + gSvcStatus.dwServiceType = SERVICE_WIN32_OWN_PROCESS; + gSvcStatus.dwServiceSpecificExitCode = 0; + + // Report initial status to the SCM + + ReportSvcStatus(SERVICE_START_PENDING, NO_ERROR, 3000); + + // Create an event. The control handler function, SvcCtrlHandler, + // signals this event when it receives the stop control code. + + ghSvcStopEvent = CreateEvent(NULL, // default security attributes + TRUE, // manual reset event + FALSE, // not signaled + NULL); // no name + + if (ghSvcStopEvent == NULL) + { + ReportSvcStatus(SERVICE_STOPPED, GetLastError(), 0); + + return 1; + } + + // Report running status when initialization is complete. + + ReportSvcStatus(SERVICE_RUNNING, NO_ERROR, 0); + + int ReturnCode = Run(); + + ReportSvcStatus(SERVICE_STOPPED, NO_ERROR, 0); + + return ReturnCode; +} + +// +// Purpose: +// Retrieves and displays the current service configuration. +// +// Parameters: +// None +// +// Return value: +// None +// +void +DoQuerySvc() +{ + SC_HANDLE schSCManager{}; + SC_HANDLE schService{}; + LPQUERY_SERVICE_CONFIG lpsc{}; + LPSERVICE_DESCRIPTION lpsd{}; + DWORD dwBytesNeeded{}, cbBufSize{}, dwError{}; + + // Get a handle to the SCM database. + + schSCManager = OpenSCManager(NULL, // local computer + NULL, // ServicesActive database + SC_MANAGER_ALL_ACCESS); // full access rights + + if (NULL == schSCManager) + { + printf("OpenSCManager failed (%d)\n", GetLastError()); + return; + } + + // Get a handle to the service. + + schService = OpenService(schSCManager, // SCM database + SVCNAME, // name of service + SERVICE_QUERY_CONFIG); // need query config access + + if (schService == NULL) + { + printf("OpenService failed (%d)\n", GetLastError()); + CloseServiceHandle(schSCManager); + return; + } + + // Get the configuration information. + + if (!QueryServiceConfig(schService, NULL, 0, &dwBytesNeeded)) + { + dwError = GetLastError(); + if (ERROR_INSUFFICIENT_BUFFER == dwError) + { + cbBufSize = dwBytesNeeded; + lpsc = (LPQUERY_SERVICE_CONFIG)LocalAlloc(LMEM_FIXED, cbBufSize); + } + else + { + printf("QueryServiceConfig failed (%d)", dwError); + goto cleanup; + } + } + + if (!QueryServiceConfig(schService, lpsc, cbBufSize, &dwBytesNeeded)) + { + printf("QueryServiceConfig failed (%d)", GetLastError()); + goto cleanup; + } + + if (!QueryServiceConfig2(schService, SERVICE_CONFIG_DESCRIPTION, NULL, 0, &dwBytesNeeded)) + { + dwError = GetLastError(); + if (ERROR_INSUFFICIENT_BUFFER == dwError) + { + cbBufSize = dwBytesNeeded; + lpsd = (LPSERVICE_DESCRIPTION)LocalAlloc(LMEM_FIXED, cbBufSize); + } + else + { + printf("QueryServiceConfig2 failed (%d)", dwError); + goto cleanup; + } + } + + if (!QueryServiceConfig2(schService, SERVICE_CONFIG_DESCRIPTION, (LPBYTE)lpsd, cbBufSize, &dwBytesNeeded)) + { + printf("QueryServiceConfig2 failed (%d)", GetLastError()); + goto cleanup; + } + + // Print the configuration information. + + _tprintf(TEXT("%s configuration: \n"), SVCNAME); + _tprintf(TEXT(" Type: 0x%x\n"), lpsc->dwServiceType); + _tprintf(TEXT(" Start Type: 0x%x\n"), lpsc->dwStartType); + _tprintf(TEXT(" Error Control: 0x%x\n"), lpsc->dwErrorControl); + _tprintf(TEXT(" Binary path: %s\n"), lpsc->lpBinaryPathName); + _tprintf(TEXT(" Account: %s\n"), lpsc->lpServiceStartName); + + if (lpsd->lpDescription != NULL && lstrcmp(lpsd->lpDescription, TEXT("")) != 0) + _tprintf(TEXT(" Description: %s\n"), lpsd->lpDescription); + if (lpsc->lpLoadOrderGroup != NULL && lstrcmp(lpsc->lpLoadOrderGroup, TEXT("")) != 0) + _tprintf(TEXT(" Load order group: %s\n"), lpsc->lpLoadOrderGroup); + if (lpsc->dwTagId != 0) + _tprintf(TEXT(" Tag ID: %d\n"), lpsc->dwTagId); + if (lpsc->lpDependencies != NULL && lstrcmp(lpsc->lpDependencies, TEXT("")) != 0) + _tprintf(TEXT(" Dependencies: %s\n"), lpsc->lpDependencies); + + LocalFree(lpsc); + LocalFree(lpsd); + +cleanup: + CloseServiceHandle(schService); + CloseServiceHandle(schSCManager); +} + +// +// Purpose: +// Disables the service. +// +// Parameters: +// None +// +// Return value: +// None +// +void +DoDisableSvc() +{ + SC_HANDLE schSCManager; + SC_HANDLE schService; + + // Get a handle to the SCM database. + + schSCManager = OpenSCManager(NULL, // local computer + NULL, // ServicesActive database + SC_MANAGER_ALL_ACCESS); // full access rights + + if (NULL == schSCManager) + { + printf("OpenSCManager failed (%d)\n", GetLastError()); + return; + } + + // Get a handle to the service. + + schService = OpenService(schSCManager, // SCM database + SVCNAME, // name of service + SERVICE_CHANGE_CONFIG); // need change config access + + if (schService == NULL) + { + printf("OpenService failed (%d)\n", GetLastError()); + CloseServiceHandle(schSCManager); + return; + } + + // Change the service start type. + + if (!ChangeServiceConfig(schService, // handle of service + SERVICE_NO_CHANGE, // service type: no change + SERVICE_DISABLED, // service start type + SERVICE_NO_CHANGE, // error control: no change + NULL, // binary path: no change + NULL, // load order group: no change + NULL, // tag ID: no change + NULL, // dependencies: no change + NULL, // account name: no change + NULL, // password: no change + NULL)) // display name: no change + { + printf("ChangeServiceConfig failed (%d)\n", GetLastError()); + } + else + printf("Service disabled successfully.\n"); + + CloseServiceHandle(schService); + CloseServiceHandle(schSCManager); +} + +// +// Purpose: +// Enables the service. +// +// Parameters: +// None +// +// Return value: +// None +// +VOID __stdcall DoEnableSvc() +{ + SC_HANDLE schSCManager; + SC_HANDLE schService; + + // Get a handle to the SCM database. + + schSCManager = OpenSCManager(NULL, // local computer + NULL, // ServicesActive database + SC_MANAGER_ALL_ACCESS); // full access rights + + if (NULL == schSCManager) + { + printf("OpenSCManager failed (%d)\n", GetLastError()); + return; + } + + // Get a handle to the service. + + schService = OpenService(schSCManager, // SCM database + SVCNAME, // name of service + SERVICE_CHANGE_CONFIG); // need change config access + + if (schService == NULL) + { + printf("OpenService failed (%d)\n", GetLastError()); + CloseServiceHandle(schSCManager); + return; + } + + // Change the service start type. + + if (!ChangeServiceConfig(schService, // handle of service + SERVICE_NO_CHANGE, // service type: no change + SERVICE_DEMAND_START, // service start type + SERVICE_NO_CHANGE, // error control: no change + NULL, // binary path: no change + NULL, // load order group: no change + NULL, // tag ID: no change + NULL, // dependencies: no change + NULL, // account name: no change + NULL, // password: no change + NULL)) // display name: no change + { + printf("ChangeServiceConfig failed (%d)\n", GetLastError()); + } + else + printf("Service enabled successfully.\n"); + + CloseServiceHandle(schService); + CloseServiceHandle(schSCManager); +} +// +// Purpose: +// Updates the service description to "This is a test description". +// +// Parameters: +// None +// +// Return value: +// None +// +void +DoUpdateSvcDesc() +{ + SC_HANDLE schSCManager; + SC_HANDLE schService; + SERVICE_DESCRIPTION sd; + TCHAR szDesc[] = TEXT("This is a test description"); + + // Get a handle to the SCM database. + + schSCManager = OpenSCManager(NULL, // local computer + NULL, // ServicesActive database + SC_MANAGER_ALL_ACCESS); // full access rights + + if (NULL == schSCManager) + { + printf("OpenSCManager failed (%d)\n", GetLastError()); + return; + } + + // Get a handle to the service. + + schService = OpenService(schSCManager, // SCM database + SVCNAME, // name of service + SERVICE_CHANGE_CONFIG); // need change config access + + if (schService == NULL) + { + printf("OpenService failed (%d)\n", GetLastError()); + CloseServiceHandle(schSCManager); + return; + } + + // Change the service description. + + sd.lpDescription = szDesc; + + if (!ChangeServiceConfig2(schService, // handle to service + SERVICE_CONFIG_DESCRIPTION, // change: description + &sd)) // new description + { + printf("ChangeServiceConfig2 failed\n"); + } + else + printf("Service description updated successfully.\n"); + + CloseServiceHandle(schService); + CloseServiceHandle(schSCManager); +} + +// +// Purpose: +// Sets the current service status and reports it to the SCM. +// +// Parameters: +// dwCurrentState - The current state (see SERVICE_STATUS) +// dwWin32ExitCode - The system error code +// dwWaitHint - Estimated time for pending operation, +// in milliseconds +// +// Return value: +// None +// +VOID +ReportSvcStatus(DWORD dwCurrentState, DWORD dwWin32ExitCode, DWORD dwWaitHint) +{ + static DWORD dwCheckPoint = 1; + + // Fill in the SERVICE_STATUS structure. + + gSvcStatus.dwCurrentState = dwCurrentState; + gSvcStatus.dwWin32ExitCode = dwWin32ExitCode; + gSvcStatus.dwWaitHint = dwWaitHint; + + if (dwCurrentState == SERVICE_START_PENDING) + gSvcStatus.dwControlsAccepted = 0; + else + gSvcStatus.dwControlsAccepted = SERVICE_ACCEPT_STOP; + + if ((dwCurrentState == SERVICE_RUNNING) || (dwCurrentState == SERVICE_STOPPED)) + gSvcStatus.dwCheckPoint = 0; + else + gSvcStatus.dwCheckPoint = dwCheckPoint++; + + // Report the status of the service to the SCM. + SetServiceStatus(gSvcStatusHandle, &gSvcStatus); +} + +void +WindowsService::SvcCtrlHandler(DWORD dwCtrl) +{ + // Handle the requested control code. + // + // Called by SCM whenever a control code is sent to the service + // using the ControlService function. + + switch (dwCtrl) + { + case SERVICE_CONTROL_STOP: + ReportSvcStatus(SERVICE_STOP_PENDING, NO_ERROR, 0); + + // Signal the service to stop. + + SetEvent(ghSvcStopEvent); + zen::RequestApplicationExit(0); + + ReportSvcStatus(gSvcStatus.dwCurrentState, NO_ERROR, 0); + return; + + case SERVICE_CONTROL_INTERROGATE: + break; + + default: + break; + } +} + +// +// Purpose: +// Logs messages to the event log +// +// Parameters: +// szFunction - name of function that failed +// +// Return value: +// None +// +// Remarks: +// The service must have an entry in the Application event log. +// +VOID +SvcReportEvent(LPTSTR szFunction) +{ + ZEN_UNUSED(szFunction); + + // HANDLE hEventSource; + // LPCTSTR lpszStrings[2]; + // TCHAR Buffer[80]; + + // hEventSource = RegisterEventSource(NULL, SVCNAME); + + // if (NULL != hEventSource) + //{ + // StringCchPrintf(Buffer, 80, TEXT("%s failed with %d"), szFunction, GetLastError()); + + // lpszStrings[0] = SVCNAME; + // lpszStrings[1] = Buffer; + + // ReportEvent(hEventSource, // event log handle + // EVENTLOG_ERROR_TYPE, // event type + // 0, // event category + // SVC_ERROR, // event identifier + // NULL, // no security identifier + // 2, // size of lpszStrings array + // 0, // no binary data + // lpszStrings, // array of strings + // NULL); // no binary data + + // DeregisterEventSource(hEventSource); + //} +} diff --git a/zenserver/windows/service.h b/zenserver/windows/service.h new file mode 100644 index 000000000..7c9610983 --- /dev/null +++ b/zenserver/windows/service.h @@ -0,0 +1,20 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +class WindowsService +{ +public: + WindowsService(); + ~WindowsService(); + + virtual int Run() = 0; + + int ServiceMain(); + + static void Install(); + static void Delete(); + + int SvcMain(); + static void __stdcall SvcCtrlHandler(unsigned long); +}; diff --git a/zenserver/xmake.lua b/zenserver/xmake.lua index bb70846fa..7a6981fcd 100644 --- a/zenserver/xmake.lua +++ b/zenserver/xmake.lua @@ -14,6 +14,8 @@ target("zenserver") add_ldflags("/MANIFEST:EMBED") add_ldflags("/MANIFESTUAC:level='requireAdministrator'") add_ldflags("/LTCG") + else + del_files("windows/**") end add_options("vfs") diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 53dc41a24..cf24dc224 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -11,28 +11,35 @@ #include <zencore/timer.h> #include <zencore/windows.h> #include <zenhttp/httpserver.h> -#include <zenserverprocess.h> #include <zenstore/cas.h> #include <zenstore/cidstore.h> +#include <zenutil/zenserverprocess.h> #include <fmt/format.h> -#include <mimalloc-new-delete.h> -#include <mimalloc.h> + +#if ZEN_USE_MIMALLOC +# include <mimalloc-new-delete.h> +# include <mimalloc.h> +#endif + #include <asio.hpp> #include <exception> #include <list> #include <lua.hpp> #include <optional> #include <regex> +#include <set> #include <unordered_map> ////////////////////////////////////////////////////////////////////////// // We don't have any doctest code in this file but this is needed to bring // in some shared code into the executable -#define DOCTEST_CONFIG_IMPLEMENT -#include <doctest/doctest.h> -#undef DOCTEST_CONFIG_IMPLEMENT +#if ZEN_WITH_TESTS +# define DOCTEST_CONFIG_IMPLEMENT +# include <zencore/testing.h> +# undef DOCTEST_CONFIG_IMPLEMENT +#endif ////////////////////////////////////////////////////////////////////////// @@ -40,6 +47,10 @@ #include "config.h" #include "diag/logging.h" +#if ZEN_PLATFORM_WINDOWS +# include "windows/service.h" +#endif + ////////////////////////////////////////////////////////////////////////// // Sentry // @@ -82,11 +93,16 @@ #define ZEN_APP_NAME "Zen store" +namespace zen { + class ZenServer { + ZenServerState::ZenServerEntry* m_ServerEntry = nullptr; + public: - void Initialize(ZenServiceConfig& ServiceConfig, int BasePort, int ParentPid) + void Initialize(ZenServiceConfig& ServiceConfig, int BasePort, int ParentPid, ZenServerState::ZenServerEntry* ServerEntry) { + m_ServerEntry = ServerEntry; using namespace fmt::literals; ZEN_INFO(ZEN_APP_NAME " initializing"); @@ -94,23 +110,29 @@ public: if (ParentPid) { - m_Process.Initialize(ParentPid); + zen::ProcessHandle OwnerProcess; + OwnerProcess.Initialize(ParentPid); - if (!m_Process.IsValid()) + if (!OwnerProcess.IsValid()) { ZEN_WARN("Unable to initialize process handle for specified parent pid #{}", ParentPid); + + // If the pid is not reachable should we just shut down immediately? the intended owner process + // could have been killed or somehow crashed already } else { ZEN_INFO("Using parent pid #{} to control process lifetime", ParentPid); } + + m_ProcessMonitor.AddPid(ParentPid); } // Initialize/check mutex based on base port std::string MutexName = "zen_{}"_format(BasePort); - if (zen::NamedMutex::Exists(MutexName) || (m_ServerMutex.Create(MutexName) == false)) + if (zen::NamedMutex::Exists(MutexName) || ((m_ServerMutex.Create(MutexName) == false))) { throw std::runtime_error("Failed to create mutex '{}' - is another instance already running?"_format(MutexName).c_str()); } @@ -151,11 +173,15 @@ public: m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache"); std::unique_ptr<zen::UpstreamCache> UpstreamCache; - if (ServiceConfig.UpstreamCacheConfig.Enabled) + if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) { const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig; zen::UpstreamCacheOptions UpstreamOptions; + UpstreamOptions.ReadUpstream = + (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; + UpstreamOptions.WriteUpstream = + (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; if (UpstreamConfig.UpstreamThreadCount < 32) { @@ -201,7 +227,11 @@ public: if (UpstreamCache->Initialize()) { - ZEN_INFO("upstream cache active"); + ZEN_INFO("upstream cache active ({})", + UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE" + : UpstreamOptions.ReadUpstream ? "READONLY" + : UpstreamOptions.WriteUpstream ? "WRITEONLY" + : "DISABLED"); } else { @@ -267,7 +297,9 @@ public: void Run() { - if (m_Process.IsValid()) + Scrub(); + + if (m_ProcessMonitor.IsActive()) { EnqueueTimer(); } @@ -282,7 +314,7 @@ public: ZEN_INFO(" \\/ \\/ \\/ \\/ \\/ "); } - ZEN_INFO(ZEN_APP_NAME " now running"); + ZEN_INFO(ZEN_APP_NAME " now running (pid: {})", zen::GetCurrentProcessId()); #if USE_SENTRY sentry_clear_modulecache(); @@ -293,7 +325,9 @@ public: __debugbreak(); } - m_Http->Run(m_TestMode); + const bool IsInteractiveMode = zen::IsInteractiveSession() && !m_TestMode; + + m_Http->Run(IsInteractiveMode); ZEN_INFO(ZEN_APP_NAME " exiting"); @@ -332,18 +366,51 @@ public: void CheckOwnerPid() { - if (m_Process.IsRunning()) + // Pick up any new "owner" processes + + std::set<uint32_t> AddedPids; + + for (auto& PidEntry : m_ServerEntry->SponsorPids) + { + if (uint32_t ThisPid = PidEntry.load(std::memory_order::memory_order_relaxed)) + { + if (PidEntry.compare_exchange_strong(ThisPid, 0)) + { + if (AddedPids.insert(ThisPid).second) + { + m_ProcessMonitor.AddPid(ThisPid); + + ZEN_INFO("added process with pid #{} as a sponsor process", ThisPid); + } + } + } + } + + if (m_ProcessMonitor.IsRunning()) { EnqueueTimer(); } else { - ZEN_INFO(ZEN_APP_NAME " exiting since parent process id {} is gone", m_Process.Pid()); + ZEN_INFO(ZEN_APP_NAME " exiting since sponsor processes are all gone"); RequestExit(0); } } + void Scrub() + { + ZEN_INFO("Storage validation STARTING"); + + ScrubContext Ctx; + m_CasStore->Scrub(Ctx); + m_CidStore->Scrub(Ctx); + m_ProjectStore->Scrub(Ctx); + m_StructuredCacheService->Scrub(Ctx); + + ZEN_INFO("Storage validation DONE"); + } + void Flush() { if (m_CasStore) @@ -366,16 +433,16 @@ private: std::jthread m_IoRunner; asio::io_context m_IoContext; asio::steady_timer m_PidCheckTimer{m_IoContext}; - zen::ProcessHandle m_Process; + zen::ProcessMonitor m_ProcessMonitor; zen::NamedMutex m_ServerMutex; zen::Ref<zen::HttpServer> m_Http; std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore()}; std::unique_ptr<zen::CidStore> m_CidStore; - std::unique_ptr<ZenCacheStore> m_CacheStore; + std::unique_ptr<zen::ZenCacheStore> m_CacheStore; zen::CasGc m_Gc{*m_CasStore}; zen::CasScrubber m_Scrubber{*m_CasStore}; - HttpTestService m_TestService; + zen::HttpTestService m_TestService; zen::HttpTestingService m_TestingService; zen::HttpCasService m_CasService{*m_CasStore}; zen::RefPtr<zen::ProjectStore> m_ProjectStore; @@ -383,23 +450,39 @@ private: std::unique_ptr<zen::HttpLaunchService> m_HttpLaunchService; std::unique_ptr<zen::HttpProjectService> m_HttpProjectService; std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService; - HttpAdminService m_AdminService; - HttpHealthService m_HealthService; + zen::HttpAdminService m_AdminService; + zen::HttpHealthService m_HealthService; zen::Mesh m_ZenMesh{m_IoContext}; std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService; bool m_DebugOptionForcedCrash = false; }; -int -main(int argc, char* argv[]) +} // namespace zen + +class ZenWindowsService : public WindowsService { - mi_version(); +public: + ZenWindowsService(ZenServerOptions& GlobalOptions, ZenServiceConfig& ServiceConfig) + : m_GlobalOptions(GlobalOptions) + , m_ServiceConfig(ServiceConfig) + { + } - ZenServerOptions GlobalOptions; - ZenServiceConfig ServiceConfig; - ParseGlobalCliOptions(argc, argv, GlobalOptions, ServiceConfig); - InitializeLogging(GlobalOptions); + ZenWindowsService(const ZenWindowsService&) = delete; + ZenWindowsService& operator=(const ZenWindowsService&) = delete; + + virtual int Run() override; + +private: + ZenServerOptions& m_GlobalOptions; + ZenServiceConfig& m_ServiceConfig; +}; + +int +ZenWindowsService::Run() +{ + using namespace zen; #if USE_SENTRY // Initialize sentry.io client @@ -408,30 +491,47 @@ main(int argc, char* argv[]) sentry_options_set_dsn(SentryOptions, "https://[email protected]/5919284"); sentry_init(SentryOptions); - auto _ = zen::MakeGuard([&] { sentry_close(); }); + auto _ = zen::MakeGuard([] { sentry_close(); }); #endif - // Prototype config system, let's see how this pans out - - ParseServiceConfig(GlobalOptions.DataDir, /* out */ ServiceConfig); - - ZEN_INFO("zen cache server starting on port {}", GlobalOptions.BasePort); + auto& GlobalOptions = m_GlobalOptions; + auto& ServiceConfig = m_ServiceConfig; try { + // Prototype config system, we'll see how this pans out + // + // TODO: we need to report any parse errors here + + ParseServiceConfig(GlobalOptions.DataDir, /* out */ ServiceConfig); + + ZEN_INFO("zen cache server starting on port {}", GlobalOptions.BasePort); + ZenServerState ServerState; ServerState.Initialize(); ServerState.Sweep(); - if (ZenServerState::ZenServerEntry* Entry = ServerState.Lookup(GlobalOptions.BasePort)) + ZenServerState::ZenServerEntry* Entry = ServerState.Lookup(GlobalOptions.BasePort); + + if (Entry) { // Instance already running for this port? Should double check pid ZEN_WARN("Looks like there is already a process listening to this port (pid: {})", Entry->Pid); + + if (GlobalOptions.OwnerPid) + { + Entry->AddSponsorProcess(GlobalOptions.OwnerPid); + + std::exit(0); + } } - else + + Entry = ServerState.Register(GlobalOptions.BasePort); + + if (GlobalOptions.OwnerPid) { - ServerState.Register(GlobalOptions.BasePort); + Entry->AddSponsorProcess(GlobalOptions.OwnerPid); } std::unique_ptr<std::thread> ShutdownThread; @@ -445,15 +545,17 @@ main(int argc, char* argv[]) Server.SetDataRoot(GlobalOptions.DataDir); Server.SetTestMode(GlobalOptions.IsTest); Server.SetDedicatedMode(GlobalOptions.IsDedicated); - Server.Initialize(ServiceConfig, GlobalOptions.BasePort, GlobalOptions.OwnerPid); + Server.Initialize(ServiceConfig, GlobalOptions.BasePort, GlobalOptions.OwnerPid, Entry); // Monitor shutdown signals ShutdownThread.reset(new std::thread{[&] { ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}'", ShutdownEventName); - ShutdownEvent->Wait(); - ZEN_INFO("shutdown signal received"); - Server.RequestExit(0); + if (ShutdownEvent->Wait()) + { + ZEN_INFO("shutdown signal received"); + Server.RequestExit(0); + } }}); // If we have a parent process, establish the mechanisms we need @@ -480,3 +582,37 @@ main(int argc, char* argv[]) return 0; } + +int +main(int argc, char* argv[]) +{ + using namespace zen; + +#if ZEN_USE_MIMALLOC + mi_version(); +#endif + + ZenServerOptions GlobalOptions; + ZenServiceConfig ServiceConfig; + ParseGlobalCliOptions(argc, argv, GlobalOptions, ServiceConfig); + InitializeLogging(GlobalOptions); + +#if ZEN_PLATFORM_WINDOWS + if (GlobalOptions.InstallService) + { + WindowsService::Install(); + + std::exit(0); + } + + if (GlobalOptions.UninstallService) + { + WindowsService::Delete(); + + std::exit(0); + } +#endif + + ZenWindowsService App(GlobalOptions, ServiceConfig); + return App.ServiceMain(); +} diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj index aa9d538a5..db657d192 100644 --- a/zenserver/zenserver.vcxproj +++ b/zenserver/zenserver.vcxproj @@ -123,6 +123,7 @@ <ClInclude Include="upstream\upstreamcache.h" /> <ClInclude Include="upstream\zen.h" /> <ClInclude Include="vfs.h" /> + <ClInclude Include="windows\service.h" /> </ItemGroup> <ItemGroup> <ClCompile Include="cache\structuredcache.cpp" /> @@ -142,6 +143,7 @@ <ClCompile Include="upstream\upstreamcache.cpp" /> <ClCompile Include="upstream\zen.cpp" /> <ClCompile Include="vfs.cpp" /> + <ClCompile Include="windows\service.cpp" /> <ClCompile Include="zenserver.cpp" /> </ItemGroup> <ItemGroup> diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters index a86a6d96d..250c55812 100644 --- a/zenserver/zenserver.vcxproj.filters +++ b/zenserver/zenserver.vcxproj.filters @@ -39,6 +39,7 @@ <Filter>upstream</Filter> </ClInclude> <ClInclude Include="testing\httptest.h" /> + <ClInclude Include="windows\service.h" /> </ItemGroup> <ItemGroup> <ClCompile Include="zenserver.cpp" /> @@ -73,6 +74,7 @@ <Filter>upstream</Filter> </ClCompile> <ClCompile Include="testing\httptest.cpp" /> + <ClCompile Include="windows\service.cpp" /> </ItemGroup> <ItemGroup> <Filter Include="cache"> |