aboutsummaryrefslogtreecommitdiff
path: root/zenserver
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-09-21 11:06:13 +0200
committerMartin Ridgers <[email protected]>2021-09-21 11:06:13 +0200
commit68c951e0f440ffd483795dced737e88152c1a581 (patch)
tree5c0910ca2a85b45fb05dba3ce457b7d156213894 /zenserver
parentMerge main into linux-mac (diff)
parentTrigger storage scrubbing pass at startup (diff)
downloadzen-68c951e0f440ffd483795dced737e88152c1a581.tar.xz
zen-68c951e0f440ffd483795dced737e88152c1a581.zip
Merged main into linux-mac
Diffstat (limited to 'zenserver')
-rw-r--r--zenserver/admin/admin.h4
-rw-r--r--zenserver/cache/structuredcache.cpp384
-rw-r--r--zenserver/cache/structuredcache.h11
-rw-r--r--zenserver/cache/structuredcachestore.cpp129
-rw-r--r--zenserver/cache/structuredcachestore.h43
-rw-r--r--zenserver/config.cpp56
-rw-r--r--zenserver/config.h24
-rw-r--r--zenserver/diag/diagsvcs.h38
-rw-r--r--zenserver/diag/logging.cpp37
-rw-r--r--zenserver/experimental/usnjournal.cpp10
-rw-r--r--zenserver/projectstore.cpp17
-rw-r--r--zenserver/projectstore.h11
-rw-r--r--zenserver/upstream/jupiter.cpp20
-rw-r--r--zenserver/upstream/jupiter.h1
-rw-r--r--zenserver/upstream/upstreamcache.cpp97
-rw-r--r--zenserver/upstream/upstreamcache.h9
-rw-r--r--zenserver/upstream/zen.cpp17
-rw-r--r--zenserver/upstream/zen.h1
-rw-r--r--zenserver/vfs.cpp3
-rw-r--r--zenserver/windows/service.cpp631
-rw-r--r--zenserver/windows/service.h20
-rw-r--r--zenserver/xmake.lua2
-rw-r--r--zenserver/zenserver.cpp220
-rw-r--r--zenserver/zenserver.vcxproj2
-rw-r--r--zenserver/zenserver.vcxproj.filters2
25 files changed, 1455 insertions, 334 deletions
diff --git a/zenserver/admin/admin.h b/zenserver/admin/admin.h
index f90ad4537..3554b1005 100644
--- a/zenserver/admin/admin.h
+++ b/zenserver/admin/admin.h
@@ -4,6 +4,8 @@
#include <zenhttp/httpserver.h>
+namespace zen {
+
class HttpAdminService : public zen::HttpService
{
public:
@@ -16,3 +18,5 @@ public:
private:
};
+
+} // namespace zen
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index cf7deaa93..7f1fe7b44 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -9,6 +9,7 @@
#include <zencore/stream.h>
#include <zencore/timer.h>
#include <zenhttp/httpserver.h>
+#include <zenstore/CAS.h>
#include "structuredcache.h"
#include "structuredcachestore.h"
@@ -25,17 +26,130 @@
#include <queue>
#include <thread>
+#include <gsl/gsl-lite.hpp>
+
namespace zen {
using namespace std::literals;
//////////////////////////////////////////////////////////////////////////
-HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InCacheStore,
- zen::CasStore& InStore,
- zen::CidStore& InCidStore,
+namespace detail { namespace cacheopt {
+ constexpr std::string_view Local = "local"sv;
+ constexpr std::string_view Remote = "remote"sv;
+ constexpr std::string_view Data = "data"sv;
+ constexpr std::string_view Meta = "meta"sv;
+ constexpr std::string_view Value = "value"sv;
+ constexpr std::string_view Attachments = "attachments"sv;
+}} // namespace detail::cacheopt
+
+//////////////////////////////////////////////////////////////////////////
+
+enum class CachePolicy : uint8_t
+{
+ None = 0,
+ QueryLocal = 1 << 0,
+ QueryRemote = 1 << 1,
+ Query = QueryLocal | QueryRemote,
+ StoreLocal = 1 << 2,
+ StoreRemote = 1 << 3,
+ Store = StoreLocal | StoreRemote,
+ SkipMeta = 1 << 4,
+ SkipValue = 1 << 5,
+ SkipAttachments = 1 << 6,
+ SkipData = SkipMeta | SkipValue | SkipAttachments,
+ SkipLocalCopy = 1 << 7,
+ Local = QueryLocal | StoreLocal,
+ Remote = QueryRemote | StoreRemote,
+ Default = Query | Store,
+ Disable = None,
+};
+
+gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy);
+
+CachePolicy
+ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams)
+{
+ CachePolicy QueryPolicy = CachePolicy::Query;
+
+ {
+ std::string_view Opts = QueryParams.GetValue("query"sv);
+ if (!Opts.empty())
+ {
+ QueryPolicy = CachePolicy::None;
+ ForEachStrTok(Opts, ',', [&QueryPolicy](const std::string_view& Opt) {
+ if (Opt == detail::cacheopt::Local)
+ {
+ QueryPolicy |= CachePolicy::QueryLocal;
+ }
+ if (Opt == detail::cacheopt::Remote)
+ {
+ QueryPolicy |= CachePolicy::QueryRemote;
+ }
+ return true;
+ });
+ }
+ }
+
+ CachePolicy StorePolicy = CachePolicy::Store;
+
+ {
+ std::string_view Opts = QueryParams.GetValue("store"sv);
+ if (!Opts.empty())
+ {
+ StorePolicy = CachePolicy::None;
+ ForEachStrTok(Opts, ',', [&StorePolicy](const std::string_view& Opt) {
+ if (Opt == detail::cacheopt::Local)
+ {
+ StorePolicy |= CachePolicy::StoreLocal;
+ }
+ if (Opt == detail::cacheopt::Remote)
+ {
+ StorePolicy |= CachePolicy::StoreRemote;
+ }
+ return true;
+ });
+ }
+ }
+
+ CachePolicy SkipPolicy = CachePolicy::None;
+
+ {
+ std::string_view Opts = QueryParams.GetValue("skip"sv);
+ if (!Opts.empty())
+ {
+ ForEachStrTok(Opts, ',', [&SkipPolicy](const std::string_view& Opt) {
+ if (Opt == detail::cacheopt::Meta)
+ {
+ SkipPolicy |= CachePolicy::SkipMeta;
+ }
+ if (Opt == detail::cacheopt::Value)
+ {
+ SkipPolicy |= CachePolicy::SkipValue;
+ }
+ if (Opt == detail::cacheopt::Attachments)
+ {
+ SkipPolicy |= CachePolicy::SkipAttachments;
+ }
+ if (Opt == detail::cacheopt::Data)
+ {
+ SkipPolicy |= CachePolicy::SkipData;
+ }
+ return true;
+ });
+ }
+ }
+
+ return QueryPolicy | StorePolicy | SkipPolicy;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
+ CasStore& InStore,
+ CidStore& InCidStore,
std::unique_ptr<UpstreamCache> UpstreamCache)
-: m_Log(zen::logging::Get("cache"))
+: m_Log(logging::Get("cache"))
, m_CacheStore(InCacheStore)
, m_CasStore(InStore)
, m_CidStore(InCidStore)
@@ -59,8 +173,14 @@ HttpStructuredCacheService::Flush()
{
}
+void
+HttpStructuredCacheService::Scrub(ScrubContext& Ctx)
+{
+ ZEN_UNUSED(Ctx);
+}
+
void
-HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request)
+HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
{
CacheRef Ref;
@@ -75,28 +195,31 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request)
return HandleCacheBucketRequest(Request, Key);
}
- return Request.WriteResponse(zen::HttpResponseCode::BadRequest); // invalid URL
+ return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL
}
+ const auto QueryParams = Request.GetQueryParams();
+ CachePolicy Policy = ParseCachePolicy(QueryParams);
+
if (Ref.PayloadId == IoHash::Zero)
{
- return HandleCacheRecordRequest(Request, Ref);
+ return HandleCacheRecordRequest(Request, Ref, Policy);
}
else
{
- return HandleCachePayloadRequest(Request, Ref);
+ return HandleCachePayloadRequest(Request, Ref, Policy);
}
return;
}
void
-HttpStructuredCacheService::HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket)
+HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Bucket)
{
ZEN_UNUSED(Request, Bucket);
switch (auto Verb = Request.RequestVerb())
{
- using enum zen::HttpVerb;
+ using enum HttpVerb;
case kHead:
case kGet:
@@ -110,22 +233,22 @@ HttpStructuredCacheService::HandleCacheBucketRequest(zen::HttpServerRequest& Req
if (m_CacheStore.DropBucket(Bucket))
{
- return Request.WriteResponse(zen::HttpResponseCode::OK);
+ return Request.WriteResponse(HttpResponseCode::OK);
}
else
{
- return Request.WriteResponse(zen::HttpResponseCode::NotFound);
+ return Request.WriteResponse(HttpResponseCode::NotFound);
}
break;
}
}
void
-HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref)
+HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy)
{
switch (auto Verb = Request.RequestVerb())
{
- using enum zen::HttpVerb;
+ using enum HttpVerb;
case kHead:
case kGet:
@@ -136,7 +259,9 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
bool InUpstreamCache = false;
- if (!Success && m_UpstreamCache)
+ const bool QueryUpstream = !Success && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote));
+
+ if (QueryUpstream)
{
const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary
: AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage
@@ -153,14 +278,13 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
{
if (CacheRecordType == ZenContentType::kCbObject)
{
- const zen::CbValidateError ValidationResult =
- zen::ValidateCompactBinary(UpstreamResult.Value, zen::CbValidateMode::All);
+ const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All);
if (ValidationResult == CbValidateError::None)
{
- zen::CbObjectView CacheRecord(UpstreamResult.Value.Data());
+ CbObjectView CacheRecord(UpstreamResult.Value.Data());
- zen::CbObjectWriter IndexData;
+ CbObjectWriter IndexData;
IndexData.BeginArray("references");
CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); });
IndexData.EndArray();
@@ -214,6 +338,18 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
if (FoundCount == AttachmentCount)
{
m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()});
+
+ if (zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments))
+ {
+ CbPackage PackageWithoutAttachments;
+ PackageWithoutAttachments.SetObject(CacheRecord);
+
+ MemoryOutStream MemStream;
+ BinaryWriter Writer(MemStream);
+ PackageWithoutAttachments.Save(Writer);
+
+ Value.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ }
}
else
{
@@ -236,7 +372,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
{
ZEN_DEBUG("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey);
- return Request.WriteResponse(zen::HttpResponseCode::NotFound);
+ return Request.WriteResponse(HttpResponseCode::NotFound);
}
if (Verb == kHead)
@@ -248,41 +384,45 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
{
CbObjectView CacheRecord(Value.Value.Data());
- const zen::CbValidateError ValidationResult = zen::ValidateCompactBinary(Value.Value, zen::CbValidateMode::All);
+ const CbValidateError ValidationResult = ValidateCompactBinary(Value.Value, CbValidateMode::All);
if (ValidationResult != CbValidateError::None)
{
ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey);
- return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv);
+ return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv);
}
- uint32_t AttachmentCount = 0;
- uint32_t FoundCount = 0;
- uint64_t AttachmentBytes = 0ull;
+ const bool SkipAttachments = zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments);
+ uint32_t AttachmentCount = 0;
+ uint32_t FoundCount = 0;
+ uint64_t AttachmentBytes = 0ull;
CbPackage Package;
- CacheRecord.IterateAttachments(
- [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
- {
- Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
- AttachmentBytes += Chunk.Size();
- FoundCount++;
- }
- AttachmentCount++;
- });
-
- if (FoundCount != AttachmentCount)
+ if (!SkipAttachments)
{
- ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments",
- Ref.BucketSegment,
- Ref.HashKey,
- FoundCount,
- AttachmentCount);
+ CacheRecord.IterateAttachments(
+ [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ {
+ Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ AttachmentBytes += Chunk.Size();
+ FoundCount++;
+ }
+ AttachmentCount++;
+ });
+
+ if (FoundCount != AttachmentCount)
+ {
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ FoundCount,
+ AttachmentCount);
- return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv);
+ return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv);
+ }
}
Package.SetObject(LoadCompactBinaryObject(Value.Value));
@@ -300,7 +440,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- return Request.WriteResponse(zen::HttpResponseCode::OK, HttpContentType::kCbPackage, Response);
+ return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response);
}
else
{
@@ -310,41 +450,43 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
NiceBytes(Value.Value.Size()),
InUpstreamCache ? "UPSTREAM" : "LOCAL");
- return Request.WriteResponse(zen::HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value);
+ return Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value);
}
}
break;
case kPut:
{
- zen::IoBuffer Body = Request.ReadPayload();
+ IoBuffer Body = Request.ReadPayload();
if (!Body || Body.Size() == 0)
{
- return Request.WriteResponse(zen::HttpResponseCode::BadRequest);
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
}
const HttpContentType ContentType = Request.RequestContentType();
+ const bool StoreUpstream = m_UpstreamCache && (CachePolicy::StoreRemote == (Policy & CachePolicy::StoreRemote));
+
if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType)
{
// TODO: create a cache record and put value in CAS?
m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()));
- if (m_UpstreamCache)
+ if (StoreUpstream)
{
auto Result = m_UpstreamCache->EnqueueUpstream(
{.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}});
}
- return Request.WriteResponse(zen::HttpResponseCode::Created);
+ return Request.WriteResponse(HttpResponseCode::Created);
}
else if (ContentType == HttpContentType::kCbObject)
{
// Validate payload before accessing it
- const zen::CbValidateError ValidationResult =
- zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All);
+ const CbValidateError ValidationResult =
+ ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), CbValidateMode::All);
if (ValidationResult != CbValidateError::None)
{
@@ -360,7 +502,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
}
// Extract referenced payload hashes
- zen::CbObjectView Cbo(Body.Data());
+ CbObjectView Cbo(Body.Data());
std::vector<IoHash> References;
std::vector<IoHash> MissingRefs;
@@ -371,7 +513,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
if (!References.empty())
{
- zen::CbObjectWriter Idx;
+ CbObjectWriter Idx;
Idx.BeginArray("references");
for (const IoHash& Hash : References)
@@ -393,26 +535,23 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} attachments missing",
Ref.BucketSegment,
Ref.HashKey,
- zen::NiceBytes(CacheValue.Value.Size()),
+ NiceBytes(CacheValue.Value.Size()),
MissingRefs.size(),
References.size());
- if (MissingRefs.empty())
+ if (MissingRefs.empty() && StoreUpstream)
{
- // Only enqueue valid cache records, i.e. all referenced payloads exists
- if (m_UpstreamCache)
- {
- auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
- .CacheKey = {Ref.BucketSegment, Ref.HashKey},
- .PayloadIds = std::move(References)});
- }
+ ZEN_ASSERT(m_UpstreamCache);
+ auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ .CacheKey = {Ref.BucketSegment, Ref.HashKey},
+ .PayloadIds = std::move(References)});
- return Request.WriteResponse(zen::HttpResponseCode::Created);
+ return Request.WriteResponse(HttpResponseCode::Created);
}
else
{
// TODO: Binary attachments?
- zen::CbObjectWriter Response;
+ CbObjectWriter Response;
Response.BeginArray("needs");
for (const IoHash& MissingRef : MissingRefs)
{
@@ -422,7 +561,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
Response.EndArray();
// Return Created | BadRequest?
- return Request.WriteResponse(zen::HttpResponseCode::Created, Response.Save());
+ return Request.WriteResponse(HttpResponseCode::Created, Response.Save());
}
}
else if (ContentType == HttpContentType::kCbPackage)
@@ -437,26 +576,22 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
CbObject CacheRecord = Package.GetObject();
- int32_t AttachmentCount = 0;
- int32_t NewAttachmentCount = 0;
- uint64_t TotalAttachmentBytes = 0;
- uint64_t TotalNewBytes = 0;
- bool AttachmentsOk = true;
-
+ struct AttachmentInsertResult
+ {
+ int32_t Count = 0;
+ int32_t NewCount = 0;
+ uint64_t Bytes = 0;
+ uint64_t NewBytes = 0;
+ bool Ok = false;
+ };
+
+ AttachmentInsertResult AttachmentResult{.Ok = true};
std::span<const CbAttachment> Attachments = Package.GetAttachments();
+ std::vector<IoHash> PayloadIds;
- std::vector<IoHash> PayloadIds;
PayloadIds.reserve(Attachments.size());
- CacheRecord.IterateAttachments([this,
- &Ref,
- &Package,
- &AttachmentsOk,
- &AttachmentCount,
- &TotalAttachmentBytes,
- &TotalNewBytes,
- &NewAttachmentCount,
- &PayloadIds](CbFieldView AttachmentHash) {
+ CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentResult, &PayloadIds](CbFieldView AttachmentHash) {
if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
{
if (Attachment->IsCompressedBinary())
@@ -469,12 +604,12 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
if (InsertResult.New)
{
- TotalNewBytes += ChunkSize;
- ++NewAttachmentCount;
+ AttachmentResult.NewBytes += ChunkSize;
+ AttachmentResult.NewCount++;
}
- TotalAttachmentBytes += ChunkSize;
- AttachmentCount++;
+ AttachmentResult.Bytes += ChunkSize;
+ AttachmentResult.Count++;
}
else
{
@@ -482,7 +617,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
Ref.BucketSegment,
Ref.HashKey,
AttachmentHash.AsHash());
- AttachmentsOk = false;
+ AttachmentResult.Ok = false;
}
}
else
@@ -491,23 +626,24 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
Ref.BucketSegment,
Ref.HashKey,
AttachmentHash.AsHash());
- AttachmentsOk = false;
+ AttachmentResult.Ok = false;
}
});
- if (!AttachmentsOk)
+ if (!AttachmentResult.Ok)
{
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments");
}
IoBuffer CacheRecordChunk = CacheRecord.GetBuffer().AsIoBuffer();
- const uint64_t TotalPackageBytes = TotalAttachmentBytes + CacheRecordChunk.Size();
+ const uint64_t TotalPackageBytes = AttachmentResult.Bytes + CacheRecordChunk.Size();
ZenCacheValue CacheValue{.Value = CacheRecordChunk};
m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
- if (m_UpstreamCache)
+ if (StoreUpstream)
{
+ ZEN_ASSERT(m_UpstreamCache);
auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage,
.CacheKey = {Ref.BucketSegment, Ref.HashKey},
.PayloadIds = std::move(PayloadIds)});
@@ -516,17 +652,17 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} ({}/{}) new attachments",
Ref.BucketSegment,
Ref.HashKey,
- zen::NiceBytes(TotalPackageBytes),
- NewAttachmentCount,
- AttachmentCount,
- zen::NiceBytes(TotalNewBytes),
- zen::NiceBytes(TotalAttachmentBytes));
+ NiceBytes(TotalPackageBytes),
+ AttachmentResult.NewCount,
+ AttachmentResult.Count,
+ NiceBytes(AttachmentResult.NewBytes),
+ NiceBytes(AttachmentResult.Bytes));
- return Request.WriteResponse(zen::HttpResponseCode::Created);
+ return Request.WriteResponse(HttpResponseCode::Created);
}
else
{
- return Request.WriteResponse(zen::HttpResponseCode::BadRequest);
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
}
}
break;
@@ -540,7 +676,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req
}
void
-HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref)
+HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy)
{
// Note: the URL references the uncompressed payload hash - so this maintains the mapping
// from uncompressed CAS identity (aka CID/Content ID) to the stored payload hash
@@ -548,27 +684,29 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
// this is a PITA but a consequence of the fact that the client side code is not able to
// address data by compressed hash
+ ZEN_UNUSED(Policy);
+
switch (auto Verb = Request.RequestVerb())
{
- using enum zen::HttpVerb;
+ using enum HttpVerb;
case kHead:
case kGet:
{
- zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId);
- bool InUpstreamCache = false;
+ IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId);
+ bool InUpstreamCache = false;
if (!Payload && m_UpstreamCache)
{
if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId});
UpstreamResult.Success)
{
- if (zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
{
- Payload = UpstreamResult.Value;
- zen::IoHash ChunkHash = zen::IoHash::HashBuffer(Payload);
- zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash);
- InUpstreamCache = true;
+ Payload = UpstreamResult.Value;
+ IoHash ChunkHash = IoHash::HashBuffer(Payload);
+ CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash);
+ InUpstreamCache = true;
m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash);
}
@@ -582,7 +720,7 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
if (!Payload)
{
ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId);
- return Request.WriteResponse(zen::HttpResponseCode::NotFound);
+ return Request.WriteResponse(HttpResponseCode::NotFound);
}
ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})",
@@ -598,29 +736,27 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
Request.SetSuppressResponseBody();
}
- return Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, Payload);
+ return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload);
}
break;
case kPut:
{
- if (zen::IoBuffer Body = Request.ReadPayload())
+ if (IoBuffer Body = Request.ReadPayload())
{
if (Body.Size() == 0)
{
- return Request.WriteResponse(zen::HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Empty payload not permitted");
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Empty payload not permitted");
}
- zen::IoHash ChunkHash = zen::IoHash::HashBuffer(Body);
+ IoHash ChunkHash = IoHash::HashBuffer(Body);
- zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(Body));
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body));
if (!Compressed)
{
// All attachment payloads need to be in compressed buffer format
- return Request.WriteResponse(zen::HttpResponseCode::BadRequest,
+ return Request.WriteResponse(HttpResponseCode::BadRequest,
HttpContentType::kText,
"Attachments must be compressed");
}
@@ -632,7 +768,7 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
return Request.WriteResponse(HttpResponseCode::BadRequest);
}
- zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash);
+ CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash);
m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash);
@@ -646,11 +782,11 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
if (Result.New)
{
- return Request.WriteResponse(zen::HttpResponseCode::Created);
+ return Request.WriteResponse(HttpResponseCode::Created);
}
else
{
- return Request.WriteResponse(zen::HttpResponseCode::OK);
+ return Request.WriteResponse(HttpResponseCode::OK);
}
}
}
@@ -666,7 +802,7 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re
}
bool
-HttpStructuredCacheService::ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef)
+HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& OutRef)
{
std::string_view Key = Request.RelativeUri();
std::string_view::size_type BucketSplitOffset = Key.find_first_of('/');
@@ -702,14 +838,14 @@ HttpStructuredCacheService::ValidateKeyUri(zen::HttpServerRequest& Request, Cach
PayloadSegment = Key.substr(PayloadSplitOffset + 1);
}
- if (HashSegment.size() != zen::IoHash::StringLength)
+ if (HashSegment.size() != IoHash::StringLength)
{
return false;
}
- if (!PayloadSegment.empty() && PayloadSegment.size() == zen::IoHash::StringLength)
+ if (!PayloadSegment.empty() && PayloadSegment.size() == IoHash::StringLength)
{
- const bool IsOk = zen::ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash);
+ const bool IsOk = ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash);
if (!IsOk)
{
@@ -718,10 +854,10 @@ HttpStructuredCacheService::ValidateKeyUri(zen::HttpServerRequest& Request, Cach
}
else
{
- OutRef.PayloadId = zen::IoHash::Zero;
+ OutRef.PayloadId = IoHash::Zero;
}
- const bool IsOk = zen::ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash);
+ const bool IsOk = ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash);
if (!IsOk)
{
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 8289fd700..bd163dd1d 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -10,13 +10,13 @@ namespace spdlog {
class logger;
}
-class ZenCacheStore;
-
namespace zen {
class CasStore;
class CidStore;
class UpstreamCache;
+class ZenCacheStore;
+enum class CachePolicy : uint8_t;
/**
* Structured cache service. Imposes constraints on keys, supports blobs and
@@ -60,6 +60,7 @@ public:
virtual void HandleRequest(zen::HttpServerRequest& Request) override;
void Flush();
+ void Scrub(ScrubContext& Ctx);
private:
struct CacheRef
@@ -70,13 +71,13 @@ private:
};
[[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef);
- void HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref);
- void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref);
+ void HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy);
+ void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
- ZenCacheStore& m_CacheStore;
+ zen::ZenCacheStore& m_CacheStore;
zen::CasStore& m_CasStore;
zen::CidStore& m_CidStore;
std::unique_ptr<UpstreamCache> m_UpstreamCache;
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 018955e65..502ca6605 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -24,15 +24,16 @@
#include <atlfile.h>
-using namespace zen;
-using namespace fmt::literals;
-
//////////////////////////////////////////////////////////////////////////
-ZenCacheStore::ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir}
+namespace zen {
+
+using namespace fmt::literals;
+
+ZenCacheStore::ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir}
{
ZEN_INFO("initializing structured cache at '{}'", RootDir);
- zen::CreateDirectories(RootDir);
+ CreateDirectories(RootDir);
}
ZenCacheStore::~ZenCacheStore()
@@ -40,7 +41,7 @@ ZenCacheStore::~ZenCacheStore()
}
bool
-ZenCacheStore::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue)
+ZenCacheStore::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue);
@@ -68,7 +69,7 @@ ZenCacheStore::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCac
}
void
-ZenCacheStore::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value)
+ZenCacheStore::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
{
// Store value and index
@@ -104,6 +105,12 @@ ZenCacheStore::Flush()
m_DiskLayer.Flush();
}
+void
+ZenCacheStore::Scrub(ScrubContext& Ctx)
+{
+ m_DiskLayer.Scrub(Ctx);
+ m_MemLayer.Scrub(Ctx);
+}
//////////////////////////////////////////////////////////////////////////
ZenCacheMemoryLayer::ZenCacheMemoryLayer()
@@ -115,7 +122,7 @@ ZenCacheMemoryLayer::~ZenCacheMemoryLayer()
}
bool
-ZenCacheMemoryLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue)
+ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
CacheBucket* Bucket = nullptr;
@@ -139,7 +146,7 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey,
}
void
-ZenCacheMemoryLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value)
+ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
{
CacheBucket* Bucket = nullptr;
@@ -178,8 +185,14 @@ ZenCacheMemoryLayer::DropBucket(std::string_view Bucket)
return !!m_Buckets.erase(std::string(Bucket));
}
+void
+ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx)
+{
+ ZEN_UNUSED(Ctx);
+}
+
bool
-ZenCacheMemoryLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue)
+ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
RwLock::SharedLockScope _(m_bucketLock);
@@ -196,7 +209,7 @@ ZenCacheMemoryLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue&
}
void
-ZenCacheMemoryLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value)
+ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
{
RwLock::ExclusiveLockScope _(m_bucketLock);
@@ -227,7 +240,7 @@ struct DiskLocation
struct DiskIndexEntry
{
- zen::IoHash Key;
+ IoHash Key;
DiskLocation Location;
};
@@ -243,8 +256,8 @@ struct ZenCacheDiskLayer::CacheBucket
void OpenOrCreate(std::filesystem::path BucketDir);
static bool Delete(std::filesystem::path BucketDir);
- bool Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(const zen::IoHash& HashKey, const ZenCacheValue& Value);
+ bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(const IoHash& HashKey, const ZenCacheValue& Value);
void Drop();
void Flush();
@@ -260,12 +273,12 @@ private:
BasicFile m_SobsFile;
TCasLogFile<DiskIndexEntry> m_SlogFile;
- void BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey);
- void PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value);
+ void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey);
+ void PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value);
- RwLock m_IndexLock;
- tsl::robin_map<zen::IoHash, DiskLocation, zen::IoHash::Hasher> m_Index;
- uint64_t m_WriteCursor = 0;
+ RwLock m_IndexLock;
+ tsl::robin_map<IoHash, DiskLocation, IoHash::Hasher> m_Index;
+ uint64_t m_WriteCursor = 0;
};
ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas)
@@ -281,7 +294,7 @@ ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir)
{
if (std::filesystem::exists(BucketDir))
{
- zen::DeleteDirectories(BucketDir);
+ DeleteDirectories(BucketDir);
return true;
}
@@ -292,7 +305,7 @@ ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir)
void
ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
{
- zen::CreateDirectories(BucketDir);
+ CreateDirectories(BucketDir);
m_BucketDir = BucketDir;
@@ -357,7 +370,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
uint64_t MaxFileOffset = 0;
- if (zen::RwLock::ExclusiveLockScope _(m_IndexLock); m_Index.empty())
+ if (RwLock::ExclusiveLockScope _(m_IndexLock); m_Index.empty())
{
m_SlogFile.Replay([&](const DiskIndexEntry& Record) {
m_Index[Record.Key] = Record.Location;
@@ -372,25 +385,29 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir)
}
void
-ZenCacheDiskLayer::CacheBucket::BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey)
+ZenCacheDiskLayer::CacheBucket::BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey)
{
- char hex[sizeof(HashKey.Hash) * 2];
- ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, hex);
+ char HexString[sizeof(HashKey.Hash) * 2];
+ ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString);
Path.Append(m_BucketDir.c_str());
+ Path.Append(L"/blob/");
+ Path.AppendAsciiRange(HexString, HexString + 3);
+ Path.Append(L"/");
+ Path.AppendAsciiRange(HexString + 3, HexString + 5);
Path.Append(L"/");
- Path.AppendAsciiRange(hex, hex + sizeof(hex));
+ Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString));
}
bool
-ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue)
+ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue)
{
if (!m_Ok)
{
return false;
}
- zen::RwLock::SharedLockScope _(m_IndexLock);
+ RwLock::SharedLockScope _(m_IndexLock);
if (auto it = m_Index.find(HashKey); it != m_Index.end())
{
@@ -417,7 +434,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& O
WideStringBuilder<128> DataFilePath;
BuildPath(DataFilePath, HashKey);
- if (zen::IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str()))
+ if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str()))
{
OutValue.Value = Data;
OutValue.Value.SetContentType(ContentType);
@@ -431,7 +448,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& O
}
void
-ZenCacheDiskLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value)
+ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value)
{
if (!m_Ok)
{
@@ -453,12 +470,12 @@ ZenCacheDiskLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheVa
EntryFlags |= DiskLocation::kStructured;
}
- zen::RwLock::ExclusiveLockScope _(m_IndexLock);
+ RwLock::ExclusiveLockScope _(m_IndexLock);
DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(m_WriteCursor, EntryFlags),
.Size = gsl::narrow<uint32_t>(Value.Value.Size())};
- m_WriteCursor = zen::RoundUp(m_WriteCursor + Loc.Size, 16);
+ m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size, 16);
if (auto it = m_Index.find(HashKey); it == m_Index.end())
{
@@ -483,7 +500,7 @@ ZenCacheDiskLayer::CacheBucket::Drop()
m_SobsFile.Close();
m_SlogFile.Close();
- zen::DeleteDirectories(m_BucketDir);
+ DeleteDirectories(m_BucketDir);
}
void
@@ -494,12 +511,22 @@ ZenCacheDiskLayer::CacheBucket::Flush()
}
void
-ZenCacheDiskLayer::CacheBucket::PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value)
+ZenCacheDiskLayer::Scrub(ScrubContext& Ctx)
{
- zen::WideStringBuilder<128> DataFilePath;
+ ZEN_UNUSED(Ctx);
+}
+
+void
+ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value)
+{
+ WideStringBuilder<128> DataFilePath;
BuildPath(DataFilePath, HashKey);
- // TODO: replace this with a more efficient implementation with proper atomic rename
+ // TODO: replace this process with a more efficient implementation with proper atomic rename
+ // and also avoid creating directories if we can
+
+ std::filesystem::path ParentPath = std::filesystem::path(DataFilePath.c_str()).parent_path();
+ CreateDirectories(ParentPath);
CAtlTemporaryFile DataFile;
@@ -507,21 +534,23 @@ ZenCacheDiskLayer::CacheBucket::PutLargeObject(const zen::IoHash& HashKey, const
if (FAILED(hRes))
{
- zen::ThrowSystemException(hRes, "Failed to open temporary file for put at '{}'"_format(m_BucketDir));
+ ThrowSystemException(hRes, "Failed to open temporary file for put at '{}'"_format(m_BucketDir));
}
hRes = DataFile.Write(Value.Value.Data(), gsl::narrow<DWORD>(Value.Value.Size()));
if (FAILED(hRes))
{
- zen::ThrowSystemException(hRes, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size())));
+ ThrowSystemException(hRes, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size())));
}
+ // Move file into place (note: not fully atomic!)
+
hRes = DataFile.Close(DataFilePath.c_str());
if (FAILED(hRes))
{
- zen::ThrowSystemException(hRes, "Failed to finalize file '{}'"_format(zen::WideToUtf8(DataFilePath)));
+ ThrowSystemException(hRes, "Failed to finalize file '{}'"_format(WideToUtf8(DataFilePath)));
}
// Update index
@@ -533,7 +562,7 @@ ZenCacheDiskLayer::CacheBucket::PutLargeObject(const zen::IoHash& HashKey, const
EntryFlags |= DiskLocation::kStructured;
}
- zen::RwLock::ExclusiveLockScope _(m_IndexLock);
+ RwLock::ExclusiveLockScope _(m_IndexLock);
DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(0, EntryFlags), .Size = 0};
@@ -560,12 +589,12 @@ ZenCacheDiskLayer::ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path&
ZenCacheDiskLayer::~ZenCacheDiskLayer() = default;
bool
-ZenCacheDiskLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue)
+ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue)
{
CacheBucket* Bucket = nullptr;
{
- zen::RwLock::SharedLockScope _(m_Lock);
+ RwLock::SharedLockScope _(m_Lock);
auto it = m_Buckets.find(std::string(InBucket));
@@ -579,7 +608,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, Ze
{
// Bucket needs to be opened/created
- zen::RwLock::ExclusiveLockScope _(m_Lock);
+ RwLock::ExclusiveLockScope _(m_Lock);
if (auto it = m_Buckets.find(std::string(InBucket)); it != m_Buckets.end())
{
@@ -603,12 +632,12 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, Ze
}
void
-ZenCacheDiskLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value)
+ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value)
{
CacheBucket* Bucket = nullptr;
{
- zen::RwLock::SharedLockScope _(m_Lock);
+ RwLock::SharedLockScope _(m_Lock);
auto it = m_Buckets.find(std::string(InBucket));
@@ -622,7 +651,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, co
{
// New bucket needs to be created
- zen::RwLock::ExclusiveLockScope _(m_Lock);
+ RwLock::ExclusiveLockScope _(m_Lock);
if (auto it = m_Buckets.find(std::string(InBucket)); it != m_Buckets.end())
{
@@ -651,7 +680,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, co
bool
ZenCacheDiskLayer::DropBucket(std::string_view InBucket)
{
- zen::RwLock::ExclusiveLockScope _(m_Lock);
+ RwLock::ExclusiveLockScope _(m_Lock);
auto it = m_Buckets.find(std::string(InBucket));
@@ -679,7 +708,7 @@ ZenCacheDiskLayer::Flush()
Buckets.reserve(m_Buckets.size());
{
- zen::RwLock::SharedLockScope _(m_Lock);
+ RwLock::SharedLockScope _(m_Lock);
for (auto& Kv : m_Buckets)
{
@@ -705,7 +734,7 @@ ZenCacheTracker::~ZenCacheTracker()
}
void
-ZenCacheTracker::TrackAccess(std::string_view Bucket, const zen::IoHash& HashKey)
+ZenCacheTracker::TrackAccess(std::string_view Bucket, const IoHash& HashKey)
{
ZEN_UNUSED(Bucket);
ZEN_UNUSED(HashKey);
@@ -715,3 +744,5 @@ void
ZenCacheTracker::Flush()
{
}
+
+} // namespace zen
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index 48c3cfde9..fdf4a8cfe 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -23,8 +23,6 @@ namespace zen {
class WideStringBuilderBase;
class CasStore;
-} // namespace zen
-
/******************************************************************************
/$$$$$$$$ /$$$$$$ /$$
@@ -44,8 +42,8 @@ class CasStore;
struct ZenCacheValue
{
- zen::IoBuffer Value;
- zen::CbObject IndexData;
+ IoBuffer Value;
+ CbObject IndexData;
};
class ZenCacheMemoryLayer
@@ -54,34 +52,36 @@ public:
ZenCacheMemoryLayer();
~ZenCacheMemoryLayer();
- bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value);
+ bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
bool DropBucket(std::string_view Bucket);
+ void Scrub(ScrubContext& Ctx);
private:
struct CacheBucket
{
- zen::RwLock m_bucketLock;
- tsl::robin_map<zen::IoHash, zen::IoBuffer> m_cacheMap;
+ RwLock m_bucketLock;
+ tsl::robin_map<IoHash, IoBuffer> m_cacheMap;
- bool Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(const zen::IoHash& HashKey, const ZenCacheValue& Value);
+ bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(const IoHash& HashKey, const ZenCacheValue& Value);
};
- zen::RwLock m_Lock;
+ RwLock m_Lock;
std::unordered_map<std::string, CacheBucket> m_Buckets;
};
class ZenCacheDiskLayer
{
public:
- ZenCacheDiskLayer(zen::CasStore& Cas, const std::filesystem::path& RootDir);
+ ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir);
~ZenCacheDiskLayer();
- bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value);
+ bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
bool DropBucket(std::string_view Bucket);
void Flush();
+ void Scrub(ScrubContext& Ctx);
private:
/** A cache bucket manages a single directory containing
@@ -89,22 +89,23 @@ private:
*/
struct CacheBucket;
- zen::CasStore& m_CasStore;
+ CasStore& m_CasStore;
std::filesystem::path m_RootDir;
- zen::RwLock m_Lock;
+ RwLock m_Lock;
std::unordered_map<std::string, CacheBucket> m_Buckets; // TODO: make this case insensitive
};
class ZenCacheStore
{
public:
- ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir);
+ ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir);
~ZenCacheStore();
- bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue);
- void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value);
+ bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value);
bool DropBucket(std::string_view Bucket);
void Flush();
+ void Scrub(ScrubContext& Ctx);
private:
std::filesystem::path m_RootDir;
@@ -121,8 +122,10 @@ public:
ZenCacheTracker(ZenCacheStore& CacheStore);
~ZenCacheTracker();
- void TrackAccess(std::string_view Bucket, const zen::IoHash& HashKey);
+ void TrackAccess(std::string_view Bucket, const IoHash& HashKey);
void Flush();
private:
};
+
+} // namespace zen
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index 578a3a202..164d2a792 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -55,6 +55,27 @@ PickDefaultStateDirectory()
#endif
+UpstreamCachePolicy
+ParseUpstreamCachePolicy(std::string_view Options)
+{
+ if (Options == "readonly")
+ {
+ return UpstreamCachePolicy::Read;
+ }
+ else if (Options == "writeonly")
+ {
+ return UpstreamCachePolicy::Write;
+ }
+ else if (Options == "disabled")
+ {
+ return UpstreamCachePolicy::Disabled;
+ }
+ else
+ {
+ return UpstreamCachePolicy::ReadWrite;
+ }
+}
+
void
ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, ZenServiceConfig& ServiceConfig)
{
@@ -77,6 +98,21 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
cxxopts::value<std::string>(GlobalOptions.ChildId),
"<identifier>");
+#if ZEN_PLATFORM_WINDOWS
+ options.add_option("lifetime",
+ "",
+ "install",
+ "Install zenserver as a Windows service",
+ cxxopts::value<bool>(GlobalOptions.InstallService),
+ "");
+ options.add_option("lifetime",
+ "",
+ "uninstall",
+ "Uninstall zenserver as a Windows service",
+ cxxopts::value<bool>(GlobalOptions.UninstallService),
+ "");
+#endif
+
options.add_option("network",
"p",
"port",
@@ -98,6 +134,14 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
cxxopts::value<bool>(ServiceConfig.ShouldCrash)->default_value("false"),
"");
+ std::string UpstreamCachePolicyOptions;
+ options.add_option("cache",
+ "",
+ "upstream-cache-policy",
+ "",
+ cxxopts::value<std::string>(UpstreamCachePolicyOptions)->default_value(""),
+ "Upstream cache policy (readwrite|readonly|writeonly|disabled)");
+
options.add_option("cache",
"",
"upstream-jupiter-url",
@@ -163,13 +207,6 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
options.add_option("cache",
"",
- "upstream-enabled",
- "Whether upstream caching is disabled",
- cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.Enabled)->default_value("true"),
- "");
-
- options.add_option("cache",
- "",
"upstream-thread-count",
"Number of threads used for upstream procsssing",
cxxopts::value<int>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"),
@@ -185,6 +222,8 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
exit(0);
}
+
+ ServiceConfig.UpstreamCacheConfig.CachePolicy = ParseUpstreamCachePolicy(UpstreamCachePolicyOptions);
}
catch (cxxopts::OptionParseException& e)
{
@@ -261,7 +300,8 @@ ParseServiceConfig(const std::filesystem::path& DataRoot, ZenServiceConfig& Serv
if (auto UpstreamConfig = StructuredCacheConfig->get<sol::optional<sol::table>>("upstream"))
{
- ServiceConfig.UpstreamCacheConfig.Enabled = UpstreamConfig->get_or("enable", ServiceConfig.UpstreamCacheConfig.Enabled);
+ std::string Policy = UpstreamConfig->get_or("policy", std::string());
+ ServiceConfig.UpstreamCacheConfig.CachePolicy = ParseUpstreamCachePolicy(Policy);
ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount = UpstreamConfig->get_or("upstreamthreadcount", 4);
if (auto JupiterConfig = UpstreamConfig->get<sol::optional<sol::table>>("jupiter"))
diff --git a/zenserver/config.h b/zenserver/config.h
index 80ec86905..6ade1b401 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -9,12 +9,14 @@ struct ZenServerOptions
{
bool IsDebug = false;
bool IsTest = false;
- bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements
- int BasePort = 1337; // Service listen port (used for both UDP and TCP)
- int OwnerPid = 0; // Parent process id (zero for standalone)
- std::string ChildId; // Id assigned by parent process (used for lifetime management)
- std::string LogId; // Id for tagging log output
- std::filesystem::path DataDir; // Root directory for state (used for testing)
+ bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements
+ int BasePort = 1337; // Service listen port (used for both UDP and TCP)
+ int OwnerPid = 0; // Parent process id (zero for standalone)
+ std::string ChildId; // Id assigned by parent process (used for lifetime management)
+ bool InstallService = false; // Flag used to initiate service install (temporary)
+ bool UninstallService = false; // Flag used to initiate service uninstall (temporary)
+ std::string LogId; // Id for tagging log output
+ std::filesystem::path DataDir; // Root directory for state (used for testing)
};
struct ZenUpstreamJupiterConfig
@@ -34,12 +36,20 @@ struct ZenUpstreamZenConfig
std::string Url;
};
+enum class UpstreamCachePolicy : uint8_t
+{
+ Disabled = 0,
+ Read = 1 << 0,
+ Write = 1 << 1,
+ ReadWrite = Read | Write
+};
+
struct ZenUpstreamCacheConfig
{
ZenUpstreamJupiterConfig JupiterConfig;
ZenUpstreamZenConfig ZenConfig;
int UpstreamThreadCount = 4;
- bool Enabled = false;
+ UpstreamCachePolicy CachePolicy = UpstreamCachePolicy::ReadWrite;
};
struct ZenServiceConfig
diff --git a/zenserver/diag/diagsvcs.h b/zenserver/diag/diagsvcs.h
index 51ee98f67..61703e393 100644
--- a/zenserver/diag/diagsvcs.h
+++ b/zenserver/diag/diagsvcs.h
@@ -7,7 +7,9 @@
//////////////////////////////////////////////////////////////////////////
-class HttpTestService : public zen::HttpService
+namespace zen {
+
+class HttpTestService : public HttpService
{
uint32_t LogPoint = 0;
@@ -17,7 +19,7 @@ public:
virtual const char* BaseUri() const override { return "/test/"; }
- virtual void HandleRequest(zen::HttpServerRequest& Request) override
+ virtual void HandleRequest(HttpServerRequest& Request) override
{
using namespace std::literals;
@@ -25,21 +27,21 @@ public:
if (Uri == "hello"sv)
{
- Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kText, u8"hello world!"sv);
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, u8"hello world!"sv);
// OutputLogMessageInternal(&LogPoint, 0, 0);
}
else if (Uri == "1K"sv)
{
- Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, m_1k);
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, m_1k);
}
else if (Uri == "1M"sv)
{
- Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, m_1m);
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, m_1m);
}
else if (Uri == "1M_1k"sv)
{
- std::vector<zen::IoBuffer> Buffers;
+ std::vector<IoBuffer> Buffers;
Buffers.reserve(1024);
for (int i = 0; i < 1024; ++i)
@@ -47,11 +49,11 @@ public:
Buffers.push_back(m_1k);
}
- Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, Buffers);
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Buffers);
}
else if (Uri == "1G"sv)
{
- std::vector<zen::IoBuffer> Buffers;
+ std::vector<IoBuffer> Buffers;
Buffers.reserve(1024);
for (int i = 0; i < 1024; ++i)
@@ -59,11 +61,11 @@ public:
Buffers.push_back(m_1m);
}
- Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, Buffers);
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Buffers);
}
else if (Uri == "1G_1k"sv)
{
- std::vector<zen::IoBuffer> Buffers;
+ std::vector<IoBuffer> Buffers;
Buffers.reserve(1024 * 1024);
for (int i = 0; i < 1024 * 1024; ++i)
@@ -71,16 +73,16 @@ public:
Buffers.push_back(m_1k);
}
- Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, Buffers);
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Buffers);
}
}
private:
- zen::IoBuffer m_1m{1024 * 1024};
- zen::IoBuffer m_1k{m_1m, 0u, 1024};
+ IoBuffer m_1m{1024 * 1024};
+ IoBuffer m_1k{m_1m, 0u, 1024};
};
-class HttpHealthService : public zen::HttpService
+class HttpHealthService : public HttpService
{
public:
HttpHealthService() = default;
@@ -88,16 +90,18 @@ public:
virtual const char* BaseUri() const override { return "/health/"; }
- virtual void HandleRequest(zen::HttpServerRequest& Request) override
+ virtual void HandleRequest(HttpServerRequest& Request) override
{
using namespace std::literals;
switch (Request.RequestVerb())
{
- case zen::HttpVerb::kGet:
- return Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kText, u8"OK!"sv);
+ case HttpVerb::kGet:
+ return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, u8"OK!"sv);
}
}
private:
};
+
+} // namespace zen
diff --git a/zenserver/diag/logging.cpp b/zenserver/diag/logging.cpp
index 48eda7512..41b140f90 100644
--- a/zenserver/diag/logging.cpp
+++ b/zenserver/diag/logging.cpp
@@ -9,6 +9,9 @@
#include <spdlog/pattern_formatter.h>
#include <spdlog/sinks/ansicolor_sink.h>
#include <spdlog/sinks/basic_file_sink.h>
+#include <spdlog/sinks/daily_file_sink.h>
+#include <spdlog/sinks/msvc_sink.h>
+#include <spdlog/sinks/rotating_file_sink.h>
#include <spdlog/sinks/stdout_color_sinks.h>
#include <spdlog/spdlog.h>
#include <zencore/string.h>
@@ -204,6 +207,12 @@ InitializeLogging(const ZenServerOptions& GlobalOptions)
IsAsync = false;
}
+ if (GlobalOptions.IsTest)
+ {
+ LogLevel = spdlog::level::trace;
+ IsAsync = false;
+ }
+
if (IsAsync)
{
const int QueueSize = 8192;
@@ -217,7 +226,19 @@ InitializeLogging(const ZenServerOptions& GlobalOptions)
// Sinks
auto ConsoleSink = std::make_shared<spdlog::sinks::ansicolor_stdout_sink_mt>();
- auto FileSink = std::make_shared<spdlog::sinks::basic_file_sink_mt>(zen::WideToUtf8(LogPath.c_str()), /* truncate */ true);
+
+#if 0
+ auto FileSink = std::make_shared<spdlog::sinks::daily_file_sink_mt>(zen::WideToUtf8(LogPath.c_str()),
+ 0,
+ 0,
+ /* truncate */ false,
+ uint16_t(/* max files */ 14));
+#else
+ auto FileSink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(zen::WideToUtf8(LogPath.c_str()),
+ /* max size */ 128 * 1024 * 1024,
+ /* max files */ 16,
+ /* rotate on open */ true);
+#endif
// Default
@@ -228,20 +249,30 @@ InitializeLogging(const ZenServerOptions& GlobalOptions)
Sinks.push_back(ConsoleSink);
Sinks.push_back(FileSink);
+#if ZEN_PLATFORM_WINDOWS
+ if (zen::IsDebuggerPresent())
+ {
+ auto DebugSink = std::make_shared<spdlog::sinks::msvc_sink_mt>();
+ DebugSink->set_level(spdlog::level::debug);
+ Sinks.push_back(DebugSink);
+ }
+#endif
+
// Jupiter - only log HTTP traffic to file
auto JupiterLogger = std::make_shared<spdlog::logger>("jupiter", FileSink);
spdlog::register_logger(JupiterLogger);
- JupiterLogger->set_level(LogLevel);
+
+ // Zen - only log HTTP traffic to file
auto ZenClientLogger = std::make_shared<spdlog::logger>("zenclient", FileSink);
spdlog::register_logger(ZenClientLogger);
- ZenClientLogger->set_level(LogLevel);
// Configure all registered loggers according to settings
spdlog::set_level(LogLevel);
spdlog::flush_on(spdlog::level::err);
+ spdlog::flush_every(std::chrono::seconds{2});
spdlog::set_formatter(std::make_unique<logging::full_formatter>(GlobalOptions.LogId, std::chrono::system_clock::now()));
}
diff --git a/zenserver/experimental/usnjournal.cpp b/zenserver/experimental/usnjournal.cpp
index ab83b8a1c..1e765fbe5 100644
--- a/zenserver/experimental/usnjournal.cpp
+++ b/zenserver/experimental/usnjournal.cpp
@@ -34,14 +34,14 @@ UsnJournalReader::Initialize(std::filesystem::path VolumePath)
if (!Success)
{
- zen::ThrowSystemException("GetVolumePathName failed");
+ zen::ThrowLastError("GetVolumePathName failed");
}
Success = GetVolumeNameForVolumeMountPoint(VolumePathName, VolumeName, ZEN_ARRAY_COUNT(VolumeName));
if (!Success)
{
- zen::ThrowSystemException("GetVolumeNameForVolumeMountPoint failed");
+ zen::ThrowLastError("GetVolumeNameForVolumeMountPoint failed");
}
// Chop off trailing slash since we want to open a volume handle, not a handle to the volume root directory
@@ -64,7 +64,7 @@ UsnJournalReader::Initialize(std::filesystem::path VolumePath)
if (m_VolumeHandle == INVALID_HANDLE_VALUE)
{
- ThrowSystemException("Volume handle open failed");
+ ThrowLastError("Volume handle open failed");
}
// Figure out which file system is in use for volume
@@ -86,7 +86,7 @@ UsnJournalReader::Initialize(std::filesystem::path VolumePath)
if (!Success)
{
- ThrowSystemException("Failed to get volume information");
+ ThrowLastError("Failed to get volume information");
}
ZEN_DEBUG("File system type is {}", WideToUtf8(FileSystemName));
@@ -173,7 +173,7 @@ UsnJournalReader::Initialize(std::filesystem::path VolumePath)
if (!Success)
{
- ThrowSystemException("GetFileInformationByHandleEx failed");
+ ThrowLastError("GetFileInformationByHandleEx failed");
}
const Frn VolumeRootFrn = FileInformation.FileId;
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index 404484edf..1f4239b23 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -780,6 +780,12 @@ ProjectStore::Project::Flush()
// TODO
}
+void
+ProjectStore::Project::Scrub(ScrubContext& Ctx)
+{
+ ZEN_UNUSED(Ctx);
+}
+
//////////////////////////////////////////////////////////////////////////
ProjectStore::ProjectStore(CasStore& Store, std::filesystem::path BasePath)
@@ -815,6 +821,17 @@ ProjectStore::Flush()
}
}
+void
+ProjectStore::Scrub(ScrubContext& Ctx)
+{
+ RwLock::SharedLockScope _(m_ProjectsLock);
+
+ for (auto& Kv : m_Projects)
+ {
+ Kv.second.Scrub(Ctx);
+ }
+}
+
ProjectStore::Project*
ProjectStore::OpenProject(std::string_view ProjectId)
{
diff --git a/zenserver/projectstore.h b/zenserver/projectstore.h
index 3d2247305..e545d78b9 100644
--- a/zenserver/projectstore.h
+++ b/zenserver/projectstore.h
@@ -101,6 +101,7 @@ public:
spdlog::logger& Log() { return m_OuterProject->Log(); }
void Flush();
+ void Scrub(ScrubContext& Ctx);
std::size_t OplogCount() const { return m_LatestOpMap.size(); }
@@ -154,6 +155,7 @@ public:
void Write();
[[nodiscard]] static bool Exists(std::filesystem::path BasePath);
void Flush();
+ void Scrub(ScrubContext& Ctx);
spdlog::logger& Log();
private:
@@ -177,6 +179,7 @@ public:
void DeleteProject(std::string_view ProjectId);
bool Exists(std::string_view ProjectId);
void Flush();
+ void Scrub(ScrubContext& Ctx);
spdlog::logger& Log() { return m_Log; }
const std::filesystem::path& BasePath() const { return m_ProjectBasePath; }
@@ -193,13 +196,13 @@ private:
//////////////////////////////////////////////////////////////////////////
//
-// {ns} a root namespace, should be associated with the project which owns it
+// {project} a project identifier
// {target} a variation of the project, typically a build target
// {lsn} oplog entry sequence number
//
-// /prj/{ns}
-// /prj/{ns}/oplog/{target}
-// /prj/{ns}/oplog/{target}/{lsn}
+// /prj/{project}
+// /prj/{project}/oplog/{target}
+// /prj/{project}/oplog/{target}/{lsn}
//
// oplog entry
//
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 4a5467648..2e74602db 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -66,6 +66,14 @@ CloudCacheSession::~CloudCacheSession()
}
CloudCacheResult
+CloudCacheSession::Authenticate()
+{
+ std::string Auth;
+ const bool Success = m_CacheClient->AcquireAccessToken(Auth);
+ return {.Success = Success};
+}
+
+CloudCacheResult
CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Key)
{
std::string Auth;
@@ -163,7 +171,9 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)};
+ return {.Bytes = Response.uploaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
CloudCacheResult
@@ -194,7 +204,9 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)};
+ return {.Bytes = Response.uploaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
CloudCacheResult
@@ -215,7 +227,9 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
- return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)};
+ return {.Bytes = Response.uploaded_bytes,
+ .ElapsedSeconds = Response.elapsed,
+ .Success = (Response.status_code == 200 || Response.status_code == 201)};
}
std::vector<IoHash>
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index 5535ba000..21217387c 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -61,6 +61,7 @@ public:
CloudCacheSession(CloudCacheClient* OuterClient);
~CloudCacheSession();
+ CloudCacheResult Authenticate();
CloudCacheResult GetDerivedData(std::string_view BucketId, std::string_view Key);
CloudCacheResult GetDerivedData(std::string_view BucketId, const IoHash& Key);
CloudCacheResult GetRef(std::string_view BucketId, const IoHash& Key, ZenContentType RefType);
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 38d30a795..d6b6d44be 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -94,7 +94,7 @@ namespace detail {
std::atomic_bool m_CompleteAdding{false};
};
- class JupiterUpstreamEndpoint final : public zen::UpstreamEndpoint
+ class JupiterUpstreamEndpoint final : public UpstreamEndpoint
{
public:
JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) : m_UseLegacyDdc(Options.UseLegacyDdc)
@@ -108,8 +108,9 @@ namespace detail {
virtual bool Initialize() override
{
- // TODO: Test and authenticate Jupiter client connection
- return !m_Client->ServiceUrl().empty();
+ CloudCacheSession Session(m_Client);
+ const CloudCacheResult Result = Session.Authenticate();
+ return Result.Success;
}
virtual std::string_view DisplayName() const override { return m_DisplayName; }
@@ -118,8 +119,8 @@ namespace detail {
{
try
{
- zen::CloudCacheSession Session(m_Client);
- CloudCacheResult Result;
+ CloudCacheSession Session(m_Client);
+ CloudCacheResult Result;
if (m_UseLegacyDdc && Type == ZenContentType::kBinary)
{
@@ -134,7 +135,7 @@ namespace detail {
{
CbPackage Package;
- const CbValidateError ValidationResult = zen::ValidateCompactBinary(Result.Response, CbValidateMode::All);
+ const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All);
if (Result.Success = ValidationResult == CbValidateError::None; Result.Success)
{
CbObject CacheRecord = LoadCompactBinaryObject(Result.Response);
@@ -183,7 +184,7 @@ namespace detail {
{
try
{
- zen::CloudCacheSession Session(m_Client);
+ CloudCacheSession Session(m_Client);
const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
return {.Value = Result.Response,
@@ -278,7 +279,7 @@ namespace detail {
RefPtr<CloudCacheClient> m_Client;
};
- class ZenUpstreamEndpoint final : public zen::UpstreamEndpoint
+ class ZenUpstreamEndpoint final : public UpstreamEndpoint
{
public:
ZenUpstreamEndpoint(std::string_view ServiceUrl)
@@ -292,8 +293,20 @@ namespace detail {
virtual bool Initialize() override
{
- // TODO: Test and authenticate Zen client connection
- return !m_Client->ServiceUrl().empty();
+ try
+ {
+ ZenStructuredCacheSession Session(*m_Client);
+ ZenCacheResult Result;
+ for (int32_t Attempt = 0, MaxAttempts = 3; Attempt < MaxAttempts && !Result.Success; ++Attempt)
+ {
+ Result = Session.SayHello();
+ }
+ return Result.Success;
+ }
+ catch (std::exception&)
+ {
+ return false;
+ }
}
virtual std::string_view DisplayName() const override { return m_DisplayName; }
@@ -344,14 +357,14 @@ namespace detail {
try
{
- zen::ZenStructuredCacheSession Session(*m_Client);
- ZenCacheResult Result;
- int64_t TotalBytes = 0ull;
- double TotalElapsedSeconds = 0.0;
+ ZenStructuredCacheSession Session(*m_Client);
+ ZenCacheResult Result;
+ int64_t TotalBytes = 0ull;
+ double TotalElapsedSeconds = 0.0;
if (CacheRecord.Type == ZenContentType::kCbPackage)
{
- zen::CbPackage Package;
+ CbPackage Package;
Package.SetObject(CbObject(SharedBuffer(RecordValue)));
for (const IoBuffer& Payload : Payloads)
@@ -427,8 +440,8 @@ namespace detail {
}
private:
- std::string m_DisplayName;
- RefPtr<zen::ZenStructuredCacheClient> m_Client;
+ std::string m_DisplayName;
+ RefPtr<ZenStructuredCacheClient> m_Client;
};
} // namespace detail
@@ -455,7 +468,7 @@ class UpstreamStats final
};
public:
- UpstreamStats() : m_Log(zen::logging::Get("upstream")) {}
+ UpstreamStats() : m_Log(logging::Get("upstream")) {}
void Add(const UpstreamEndpoint& Endpoint, const GetUpstreamCacheResult& Result)
{
@@ -523,8 +536,8 @@ private:
class DefaultUpstreamCache final : public UpstreamCache
{
public:
- DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore)
- : m_Log(zen::logging::Get("upstream"))
+ DefaultUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore)
+ : m_Log(logging::Get("upstream"))
, m_Options(Options)
, m_CacheStore(CacheStore)
, m_CidStore(CidStore)
@@ -559,12 +572,15 @@ public:
virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
{
- for (auto& Endpoint : m_Endpoints)
+ if (m_Options.ReadUpstream)
{
- if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success)
+ for (auto& Endpoint : m_Endpoints)
{
- m_Stats.Add(*Endpoint, Result);
- return Result;
+ if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success)
+ {
+ m_Stats.Add(*Endpoint, Result);
+ return Result;
+ }
}
}
@@ -573,12 +589,15 @@ public:
virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
{
- for (auto& Endpoint : m_Endpoints)
+ if (m_Options.ReadUpstream)
{
- if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success)
+ for (auto& Endpoint : m_Endpoints)
{
- m_Stats.Add(*Endpoint, Result);
- return Result;
+ if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success)
+ {
+ m_Stats.Add(*Endpoint, Result);
+ return Result;
+ }
}
}
@@ -587,7 +606,7 @@ public:
virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
{
- if (m_IsRunning.load())
+ if (m_IsRunning.load() && m_Options.WriteUpstream)
{
if (!m_UpstreamThreads.empty())
{
@@ -697,21 +716,21 @@ private:
spdlog::logger& Log() { return m_Log; }
- spdlog::logger& m_Log;
- UpstreamCacheOptions m_Options;
- ::ZenCacheStore& m_CacheStore;
- CidStore& m_CidStore;
- UpstreamQueue m_UpstreamQueue;
- UpstreamStats m_Stats;
- std::vector<std::unique_ptr<zen::UpstreamEndpoint>> m_Endpoints;
- std::vector<std::thread> m_UpstreamThreads;
- std::atomic_bool m_IsRunning{false};
+ spdlog::logger& m_Log;
+ UpstreamCacheOptions m_Options;
+ ZenCacheStore& m_CacheStore;
+ CidStore& m_CidStore;
+ UpstreamQueue m_UpstreamQueue;
+ UpstreamStats m_Stats;
+ std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints;
+ std::vector<std::thread> m_UpstreamThreads;
+ std::atomic_bool m_IsRunning{false};
};
//////////////////////////////////////////////////////////////////////////
std::unique_ptr<UpstreamCache>
-MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore)
+MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore)
{
return std::make_unique<DefaultUpstreamCache>(Options, CacheStore, CidStore);
}
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 327778452..142fe260f 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -8,11 +8,10 @@
#include <memory>
-class ZenCacheStore;
-
namespace zen {
class CidStore;
+class ZenCacheStore;
struct CloudCacheClientOptions;
struct UpstreamCacheKey
@@ -36,7 +35,9 @@ struct UpstreamCacheRecord
struct UpstreamCacheOptions
{
- uint32_t ThreadCount = 4;
+ uint32_t ThreadCount = 4;
+ bool ReadUpstream = true;
+ bool WriteUpstream = true;
};
struct GetUpstreamCacheResult
@@ -101,7 +102,7 @@ public:
virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0;
};
-std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore);
+std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore);
std::unique_ptr<UpstreamEndpoint> MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options);
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index 55ddd310f..7f689d7f3 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -73,7 +73,7 @@ namespace detail {
// Note that currently this just implements an UDP echo service for testing purposes
-Mesh::Mesh(asio::io_context& IoContext) : m_Log(logging::Get("mesh")), m_IoContext(IoContext), m_SessionId(zen::GetSessionId())
+Mesh::Mesh(asio::io_context& IoContext) : m_Log(logging::Get("mesh")), m_IoContext(IoContext), m_SessionId(GetSessionId())
{
}
@@ -370,7 +370,7 @@ ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State)
using namespace std::literals;
ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient)
-: m_Log(zen::logging::Get("zenclient"sv))
+: m_Log(logging::Get("zenclient"sv))
, m_Client(OuterClient)
{
m_SessionState = m_Client.AllocSessionState();
@@ -382,6 +382,19 @@ ZenStructuredCacheSession::~ZenStructuredCacheSession()
}
ZenCacheResult
+ZenStructuredCacheSession::SayHello()
+{
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client.ServiceUrl() << "/test/hello";
+
+ cpr::Session& Session = m_SessionState->Session;
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ cpr::Response Response = Session.Get();
+
+ return {.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+}
+
+ZenCacheResult
ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type)
{
ExtendableStringBuilder<256> Uri;
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index ff4a551bf..36cfd1217 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -109,6 +109,7 @@ public:
ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient);
~ZenStructuredCacheSession();
+ ZenCacheResult SayHello();
ZenCacheResult GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type);
ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId);
ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type);
diff --git a/zenserver/vfs.cpp b/zenserver/vfs.cpp
index 86e265b20..fcc9a71f8 100644
--- a/zenserver/vfs.cpp
+++ b/zenserver/vfs.cpp
@@ -5,7 +5,6 @@
#if ZEN_WITH_VFS
# include <zencore/except.h>
# include <zencore/filesystem.h>
-# include <zencore/snapshot_manifest.h>
# include <zencore/stream.h>
# include <zencore/windows.h>
# include <zencore/logging.h>
@@ -532,7 +531,7 @@ retry:
}
else if (hRes == HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND))
{
- throw zen::WindowsException(hRes, "Failed to initialize root placeholder");
+ ThrowSystemException(hRes, "Failed to initialize root placeholder");
}
// Ignore error, problems will be reported below anyway
diff --git a/zenserver/windows/service.cpp b/zenserver/windows/service.cpp
new file mode 100644
index 000000000..017b5f9a7
--- /dev/null
+++ b/zenserver/windows/service.cpp
@@ -0,0 +1,631 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "service.h"
+
+#include <zencore/zencore.h>
+
+#include <stdio.h>
+#include <tchar.h>
+#include <zencore/windows.h>
+
+#define SVCNAME L"Zen Store"
+
+SERVICE_STATUS gSvcStatus;
+SERVICE_STATUS_HANDLE gSvcStatusHandle;
+HANDLE ghSvcStopEvent = NULL;
+
+void SvcInstall(void);
+
+void ReportSvcStatus(DWORD, DWORD, DWORD);
+void SvcReportEvent(LPTSTR);
+
+WindowsService::WindowsService()
+{
+}
+
+WindowsService::~WindowsService()
+{
+}
+
+//
+// Purpose:
+// Installs a service in the SCM database
+//
+// Parameters:
+// None
+//
+// Return value:
+// None
+//
+VOID
+WindowsService::Install()
+{
+ SC_HANDLE schSCManager;
+ SC_HANDLE schService;
+ TCHAR szPath[MAX_PATH];
+
+ if (!GetModuleFileName(NULL, szPath, MAX_PATH))
+ {
+ printf("Cannot install service (%d)\n", GetLastError());
+ return;
+ }
+
+ // Get a handle to the SCM database.
+
+ schSCManager = OpenSCManager(NULL, // local computer
+ NULL, // ServicesActive database
+ SC_MANAGER_ALL_ACCESS); // full access rights
+
+ if (NULL == schSCManager)
+ {
+ printf("OpenSCManager failed (%d)\n", GetLastError());
+ return;
+ }
+
+ // Create the service
+
+ schService = CreateService(schSCManager, // SCM database
+ SVCNAME, // name of service
+ SVCNAME, // service name to display
+ SERVICE_ALL_ACCESS, // desired access
+ SERVICE_WIN32_OWN_PROCESS, // service type
+ SERVICE_DEMAND_START, // start type
+ SERVICE_ERROR_NORMAL, // error control type
+ szPath, // path to service's binary
+ NULL, // no load ordering group
+ NULL, // no tag identifier
+ NULL, // no dependencies
+ NULL, // LocalSystem account
+ NULL); // no password
+
+ if (schService == NULL)
+ {
+ printf("CreateService failed (%d)\n", GetLastError());
+ CloseServiceHandle(schSCManager);
+ return;
+ }
+ else
+ printf("Service installed successfully\n");
+
+ CloseServiceHandle(schService);
+ CloseServiceHandle(schSCManager);
+}
+
+void
+WindowsService::Delete()
+{
+ SC_HANDLE schSCManager;
+ SC_HANDLE schService;
+
+ // Get a handle to the SCM database.
+
+ schSCManager = OpenSCManager(NULL, // local computer
+ NULL, // ServicesActive database
+ SC_MANAGER_ALL_ACCESS); // full access rights
+
+ if (NULL == schSCManager)
+ {
+ printf("OpenSCManager failed (%d)\n", GetLastError());
+ return;
+ }
+
+ // Get a handle to the service.
+
+ schService = OpenService(schSCManager, // SCM database
+ SVCNAME, // name of service
+ DELETE); // need delete access
+
+ if (schService == NULL)
+ {
+ printf("OpenService failed (%d)\n", GetLastError());
+ CloseServiceHandle(schSCManager);
+ return;
+ }
+
+ // Delete the service.
+
+ if (!DeleteService(schService))
+ {
+ printf("DeleteService failed (%d)\n", GetLastError());
+ }
+ else
+ printf("Service deleted successfully\n");
+
+ CloseServiceHandle(schService);
+ CloseServiceHandle(schSCManager);
+}
+
+WindowsService* gSvc;
+
+void WINAPI
+CallMain(DWORD, LPSTR*)
+{
+ gSvc->SvcMain();
+}
+
+int
+WindowsService::ServiceMain()
+{
+ if (zen::IsInteractiveSession())
+ {
+ // Not actually running as a service
+ return Run();
+ }
+ else
+ {
+ gSvc = this;
+
+ SERVICE_TABLE_ENTRY DispatchTable[] = {{(LPWSTR)SVCNAME, (LPSERVICE_MAIN_FUNCTION)&CallMain}, {NULL, NULL}};
+
+ // This call returns when the service has stopped.
+ // The process should simply terminate when the call returns.
+
+ if (!StartServiceCtrlDispatcher(DispatchTable))
+ {
+ SvcReportEvent((LPTSTR)L"StartServiceCtrlDispatcher");
+ }
+ }
+
+ return 0;
+}
+
+int
+WindowsService::SvcMain()
+{
+ // Register the handler function for the service
+
+ gSvcStatusHandle = RegisterServiceCtrlHandler(SVCNAME, SvcCtrlHandler);
+
+ if (!gSvcStatusHandle)
+ {
+ SvcReportEvent((LPTSTR)TEXT("RegisterServiceCtrlHandler"));
+
+ return 1;
+ }
+
+ // These SERVICE_STATUS members remain as set here
+
+ gSvcStatus.dwServiceType = SERVICE_WIN32_OWN_PROCESS;
+ gSvcStatus.dwServiceSpecificExitCode = 0;
+
+ // Report initial status to the SCM
+
+ ReportSvcStatus(SERVICE_START_PENDING, NO_ERROR, 3000);
+
+ // Create an event. The control handler function, SvcCtrlHandler,
+ // signals this event when it receives the stop control code.
+
+ ghSvcStopEvent = CreateEvent(NULL, // default security attributes
+ TRUE, // manual reset event
+ FALSE, // not signaled
+ NULL); // no name
+
+ if (ghSvcStopEvent == NULL)
+ {
+ ReportSvcStatus(SERVICE_STOPPED, GetLastError(), 0);
+
+ return 1;
+ }
+
+ // Report running status when initialization is complete.
+
+ ReportSvcStatus(SERVICE_RUNNING, NO_ERROR, 0);
+
+ int ReturnCode = Run();
+
+ ReportSvcStatus(SERVICE_STOPPED, NO_ERROR, 0);
+
+ return ReturnCode;
+}
+
+//
+// Purpose:
+// Retrieves and displays the current service configuration.
+//
+// Parameters:
+// None
+//
+// Return value:
+// None
+//
+void
+DoQuerySvc()
+{
+ SC_HANDLE schSCManager{};
+ SC_HANDLE schService{};
+ LPQUERY_SERVICE_CONFIG lpsc{};
+ LPSERVICE_DESCRIPTION lpsd{};
+ DWORD dwBytesNeeded{}, cbBufSize{}, dwError{};
+
+ // Get a handle to the SCM database.
+
+ schSCManager = OpenSCManager(NULL, // local computer
+ NULL, // ServicesActive database
+ SC_MANAGER_ALL_ACCESS); // full access rights
+
+ if (NULL == schSCManager)
+ {
+ printf("OpenSCManager failed (%d)\n", GetLastError());
+ return;
+ }
+
+ // Get a handle to the service.
+
+ schService = OpenService(schSCManager, // SCM database
+ SVCNAME, // name of service
+ SERVICE_QUERY_CONFIG); // need query config access
+
+ if (schService == NULL)
+ {
+ printf("OpenService failed (%d)\n", GetLastError());
+ CloseServiceHandle(schSCManager);
+ return;
+ }
+
+ // Get the configuration information.
+
+ if (!QueryServiceConfig(schService, NULL, 0, &dwBytesNeeded))
+ {
+ dwError = GetLastError();
+ if (ERROR_INSUFFICIENT_BUFFER == dwError)
+ {
+ cbBufSize = dwBytesNeeded;
+ lpsc = (LPQUERY_SERVICE_CONFIG)LocalAlloc(LMEM_FIXED, cbBufSize);
+ }
+ else
+ {
+ printf("QueryServiceConfig failed (%d)", dwError);
+ goto cleanup;
+ }
+ }
+
+ if (!QueryServiceConfig(schService, lpsc, cbBufSize, &dwBytesNeeded))
+ {
+ printf("QueryServiceConfig failed (%d)", GetLastError());
+ goto cleanup;
+ }
+
+ if (!QueryServiceConfig2(schService, SERVICE_CONFIG_DESCRIPTION, NULL, 0, &dwBytesNeeded))
+ {
+ dwError = GetLastError();
+ if (ERROR_INSUFFICIENT_BUFFER == dwError)
+ {
+ cbBufSize = dwBytesNeeded;
+ lpsd = (LPSERVICE_DESCRIPTION)LocalAlloc(LMEM_FIXED, cbBufSize);
+ }
+ else
+ {
+ printf("QueryServiceConfig2 failed (%d)", dwError);
+ goto cleanup;
+ }
+ }
+
+ if (!QueryServiceConfig2(schService, SERVICE_CONFIG_DESCRIPTION, (LPBYTE)lpsd, cbBufSize, &dwBytesNeeded))
+ {
+ printf("QueryServiceConfig2 failed (%d)", GetLastError());
+ goto cleanup;
+ }
+
+ // Print the configuration information.
+
+ _tprintf(TEXT("%s configuration: \n"), SVCNAME);
+ _tprintf(TEXT(" Type: 0x%x\n"), lpsc->dwServiceType);
+ _tprintf(TEXT(" Start Type: 0x%x\n"), lpsc->dwStartType);
+ _tprintf(TEXT(" Error Control: 0x%x\n"), lpsc->dwErrorControl);
+ _tprintf(TEXT(" Binary path: %s\n"), lpsc->lpBinaryPathName);
+ _tprintf(TEXT(" Account: %s\n"), lpsc->lpServiceStartName);
+
+ if (lpsd->lpDescription != NULL && lstrcmp(lpsd->lpDescription, TEXT("")) != 0)
+ _tprintf(TEXT(" Description: %s\n"), lpsd->lpDescription);
+ if (lpsc->lpLoadOrderGroup != NULL && lstrcmp(lpsc->lpLoadOrderGroup, TEXT("")) != 0)
+ _tprintf(TEXT(" Load order group: %s\n"), lpsc->lpLoadOrderGroup);
+ if (lpsc->dwTagId != 0)
+ _tprintf(TEXT(" Tag ID: %d\n"), lpsc->dwTagId);
+ if (lpsc->lpDependencies != NULL && lstrcmp(lpsc->lpDependencies, TEXT("")) != 0)
+ _tprintf(TEXT(" Dependencies: %s\n"), lpsc->lpDependencies);
+
+ LocalFree(lpsc);
+ LocalFree(lpsd);
+
+cleanup:
+ CloseServiceHandle(schService);
+ CloseServiceHandle(schSCManager);
+}
+
+//
+// Purpose:
+// Disables the service.
+//
+// Parameters:
+// None
+//
+// Return value:
+// None
+//
+void
+DoDisableSvc()
+{
+ SC_HANDLE schSCManager;
+ SC_HANDLE schService;
+
+ // Get a handle to the SCM database.
+
+ schSCManager = OpenSCManager(NULL, // local computer
+ NULL, // ServicesActive database
+ SC_MANAGER_ALL_ACCESS); // full access rights
+
+ if (NULL == schSCManager)
+ {
+ printf("OpenSCManager failed (%d)\n", GetLastError());
+ return;
+ }
+
+ // Get a handle to the service.
+
+ schService = OpenService(schSCManager, // SCM database
+ SVCNAME, // name of service
+ SERVICE_CHANGE_CONFIG); // need change config access
+
+ if (schService == NULL)
+ {
+ printf("OpenService failed (%d)\n", GetLastError());
+ CloseServiceHandle(schSCManager);
+ return;
+ }
+
+ // Change the service start type.
+
+ if (!ChangeServiceConfig(schService, // handle of service
+ SERVICE_NO_CHANGE, // service type: no change
+ SERVICE_DISABLED, // service start type
+ SERVICE_NO_CHANGE, // error control: no change
+ NULL, // binary path: no change
+ NULL, // load order group: no change
+ NULL, // tag ID: no change
+ NULL, // dependencies: no change
+ NULL, // account name: no change
+ NULL, // password: no change
+ NULL)) // display name: no change
+ {
+ printf("ChangeServiceConfig failed (%d)\n", GetLastError());
+ }
+ else
+ printf("Service disabled successfully.\n");
+
+ CloseServiceHandle(schService);
+ CloseServiceHandle(schSCManager);
+}
+
+//
+// Purpose:
+// Enables the service.
+//
+// Parameters:
+// None
+//
+// Return value:
+// None
+//
+VOID __stdcall DoEnableSvc()
+{
+ SC_HANDLE schSCManager;
+ SC_HANDLE schService;
+
+ // Get a handle to the SCM database.
+
+ schSCManager = OpenSCManager(NULL, // local computer
+ NULL, // ServicesActive database
+ SC_MANAGER_ALL_ACCESS); // full access rights
+
+ if (NULL == schSCManager)
+ {
+ printf("OpenSCManager failed (%d)\n", GetLastError());
+ return;
+ }
+
+ // Get a handle to the service.
+
+ schService = OpenService(schSCManager, // SCM database
+ SVCNAME, // name of service
+ SERVICE_CHANGE_CONFIG); // need change config access
+
+ if (schService == NULL)
+ {
+ printf("OpenService failed (%d)\n", GetLastError());
+ CloseServiceHandle(schSCManager);
+ return;
+ }
+
+ // Change the service start type.
+
+ if (!ChangeServiceConfig(schService, // handle of service
+ SERVICE_NO_CHANGE, // service type: no change
+ SERVICE_DEMAND_START, // service start type
+ SERVICE_NO_CHANGE, // error control: no change
+ NULL, // binary path: no change
+ NULL, // load order group: no change
+ NULL, // tag ID: no change
+ NULL, // dependencies: no change
+ NULL, // account name: no change
+ NULL, // password: no change
+ NULL)) // display name: no change
+ {
+ printf("ChangeServiceConfig failed (%d)\n", GetLastError());
+ }
+ else
+ printf("Service enabled successfully.\n");
+
+ CloseServiceHandle(schService);
+ CloseServiceHandle(schSCManager);
+}
+//
+// Purpose:
+// Updates the service description to "This is a test description".
+//
+// Parameters:
+// None
+//
+// Return value:
+// None
+//
+void
+DoUpdateSvcDesc()
+{
+ SC_HANDLE schSCManager;
+ SC_HANDLE schService;
+ SERVICE_DESCRIPTION sd;
+ TCHAR szDesc[] = TEXT("This is a test description");
+
+ // Get a handle to the SCM database.
+
+ schSCManager = OpenSCManager(NULL, // local computer
+ NULL, // ServicesActive database
+ SC_MANAGER_ALL_ACCESS); // full access rights
+
+ if (NULL == schSCManager)
+ {
+ printf("OpenSCManager failed (%d)\n", GetLastError());
+ return;
+ }
+
+ // Get a handle to the service.
+
+ schService = OpenService(schSCManager, // SCM database
+ SVCNAME, // name of service
+ SERVICE_CHANGE_CONFIG); // need change config access
+
+ if (schService == NULL)
+ {
+ printf("OpenService failed (%d)\n", GetLastError());
+ CloseServiceHandle(schSCManager);
+ return;
+ }
+
+ // Change the service description.
+
+ sd.lpDescription = szDesc;
+
+ if (!ChangeServiceConfig2(schService, // handle to service
+ SERVICE_CONFIG_DESCRIPTION, // change: description
+ &sd)) // new description
+ {
+ printf("ChangeServiceConfig2 failed\n");
+ }
+ else
+ printf("Service description updated successfully.\n");
+
+ CloseServiceHandle(schService);
+ CloseServiceHandle(schSCManager);
+}
+
+//
+// Purpose:
+// Sets the current service status and reports it to the SCM.
+//
+// Parameters:
+// dwCurrentState - The current state (see SERVICE_STATUS)
+// dwWin32ExitCode - The system error code
+// dwWaitHint - Estimated time for pending operation,
+// in milliseconds
+//
+// Return value:
+// None
+//
+VOID
+ReportSvcStatus(DWORD dwCurrentState, DWORD dwWin32ExitCode, DWORD dwWaitHint)
+{
+ static DWORD dwCheckPoint = 1;
+
+ // Fill in the SERVICE_STATUS structure.
+
+ gSvcStatus.dwCurrentState = dwCurrentState;
+ gSvcStatus.dwWin32ExitCode = dwWin32ExitCode;
+ gSvcStatus.dwWaitHint = dwWaitHint;
+
+ if (dwCurrentState == SERVICE_START_PENDING)
+ gSvcStatus.dwControlsAccepted = 0;
+ else
+ gSvcStatus.dwControlsAccepted = SERVICE_ACCEPT_STOP;
+
+ if ((dwCurrentState == SERVICE_RUNNING) || (dwCurrentState == SERVICE_STOPPED))
+ gSvcStatus.dwCheckPoint = 0;
+ else
+ gSvcStatus.dwCheckPoint = dwCheckPoint++;
+
+ // Report the status of the service to the SCM.
+ SetServiceStatus(gSvcStatusHandle, &gSvcStatus);
+}
+
+void
+WindowsService::SvcCtrlHandler(DWORD dwCtrl)
+{
+ // Handle the requested control code.
+ //
+ // Called by SCM whenever a control code is sent to the service
+ // using the ControlService function.
+
+ switch (dwCtrl)
+ {
+ case SERVICE_CONTROL_STOP:
+ ReportSvcStatus(SERVICE_STOP_PENDING, NO_ERROR, 0);
+
+ // Signal the service to stop.
+
+ SetEvent(ghSvcStopEvent);
+ zen::RequestApplicationExit(0);
+
+ ReportSvcStatus(gSvcStatus.dwCurrentState, NO_ERROR, 0);
+ return;
+
+ case SERVICE_CONTROL_INTERROGATE:
+ break;
+
+ default:
+ break;
+ }
+}
+
+//
+// Purpose:
+// Logs messages to the event log
+//
+// Parameters:
+// szFunction - name of function that failed
+//
+// Return value:
+// None
+//
+// Remarks:
+// The service must have an entry in the Application event log.
+//
+VOID
+SvcReportEvent(LPTSTR szFunction)
+{
+ ZEN_UNUSED(szFunction);
+
+ // HANDLE hEventSource;
+ // LPCTSTR lpszStrings[2];
+ // TCHAR Buffer[80];
+
+ // hEventSource = RegisterEventSource(NULL, SVCNAME);
+
+ // if (NULL != hEventSource)
+ //{
+ // StringCchPrintf(Buffer, 80, TEXT("%s failed with %d"), szFunction, GetLastError());
+
+ // lpszStrings[0] = SVCNAME;
+ // lpszStrings[1] = Buffer;
+
+ // ReportEvent(hEventSource, // event log handle
+ // EVENTLOG_ERROR_TYPE, // event type
+ // 0, // event category
+ // SVC_ERROR, // event identifier
+ // NULL, // no security identifier
+ // 2, // size of lpszStrings array
+ // 0, // no binary data
+ // lpszStrings, // array of strings
+ // NULL); // no binary data
+
+ // DeregisterEventSource(hEventSource);
+ //}
+}
diff --git a/zenserver/windows/service.h b/zenserver/windows/service.h
new file mode 100644
index 000000000..7c9610983
--- /dev/null
+++ b/zenserver/windows/service.h
@@ -0,0 +1,20 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+class WindowsService
+{
+public:
+ WindowsService();
+ ~WindowsService();
+
+ virtual int Run() = 0;
+
+ int ServiceMain();
+
+ static void Install();
+ static void Delete();
+
+ int SvcMain();
+ static void __stdcall SvcCtrlHandler(unsigned long);
+};
diff --git a/zenserver/xmake.lua b/zenserver/xmake.lua
index bb70846fa..7a6981fcd 100644
--- a/zenserver/xmake.lua
+++ b/zenserver/xmake.lua
@@ -14,6 +14,8 @@ target("zenserver")
add_ldflags("/MANIFEST:EMBED")
add_ldflags("/MANIFESTUAC:level='requireAdministrator'")
add_ldflags("/LTCG")
+ else
+ del_files("windows/**")
end
add_options("vfs")
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index 53dc41a24..cf24dc224 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -11,28 +11,35 @@
#include <zencore/timer.h>
#include <zencore/windows.h>
#include <zenhttp/httpserver.h>
-#include <zenserverprocess.h>
#include <zenstore/cas.h>
#include <zenstore/cidstore.h>
+#include <zenutil/zenserverprocess.h>
#include <fmt/format.h>
-#include <mimalloc-new-delete.h>
-#include <mimalloc.h>
+
+#if ZEN_USE_MIMALLOC
+# include <mimalloc-new-delete.h>
+# include <mimalloc.h>
+#endif
+
#include <asio.hpp>
#include <exception>
#include <list>
#include <lua.hpp>
#include <optional>
#include <regex>
+#include <set>
#include <unordered_map>
//////////////////////////////////////////////////////////////////////////
// We don't have any doctest code in this file but this is needed to bring
// in some shared code into the executable
-#define DOCTEST_CONFIG_IMPLEMENT
-#include <doctest/doctest.h>
-#undef DOCTEST_CONFIG_IMPLEMENT
+#if ZEN_WITH_TESTS
+# define DOCTEST_CONFIG_IMPLEMENT
+# include <zencore/testing.h>
+# undef DOCTEST_CONFIG_IMPLEMENT
+#endif
//////////////////////////////////////////////////////////////////////////
@@ -40,6 +47,10 @@
#include "config.h"
#include "diag/logging.h"
+#if ZEN_PLATFORM_WINDOWS
+# include "windows/service.h"
+#endif
+
//////////////////////////////////////////////////////////////////////////
// Sentry
//
@@ -82,11 +93,16 @@
#define ZEN_APP_NAME "Zen store"
+namespace zen {
+
class ZenServer
{
+ ZenServerState::ZenServerEntry* m_ServerEntry = nullptr;
+
public:
- void Initialize(ZenServiceConfig& ServiceConfig, int BasePort, int ParentPid)
+ void Initialize(ZenServiceConfig& ServiceConfig, int BasePort, int ParentPid, ZenServerState::ZenServerEntry* ServerEntry)
{
+ m_ServerEntry = ServerEntry;
using namespace fmt::literals;
ZEN_INFO(ZEN_APP_NAME " initializing");
@@ -94,23 +110,29 @@ public:
if (ParentPid)
{
- m_Process.Initialize(ParentPid);
+ zen::ProcessHandle OwnerProcess;
+ OwnerProcess.Initialize(ParentPid);
- if (!m_Process.IsValid())
+ if (!OwnerProcess.IsValid())
{
ZEN_WARN("Unable to initialize process handle for specified parent pid #{}", ParentPid);
+
+ // If the pid is not reachable should we just shut down immediately? the intended owner process
+ // could have been killed or somehow crashed already
}
else
{
ZEN_INFO("Using parent pid #{} to control process lifetime", ParentPid);
}
+
+ m_ProcessMonitor.AddPid(ParentPid);
}
// Initialize/check mutex based on base port
std::string MutexName = "zen_{}"_format(BasePort);
- if (zen::NamedMutex::Exists(MutexName) || (m_ServerMutex.Create(MutexName) == false))
+ if (zen::NamedMutex::Exists(MutexName) || ((m_ServerMutex.Create(MutexName) == false)))
{
throw std::runtime_error("Failed to create mutex '{}' - is another instance already running?"_format(MutexName).c_str());
}
@@ -151,11 +173,15 @@ public:
m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache");
std::unique_ptr<zen::UpstreamCache> UpstreamCache;
- if (ServiceConfig.UpstreamCacheConfig.Enabled)
+ if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled)
{
const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig;
zen::UpstreamCacheOptions UpstreamOptions;
+ UpstreamOptions.ReadUpstream =
+ (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0;
+ UpstreamOptions.WriteUpstream =
+ (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0;
if (UpstreamConfig.UpstreamThreadCount < 32)
{
@@ -201,7 +227,11 @@ public:
if (UpstreamCache->Initialize())
{
- ZEN_INFO("upstream cache active");
+ ZEN_INFO("upstream cache active ({})",
+ UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE"
+ : UpstreamOptions.ReadUpstream ? "READONLY"
+ : UpstreamOptions.WriteUpstream ? "WRITEONLY"
+ : "DISABLED");
}
else
{
@@ -267,7 +297,9 @@ public:
void Run()
{
- if (m_Process.IsValid())
+ Scrub();
+
+ if (m_ProcessMonitor.IsActive())
{
EnqueueTimer();
}
@@ -282,7 +314,7 @@ public:
ZEN_INFO(" \\/ \\/ \\/ \\/ \\/ ");
}
- ZEN_INFO(ZEN_APP_NAME " now running");
+ ZEN_INFO(ZEN_APP_NAME " now running (pid: {})", zen::GetCurrentProcessId());
#if USE_SENTRY
sentry_clear_modulecache();
@@ -293,7 +325,9 @@ public:
__debugbreak();
}
- m_Http->Run(m_TestMode);
+ const bool IsInteractiveMode = zen::IsInteractiveSession() && !m_TestMode;
+
+ m_Http->Run(IsInteractiveMode);
ZEN_INFO(ZEN_APP_NAME " exiting");
@@ -332,18 +366,51 @@ public:
void CheckOwnerPid()
{
- if (m_Process.IsRunning())
+ // Pick up any new "owner" processes
+
+ std::set<uint32_t> AddedPids;
+
+ for (auto& PidEntry : m_ServerEntry->SponsorPids)
+ {
+ if (uint32_t ThisPid = PidEntry.load(std::memory_order::memory_order_relaxed))
+ {
+ if (PidEntry.compare_exchange_strong(ThisPid, 0))
+ {
+ if (AddedPids.insert(ThisPid).second)
+ {
+ m_ProcessMonitor.AddPid(ThisPid);
+
+ ZEN_INFO("added process with pid #{} as a sponsor process", ThisPid);
+ }
+ }
+ }
+ }
+
+ if (m_ProcessMonitor.IsRunning())
{
EnqueueTimer();
}
else
{
- ZEN_INFO(ZEN_APP_NAME " exiting since parent process id {} is gone", m_Process.Pid());
+ ZEN_INFO(ZEN_APP_NAME " exiting since sponsor processes are all gone");
RequestExit(0);
}
}
+ void Scrub()
+ {
+ ZEN_INFO("Storage validation STARTING");
+
+ ScrubContext Ctx;
+ m_CasStore->Scrub(Ctx);
+ m_CidStore->Scrub(Ctx);
+ m_ProjectStore->Scrub(Ctx);
+ m_StructuredCacheService->Scrub(Ctx);
+
+ ZEN_INFO("Storage validation DONE");
+ }
+
void Flush()
{
if (m_CasStore)
@@ -366,16 +433,16 @@ private:
std::jthread m_IoRunner;
asio::io_context m_IoContext;
asio::steady_timer m_PidCheckTimer{m_IoContext};
- zen::ProcessHandle m_Process;
+ zen::ProcessMonitor m_ProcessMonitor;
zen::NamedMutex m_ServerMutex;
zen::Ref<zen::HttpServer> m_Http;
std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore()};
std::unique_ptr<zen::CidStore> m_CidStore;
- std::unique_ptr<ZenCacheStore> m_CacheStore;
+ std::unique_ptr<zen::ZenCacheStore> m_CacheStore;
zen::CasGc m_Gc{*m_CasStore};
zen::CasScrubber m_Scrubber{*m_CasStore};
- HttpTestService m_TestService;
+ zen::HttpTestService m_TestService;
zen::HttpTestingService m_TestingService;
zen::HttpCasService m_CasService{*m_CasStore};
zen::RefPtr<zen::ProjectStore> m_ProjectStore;
@@ -383,23 +450,39 @@ private:
std::unique_ptr<zen::HttpLaunchService> m_HttpLaunchService;
std::unique_ptr<zen::HttpProjectService> m_HttpProjectService;
std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService;
- HttpAdminService m_AdminService;
- HttpHealthService m_HealthService;
+ zen::HttpAdminService m_AdminService;
+ zen::HttpHealthService m_HealthService;
zen::Mesh m_ZenMesh{m_IoContext};
std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService;
bool m_DebugOptionForcedCrash = false;
};
-int
-main(int argc, char* argv[])
+} // namespace zen
+
+class ZenWindowsService : public WindowsService
{
- mi_version();
+public:
+ ZenWindowsService(ZenServerOptions& GlobalOptions, ZenServiceConfig& ServiceConfig)
+ : m_GlobalOptions(GlobalOptions)
+ , m_ServiceConfig(ServiceConfig)
+ {
+ }
- ZenServerOptions GlobalOptions;
- ZenServiceConfig ServiceConfig;
- ParseGlobalCliOptions(argc, argv, GlobalOptions, ServiceConfig);
- InitializeLogging(GlobalOptions);
+ ZenWindowsService(const ZenWindowsService&) = delete;
+ ZenWindowsService& operator=(const ZenWindowsService&) = delete;
+
+ virtual int Run() override;
+
+private:
+ ZenServerOptions& m_GlobalOptions;
+ ZenServiceConfig& m_ServiceConfig;
+};
+
+int
+ZenWindowsService::Run()
+{
+ using namespace zen;
#if USE_SENTRY
// Initialize sentry.io client
@@ -408,30 +491,47 @@ main(int argc, char* argv[])
sentry_options_set_dsn(SentryOptions, "https://[email protected]/5919284");
sentry_init(SentryOptions);
- auto _ = zen::MakeGuard([&] { sentry_close(); });
+ auto _ = zen::MakeGuard([] { sentry_close(); });
#endif
- // Prototype config system, let's see how this pans out
-
- ParseServiceConfig(GlobalOptions.DataDir, /* out */ ServiceConfig);
-
- ZEN_INFO("zen cache server starting on port {}", GlobalOptions.BasePort);
+ auto& GlobalOptions = m_GlobalOptions;
+ auto& ServiceConfig = m_ServiceConfig;
try
{
+ // Prototype config system, we'll see how this pans out
+ //
+ // TODO: we need to report any parse errors here
+
+ ParseServiceConfig(GlobalOptions.DataDir, /* out */ ServiceConfig);
+
+ ZEN_INFO("zen cache server starting on port {}", GlobalOptions.BasePort);
+
ZenServerState ServerState;
ServerState.Initialize();
ServerState.Sweep();
- if (ZenServerState::ZenServerEntry* Entry = ServerState.Lookup(GlobalOptions.BasePort))
+ ZenServerState::ZenServerEntry* Entry = ServerState.Lookup(GlobalOptions.BasePort);
+
+ if (Entry)
{
// Instance already running for this port? Should double check pid
ZEN_WARN("Looks like there is already a process listening to this port (pid: {})", Entry->Pid);
+
+ if (GlobalOptions.OwnerPid)
+ {
+ Entry->AddSponsorProcess(GlobalOptions.OwnerPid);
+
+ std::exit(0);
+ }
}
- else
+
+ Entry = ServerState.Register(GlobalOptions.BasePort);
+
+ if (GlobalOptions.OwnerPid)
{
- ServerState.Register(GlobalOptions.BasePort);
+ Entry->AddSponsorProcess(GlobalOptions.OwnerPid);
}
std::unique_ptr<std::thread> ShutdownThread;
@@ -445,15 +545,17 @@ main(int argc, char* argv[])
Server.SetDataRoot(GlobalOptions.DataDir);
Server.SetTestMode(GlobalOptions.IsTest);
Server.SetDedicatedMode(GlobalOptions.IsDedicated);
- Server.Initialize(ServiceConfig, GlobalOptions.BasePort, GlobalOptions.OwnerPid);
+ Server.Initialize(ServiceConfig, GlobalOptions.BasePort, GlobalOptions.OwnerPid, Entry);
// Monitor shutdown signals
ShutdownThread.reset(new std::thread{[&] {
ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}'", ShutdownEventName);
- ShutdownEvent->Wait();
- ZEN_INFO("shutdown signal received");
- Server.RequestExit(0);
+ if (ShutdownEvent->Wait())
+ {
+ ZEN_INFO("shutdown signal received");
+ Server.RequestExit(0);
+ }
}});
// If we have a parent process, establish the mechanisms we need
@@ -480,3 +582,37 @@ main(int argc, char* argv[])
return 0;
}
+
+int
+main(int argc, char* argv[])
+{
+ using namespace zen;
+
+#if ZEN_USE_MIMALLOC
+ mi_version();
+#endif
+
+ ZenServerOptions GlobalOptions;
+ ZenServiceConfig ServiceConfig;
+ ParseGlobalCliOptions(argc, argv, GlobalOptions, ServiceConfig);
+ InitializeLogging(GlobalOptions);
+
+#if ZEN_PLATFORM_WINDOWS
+ if (GlobalOptions.InstallService)
+ {
+ WindowsService::Install();
+
+ std::exit(0);
+ }
+
+ if (GlobalOptions.UninstallService)
+ {
+ WindowsService::Delete();
+
+ std::exit(0);
+ }
+#endif
+
+ ZenWindowsService App(GlobalOptions, ServiceConfig);
+ return App.ServiceMain();
+}
diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj
index aa9d538a5..db657d192 100644
--- a/zenserver/zenserver.vcxproj
+++ b/zenserver/zenserver.vcxproj
@@ -123,6 +123,7 @@
<ClInclude Include="upstream\upstreamcache.h" />
<ClInclude Include="upstream\zen.h" />
<ClInclude Include="vfs.h" />
+ <ClInclude Include="windows\service.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="cache\structuredcache.cpp" />
@@ -142,6 +143,7 @@
<ClCompile Include="upstream\upstreamcache.cpp" />
<ClCompile Include="upstream\zen.cpp" />
<ClCompile Include="vfs.cpp" />
+ <ClCompile Include="windows\service.cpp" />
<ClCompile Include="zenserver.cpp" />
</ItemGroup>
<ItemGroup>
diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters
index a86a6d96d..250c55812 100644
--- a/zenserver/zenserver.vcxproj.filters
+++ b/zenserver/zenserver.vcxproj.filters
@@ -39,6 +39,7 @@
<Filter>upstream</Filter>
</ClInclude>
<ClInclude Include="testing\httptest.h" />
+ <ClInclude Include="windows\service.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="zenserver.cpp" />
@@ -73,6 +74,7 @@
<Filter>upstream</Filter>
</ClCompile>
<ClCompile Include="testing\httptest.cpp" />
+ <ClCompile Include="windows\service.cpp" />
</ItemGroup>
<ItemGroup>
<Filter Include="cache">