aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ridgers <[email protected]>2021-09-22 22:46:21 +0200
committerMartin Ridgers <[email protected]>2021-09-22 22:46:21 +0200
commit1623f7471b9b0c09b7e9c98652c280d1f6559ca1 (patch)
tree78518774fe79ddf3e266a75c699b697971f52f4b
parentMerged main into linux-mac (diff)
parentClang format fix. (diff)
downloadzen-1623f7471b9b0c09b7e9c98652c280d1f6559ca1.tar.xz
zen-1623f7471b9b0c09b7e9c98652c280d1f6559ca1.zip
Merge main
-rw-r--r--README.md3
-rw-r--r--UnrealEngine.icobin0 -> 65288 bytes
-rw-r--r--xmake.lua1
-rw-r--r--zencore/mpscqueue.cpp2
-rw-r--r--zenhttp/httpsys.cpp4
-rw-r--r--zenserver/cache/structuredcache.cpp837
-rw-r--r--zenserver/cache/structuredcache.h9
-rw-r--r--zenserver/cache/structuredcachestore.cpp7
-rw-r--r--zenserver/cache/structuredcachestore.h1
-rw-r--r--zenserver/compute/apply.cpp9
-rw-r--r--zenserver/config.cpp21
-rw-r--r--zenserver/resource.h18
-rw-r--r--zenserver/upstream/jupiter.cpp30
-rw-r--r--zenserver/upstream/jupiter.h10
-rw-r--r--zenserver/upstream/upstreamcache.cpp206
-rw-r--r--zenserver/upstream/upstreamcache.h44
-rw-r--r--zenserver/upstream/zen.cpp25
-rw-r--r--zenserver/upstream/zen.h10
-rw-r--r--zenserver/zenserver.cpp4
-rw-r--r--zenserver/zenserver.rc81
-rw-r--r--zenserver/zenserver.vcxproj7
-rw-r--r--zenserver/zenserver.vcxproj.filters7
-rw-r--r--zenstore/CAS.cpp49
-rw-r--r--zenstore/cidstore.cpp57
-rw-r--r--zenstore/compactcas.cpp30
-rw-r--r--zenstore/compactcas.h1
-rw-r--r--zenstore/filecas.cpp126
-rw-r--r--zenstore/filecas.h20
-rw-r--r--zenstore/include/zenstore/CAS.h26
-rw-r--r--zenstore/include/zenstore/cidstore.h4
30 files changed, 1011 insertions, 638 deletions
diff --git a/README.md b/README.md
index 7d51d3686..055ecca3a 100644
--- a/README.md
+++ b/README.md
@@ -3,6 +3,9 @@
This is the implementation of the local storage service for UE5. It is intended to be deployed on
user machines either as a daemon or launched ad hoc as required during of editor/cooker/game startup
+Zen can also be deployed as a shared instance for use as a shared cache. It also supports upstream
+connectivity to cloud storage services as well as other Zen server instances.
+
We currently only support building and running the server on Windows. Linux and Mac support is in progress
## Building on Windows
diff --git a/UnrealEngine.ico b/UnrealEngine.ico
new file mode 100644
index 000000000..1cfa301a2
--- /dev/null
+++ b/UnrealEngine.ico
Binary files differ
diff --git a/xmake.lua b/xmake.lua
index 9e0b5fe2a..9d44ec8d0 100644
--- a/xmake.lua
+++ b/xmake.lua
@@ -43,6 +43,7 @@ end
if is_os("windows") then
add_defines("_CRT_SECURE_NO_WARNINGS", "_UNICODE", "UNICODE", "_WIN32_WINNT=0x0A00")
+ -- add_ldflags("/MAP")
end
add_defines("USE_SENTRY=1")
diff --git a/zencore/mpscqueue.cpp b/zencore/mpscqueue.cpp
index e1841ef63..29c76c3ca 100644
--- a/zencore/mpscqueue.cpp
+++ b/zencore/mpscqueue.cpp
@@ -7,7 +7,7 @@
namespace zen {
-#if ZEN_WITH_TESTS && 0
+#if ZEN_WITH_TESTS && 0
TEST_CASE("mpsc")
{
MpscQueue<std::string> Queue;
diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp
index 087d4c807..deaf95d5a 100644
--- a/zenhttp/httpsys.cpp
+++ b/zenhttp/httpsys.cpp
@@ -553,7 +553,7 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode)
CancelThreadpoolIo(Iocp);
- ZEN_ERROR("failed to send HTTP response (error: '{}'), request URL: {}", SendResult, HttpReq->pRawUrl);
+ ZEN_ERROR("failed to send HTTP response (error: '{}'), request URL: '{}'", GetWindowsErrorAsString(SendResult), HttpReq->pRawUrl);
ErrorCode = MakeErrorCode(SendResult);
}
@@ -1256,7 +1256,7 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT
case ERROR_OPERATION_ABORTED:
return nullptr;
- case ERROR_MORE_DATA: // Insufficient buffer space
+ case ERROR_MORE_DATA: // Insufficient buffer space
case NO_ERROR:
break;
}
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 7f1fe7b44..3ac1ec37f 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -173,10 +173,19 @@ HttpStructuredCacheService::Flush()
{
}
-void
+void
HttpStructuredCacheService::Scrub(ScrubContext& Ctx)
{
- ZEN_UNUSED(Ctx);
+ if (m_LastScrubTime == Ctx.ScrubTimestamp())
+ {
+ return;
+ }
+
+ m_LastScrubTime = Ctx.ScrubTimestamp();
+
+ m_CasStore.Scrub(Ctx);
+ m_CidStore.Scrub(Ctx);
+ m_CacheStore.Scrub(Ctx);
}
void
@@ -244,7 +253,7 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request,
}
void
-HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy)
+HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy)
{
switch (auto Verb = Request.RequestVerb())
{
@@ -253,439 +262,380 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request,
case kHead:
case kGet:
{
- const ZenContentType AcceptType = Request.AcceptContentType();
+ HandleGetCacheRecord(Request, Ref, Policy);
+ if (Verb == kHead)
+ {
+ Request.SetSuppressResponseBody();
+ }
+ }
+ break;
+ case kPut:
+ HandlePutCacheRecord(Request, Ref, Policy);
+ break;
+ default:
+ break;
+ }
+}
- ZenCacheValue Value;
- bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
- bool InUpstreamCache = false;
+void
+HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy)
+{
+ const ZenContentType AcceptType = Request.AcceptContentType();
- const bool QueryUpstream = !Success && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote));
+ ZenCacheValue Value;
+ bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value);
+ bool InUpstreamCache = false;
- if (QueryUpstream)
- {
- const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary
- : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage
- : ZenContentType::kCbObject;
+ const bool QueryUpstream = !Success && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote));
- if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType);
- UpstreamResult.Success)
- {
- Value.Value = UpstreamResult.Value;
- Success = true;
- InUpstreamCache = true;
+ if (QueryUpstream)
+ {
+ const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary
+ : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage
+ : ZenContentType::kCbObject;
- if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject)
- {
- if (CacheRecordType == ZenContentType::kCbObject)
- {
- const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All);
-
- if (ValidationResult == CbValidateError::None)
- {
- CbObjectView CacheRecord(UpstreamResult.Value.Data());
-
- CbObjectWriter IndexData;
- IndexData.BeginArray("references");
- CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); });
- IndexData.EndArray();
-
- Value.IndexData = IndexData.Save();
- }
- else
- {
- Success = false;
- ZEN_WARN("Get - cache record '{}/{}' FAILED, invalid compact binary object from upstream",
- Ref.BucketSegment,
- Ref.HashKey);
- }
- }
+ if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType);
+ UpstreamResult.Success)
+ {
+ Value.Value = UpstreamResult.Value;
+ Success = true;
+ InUpstreamCache = true;
- if (Success)
- {
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
- }
- }
- else
- {
- ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage);
+ if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject)
+ {
+ if (CacheRecordType == ZenContentType::kCbObject)
+ {
+ const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All);
- CbPackage Package;
- if (Package.TryLoad(UpstreamResult.Value))
- {
- uint32_t AttachmentCount = 0;
- uint32_t FoundCount = 0;
- CbObject CacheRecord = Package.GetObject();
-
- CacheRecord.IterateAttachments(
- [this, &Package, &Ref, &AttachmentCount, &FoundCount](CbFieldView AttachmentHash) {
- if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
- {
- if (CompressedBuffer Chunk = Attachment->AsCompressedBinary())
- {
- m_CidStore.AddChunk(Chunk);
- FoundCount++;
- }
- else
- {
- ZEN_WARN("GET - cache record '{}/{}' FAILED, upstream attachment not compressed",
- Ref.BucketSegment,
- Ref.HashKey);
- }
- }
- AttachmentCount++;
- });
-
- if (FoundCount == AttachmentCount)
- {
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()});
-
- if (zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments))
- {
- CbPackage PackageWithoutAttachments;
- PackageWithoutAttachments.SetObject(CacheRecord);
-
- MemoryOutStream MemStream;
- BinaryWriter Writer(MemStream);
- PackageWithoutAttachments.Save(Writer);
-
- Value.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- }
- }
- else
- {
- Success = false;
- ZEN_WARN("GET - cache record '{}/{}' FAILED, attachments missing in upstream package",
- Ref.BucketSegment,
- Ref.HashKey);
- }
- }
- else
- {
- Success = false;
- ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey);
- }
- }
- }
- }
+ if (ValidationResult == CbValidateError::None)
+ {
+ CbObjectView CacheRecord(UpstreamResult.Value.Data());
- if (!Success)
- {
- ZEN_DEBUG("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey);
+ CbObjectWriter IndexData;
+ IndexData.BeginArray("references");
+ CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); });
+ IndexData.EndArray();
- return Request.WriteResponse(HttpResponseCode::NotFound);
+ Value.IndexData = IndexData.Save();
+ }
+ else
+ {
+ Success = false;
+ ZEN_WARN("Get - cache record '{}/{}' FAILED, invalid compact binary object from upstream",
+ Ref.BucketSegment,
+ Ref.HashKey);
+ }
}
- if (Verb == kHead)
+ if (Success)
{
- Request.SetSuppressResponseBody();
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value);
}
+ }
+ else
+ {
+ ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage);
- if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache)
+ CbPackage Package;
+ if (Package.TryLoad(UpstreamResult.Value))
{
- CbObjectView CacheRecord(Value.Value.Data());
+ uint32_t AttachmentCount = 0;
+ uint32_t ValidCount = 0;
+ CbObject CacheRecord = Package.GetObject();
- const CbValidateError ValidationResult = ValidateCompactBinary(Value.Value, CbValidateMode::All);
+ CacheRecord.IterateAttachments([this, &Package, &Ref, &AttachmentCount, &ValidCount](CbFieldView AttachmentHash) {
+ if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
+ {
+ if (CompressedBuffer Chunk = Attachment->AsCompressedBinary())
+ {
+ m_CidStore.AddChunk(Chunk);
+ ValidCount++;
+ }
+ else
+ {
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, upstream attachment not compressed",
+ Ref.BucketSegment,
+ Ref.HashKey);
+ }
+ }
+ AttachmentCount++;
+ });
- if (ValidationResult != CbValidateError::None)
+ if (ValidCount == AttachmentCount)
{
- ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey);
-
- return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv);
- }
-
- const bool SkipAttachments = zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments);
- uint32_t AttachmentCount = 0;
- uint32_t FoundCount = 0;
- uint64_t AttachmentBytes = 0ull;
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()});
- CbPackage Package;
-
- if (!SkipAttachments)
- {
- CacheRecord.IterateAttachments(
- [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
- {
- Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
- AttachmentBytes += Chunk.Size();
- FoundCount++;
- }
- AttachmentCount++;
- });
-
- if (FoundCount != AttachmentCount)
+ if (zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments))
{
- ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments",
- Ref.BucketSegment,
- Ref.HashKey,
- FoundCount,
- AttachmentCount);
+ CbPackage PackageWithoutAttachments;
+ PackageWithoutAttachments.SetObject(CacheRecord);
+
+ MemoryOutStream MemStream;
+ BinaryWriter Writer(MemStream);
+ PackageWithoutAttachments.Save(Writer);
- return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv);
+ Value.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
}
}
-
- Package.SetObject(LoadCompactBinaryObject(Value.Value));
-
- ZEN_DEBUG("HIT - '{}/{}' {}, {} attachments ({})",
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(AttachmentBytes + Value.Value.Size()),
- AttachmentCount,
- InUpstreamCache ? "UPSTREAM" : "LOCAL");
-
- MemoryOutStream MemStream;
- BinaryWriter Writer(MemStream);
- Package.Save(Writer);
-
- IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
-
- return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response);
+ else
+ {
+ Success = false;
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, attachments missing in upstream package",
+ Ref.BucketSegment,
+ Ref.HashKey);
+ }
}
else
{
- ZEN_DEBUG("HIT - '{}/{}' {} ({})",
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(Value.Value.Size()),
- InUpstreamCache ? "UPSTREAM" : "LOCAL");
-
- return Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value);
+ Success = false;
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey);
}
}
- break;
+ }
+ }
- case kPut:
- {
- IoBuffer Body = Request.ReadPayload();
+ if (!Success)
+ {
+ ZEN_DEBUG("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey);
- if (!Body || Body.Size() == 0)
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest);
- }
+ return Request.WriteResponse(HttpResponseCode::NotFound);
+ }
- const HttpContentType ContentType = Request.RequestContentType();
+ if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache)
+ {
+ CbObjectView CacheRecord(Value.Value.Data());
- const bool StoreUpstream = m_UpstreamCache && (CachePolicy::StoreRemote == (Policy & CachePolicy::StoreRemote));
+ const CbValidateError ValidationResult = ValidateCompactBinary(Value.Value, CbValidateMode::All);
- if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType)
- {
- // TODO: create a cache record and put value in CAS?
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
- ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()));
+ if (ValidationResult != CbValidateError::None)
+ {
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey);
+ return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv);
+ }
+
+ const bool SkipAttachments = zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments);
+ uint32_t AttachmentCount = 0;
+ uint32_t ValidCount = 0;
+ uint64_t AttachmentBytes = 0ull;
- if (StoreUpstream)
+ CbPackage Package;
+
+ if (!SkipAttachments)
+ {
+ CacheRecord.IterateAttachments(
+ [this, &Ref, &Package, &AttachmentCount, &ValidCount, &AttachmentBytes](CbFieldView AttachmentHash) {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
{
- auto Result = m_UpstreamCache->EnqueueUpstream(
- {.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}});
+ Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ AttachmentBytes += Chunk.Size();
+ ValidCount++;
}
+ AttachmentCount++;
+ });
- return Request.WriteResponse(HttpResponseCode::Created);
- }
- else if (ContentType == HttpContentType::kCbObject)
- {
- // Validate payload before accessing it
- const CbValidateError ValidationResult =
- ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), CbValidateMode::All);
+ if (ValidCount != AttachmentCount)
+ {
+ ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ValidCount,
+ AttachmentCount);
- if (ValidationResult != CbValidateError::None)
- {
- ZEN_WARN("PUT - cache record '{}/{}' ({} bytes) FAILED, invalid compact binary",
- Ref.BucketSegment,
- Ref.HashKey,
- Body.Size());
+ return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv);
+ }
+ }
- // TODO: add details in response, kText || kCbObject?
- return Request.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Compact binary validation failed"sv);
- }
+ Package.SetObject(LoadCompactBinaryObject(Value.Value));
- // Extract referenced payload hashes
- CbObjectView Cbo(Body.Data());
+ ZEN_DEBUG("HIT - '{}/{}' {}, {} attachments (LOCAL)",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(AttachmentBytes + Value.Value.Size()),
+ AttachmentCount);
- std::vector<IoHash> References;
- std::vector<IoHash> MissingRefs;
- Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); });
+ MemoryOutStream MemStream;
+ BinaryWriter Writer(MemStream);
+ Package.Save(Writer);
- ZenCacheValue CacheValue;
- CacheValue.Value = Body;
+ IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size());
- if (!References.empty())
- {
- CbObjectWriter Idx;
- Idx.BeginArray("references");
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response);
+ }
+ else
+ {
+ ZEN_DEBUG("HIT - '{}/{}' {} ({})",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(Value.Value.Size()),
+ InUpstreamCache ? "UPSTREAM" : "LOCAL");
- for (const IoHash& Hash : References)
- {
- Idx.AddHash(Hash);
- if (!m_CidStore.ContainsChunk(Hash))
- {
- MissingRefs.push_back(Hash);
- }
- }
+ Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value);
+ }
+}
- Idx.EndArray();
+void
+HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy)
+{
+ IoBuffer Body = Request.ReadPayload();
- CacheValue.IndexData = Idx.Save();
- }
+ if (!Body || Body.Size() == 0)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
+ const HttpContentType ContentType = Request.RequestContentType();
+ const bool StoreUpstream = m_UpstreamCache && (CachePolicy::StoreRemote == (Policy & CachePolicy::StoreRemote));
- ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} attachments missing",
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(CacheValue.Value.Size()),
- MissingRefs.size(),
- References.size());
+ if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType)
+ {
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
+ ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()));
- if (MissingRefs.empty() && StoreUpstream)
- {
- ZEN_ASSERT(m_UpstreamCache);
- auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
- .CacheKey = {Ref.BucketSegment, Ref.HashKey},
- .PayloadIds = std::move(References)});
+ if (StoreUpstream)
+ {
+ ZEN_ASSERT(m_UpstreamCache);
+ auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}});
+ }
- return Request.WriteResponse(HttpResponseCode::Created);
- }
- else
- {
- // TODO: Binary attachments?
- CbObjectWriter Response;
- Response.BeginArray("needs");
- for (const IoHash& MissingRef : MissingRefs)
- {
- Response.AddHash(MissingRef);
- ZEN_DEBUG("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef);
- }
- Response.EndArray();
+ Request.WriteResponse(HttpResponseCode::Created);
+ }
+ else if (ContentType == HttpContentType::kCbObject)
+ {
+ const CbValidateError ValidationResult = ValidateCompactBinary(MemoryView(Body.GetData(), Body.GetSize()), CbValidateMode::All);
- // Return Created | BadRequest?
- return Request.WriteResponse(HttpResponseCode::Created, Response.Save());
- }
- }
- else if (ContentType == HttpContentType::kCbPackage)
- {
- CbPackage Package;
+ if (ValidationResult != CbValidateError::None)
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' ({} bytes) FAILED, invalid compact binary", Ref.BucketSegment, Ref.HashKey, Body.Size());
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv);
+ }
- if (!Package.TryLoad(Body))
- {
- ZEN_WARN("PUT - cache record '{}/{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey);
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package");
- }
+ CbObjectView CacheRecord(Body.Data());
+ std::vector<IoHash> ValidAttachments;
+ uint32_t AttachmentCount = 0;
- CbObject CacheRecord = Package.GetObject();
+ CacheRecord.IterateAttachments([this, &AttachmentCount, &ValidAttachments](CbFieldView AttachmentHash) {
+ const IoHash Hash = AttachmentHash.AsHash();
+ if (m_CidStore.ContainsChunk(Hash))
+ {
+ ValidAttachments.emplace_back(Hash);
+ }
+ AttachmentCount++;
+ });
- struct AttachmentInsertResult
- {
- int32_t Count = 0;
- int32_t NewCount = 0;
- uint64_t Bytes = 0;
- uint64_t NewBytes = 0;
- bool Ok = false;
- };
+ const uint32_t ValidCount = static_cast<uint32_t>(ValidAttachments.size());
+ const bool ValidCacheRecord = ValidCount == AttachmentCount;
- AttachmentInsertResult AttachmentResult{.Ok = true};
- std::span<const CbAttachment> Attachments = Package.GetAttachments();
- std::vector<IoHash> PayloadIds;
+ if (ValidCacheRecord)
+ {
+ ZEN_DEBUG("PUT - cache record '{}/{}' {}, {} attachments", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ValidCount);
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
- PayloadIds.reserve(Attachments.size());
+ if (StoreUpstream)
+ {
+ ZEN_ASSERT(m_UpstreamCache);
+ auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ .CacheKey = {Ref.BucketSegment, Ref.HashKey},
+ .PayloadIds = std::move(ValidAttachments)});
+ }
- CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentResult, &PayloadIds](CbFieldView AttachmentHash) {
- if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
- {
- if (Attachment->IsCompressedBinary())
- {
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- const uint64_t ChunkSize = Chunk.GetCompressed().GetSize();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ Request.WriteResponse(HttpResponseCode::Created);
+ }
+ else
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' FAILED, found {}/{} attachments",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ ValidCount,
+ AttachmentCount);
- PayloadIds.emplace_back(InsertResult.DecompressedId);
+ Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Missing attachments"sv);
+ }
+ }
+ else if (ContentType == HttpContentType::kCbPackage)
+ {
+ CbPackage Package;
- if (InsertResult.New)
- {
- AttachmentResult.NewBytes += ChunkSize;
- AttachmentResult.NewCount++;
- }
+ if (!Package.TryLoad(Body))
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey);
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv);
+ }
- AttachmentResult.Bytes += ChunkSize;
- AttachmentResult.Count++;
- }
- else
- {
- ZEN_WARN("PUT - cache record '{}/{}' FAILED, attachment '{}' is not compressed",
- Ref.BucketSegment,
- Ref.HashKey,
- AttachmentHash.AsHash());
- AttachmentResult.Ok = false;
- }
- }
- else
- {
- ZEN_WARN("PUT - cache record '{}/{}' FAILED, missing attachment '{}'",
- Ref.BucketSegment,
- Ref.HashKey,
- AttachmentHash.AsHash());
- AttachmentResult.Ok = false;
- }
- });
+ CbObject CacheRecord = Package.GetObject();
- if (!AttachmentResult.Ok)
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments");
- }
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+ std::vector<IoHash> ValidAttachments;
+ int32_t NewAttachmentCount = 0;
- IoBuffer CacheRecordChunk = CacheRecord.GetBuffer().AsIoBuffer();
- const uint64_t TotalPackageBytes = AttachmentResult.Bytes + CacheRecordChunk.Size();
+ ValidAttachments.reserve(Attachments.size());
- ZenCacheValue CacheValue{.Value = CacheRecordChunk};
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue);
+ CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &NewAttachmentCount](CbFieldView AttachmentHash) {
+ if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
+ {
+ if (Attachment->IsCompressedBinary())
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+
+ ValidAttachments.emplace_back(InsertResult.DecompressedId);
- if (StoreUpstream)
+ if (InsertResult.New)
{
- ZEN_ASSERT(m_UpstreamCache);
- auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage,
- .CacheKey = {Ref.BucketSegment, Ref.HashKey},
- .PayloadIds = std::move(PayloadIds)});
+ NewAttachmentCount++;
}
-
- ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} ({}/{}) new attachments",
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(TotalPackageBytes),
- AttachmentResult.NewCount,
- AttachmentResult.Count,
- NiceBytes(AttachmentResult.NewBytes),
- NiceBytes(AttachmentResult.Bytes));
-
- return Request.WriteResponse(HttpResponseCode::Created);
}
else
{
- return Request.WriteResponse(HttpResponseCode::BadRequest);
+ ZEN_WARN("PUT - cache record '{}/{}' FAILED, attachment '{}' is not compressed",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ AttachmentHash.AsHash());
}
}
- break;
+ else
+ {
+ ZEN_WARN("PUT - cache record '{}/{}' FAILED, missing attachment '{}'",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ AttachmentHash.AsHash());
+ }
+ });
- case kPost:
- break;
+ const bool AttachmentsValid = ValidAttachments.size() == Attachments.size();
- default:
- break;
+ if (!AttachmentsValid)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"sv);
+ }
+
+ ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} new attachments",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(Body.GetSize()),
+ NewAttachmentCount,
+ Attachments.size());
+
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()});
+
+ if (StoreUpstream)
+ {
+ ZEN_ASSERT(m_UpstreamCache);
+ auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage,
+ .CacheKey = {Ref.BucketSegment, Ref.HashKey},
+ .PayloadIds = std::move(ValidAttachments)});
+ }
+
+ Request.WriteResponse(HttpResponseCode::Created);
+ }
+ else
+ {
+ Request.WriteResponse(HttpResponseCode::BadRequest);
}
}
void
-HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy)
+HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy)
{
- // Note: the URL references the uncompressed payload hash - so this maintains the mapping
- // from uncompressed CAS identity (aka CID/Content ID) to the stored payload hash
- //
- // this is a PITA but a consequence of the fact that the client side code is not able to
- // address data by compressed hash
-
- ZEN_UNUSED(Policy);
-
switch (auto Verb = Request.RequestVerb())
{
using enum HttpVerb;
@@ -693,112 +643,107 @@ HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request
case kHead:
case kGet:
{
- IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId);
- bool InUpstreamCache = false;
-
- if (!Payload && m_UpstreamCache)
- {
- if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId});
- UpstreamResult.Success)
- {
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
- {
- Payload = UpstreamResult.Value;
- IoHash ChunkHash = IoHash::HashBuffer(Payload);
- CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash);
- InUpstreamCache = true;
-
- m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash);
- }
- else
- {
- ZEN_WARN("got uncompressed upstream cache payload");
- }
- }
- }
-
- if (!Payload)
- {
- ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId);
- return Request.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})",
- Ref.BucketSegment,
- Ref.HashKey,
- Ref.PayloadId,
- NiceBytes(Payload.Size()),
- Payload.GetContentType(),
- InUpstreamCache ? "UPSTREAM" : "LOCAL");
-
+ HandleGetCachePayload(Request, Ref, Policy);
if (Verb == kHead)
{
Request.SetSuppressResponseBody();
}
-
- return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload);
}
break;
-
case kPut:
+ HandlePutCachePayload(Request, Ref, Policy);
+ break;
+ default:
+ break;
+ }
+}
+
+void
+HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy)
+{
+ IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId);
+ bool InUpstreamCache = false;
+ const bool QueryUpstream = !Payload && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote));
+
+ if (QueryUpstream)
+ {
+ if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId});
+ UpstreamResult.Success)
+ {
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
{
- if (IoBuffer Body = Request.ReadPayload())
- {
- if (Body.Size() == 0)
- {
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Empty payload not permitted");
- }
+ Payload = UpstreamResult.Value;
+ IoHash ChunkHash = IoHash::HashBuffer(Payload);
+ CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash);
+ InUpstreamCache = true;
- IoHash ChunkHash = IoHash::HashBuffer(Body);
+ m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash);
+ }
+ else
+ {
+ ZEN_WARN("got uncompressed upstream cache payload");
+ }
+ }
+ }
+
+ if (!Payload)
+ {
+ ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId);
+ return Request.WriteResponse(HttpResponseCode::NotFound);
+ }
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body));
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Ref.PayloadId,
+ NiceBytes(Payload.Size()),
+ Payload.GetContentType(),
+ InUpstreamCache ? "UPSTREAM" : "LOCAL");
- if (!Compressed)
- {
- // All attachment payloads need to be in compressed buffer format
- return Request.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "Attachments must be compressed");
- }
- else
- {
- if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId)
- {
- // the URL specified content id and content hashes don't match!
- return Request.WriteResponse(HttpResponseCode::BadRequest);
- }
+ Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload);
+}
- CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash);
+void
+HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy)
+{
+ // Note: Individual cache payloads are not propagated upstream until a valid cache record has been stored
+ ZEN_UNUSED(Policy);
- m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash);
+ IoBuffer Body = Request.ReadPayload();
- ZEN_DEBUG("PUT - payload '{}/{}/{}' {} (type: {}) {}",
- Ref.BucketSegment,
- Ref.HashKey,
- Ref.PayloadId,
- NiceBytes(Body.Size()),
- Body.GetContentType(),
- Result.New ? "NEW" : "OLD");
+ if (!Body || Body.Size() == 0)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
- if (Result.New)
- {
- return Request.WriteResponse(HttpResponseCode::Created);
- }
- else
- {
- return Request.WriteResponse(HttpResponseCode::OK);
- }
- }
- }
- }
- break;
+ IoHash ChunkHash = IoHash::HashBuffer(Body);
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body));
- case kPost:
- break;
+ if (!Compressed)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv);
+ }
- default:
- break;
+ if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Payload ID does not match attachment hash"sv);
}
+
+ CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash);
+
+ m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash);
+
+ ZEN_DEBUG("PUT - payload '{}/{}/{}' {} (type: {}) {}",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ Ref.PayloadId,
+ NiceBytes(Body.Size()),
+ Body.GetContentType(),
+ Result.New ? "NEW" : "OLD");
+
+ const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK;
+
+ Request.WriteResponse(ResponseCode);
}
bool
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index bd163dd1d..3fdaa1236 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -71,8 +71,12 @@ private:
};
[[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef);
- void HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy);
- void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy);
+ void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
+ void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
+ void HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
+ void HandleCachePayloadRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
+ void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
+ void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
spdlog::logger& Log() { return m_Log; }
@@ -81,6 +85,7 @@ private:
zen::CasStore& m_CasStore;
zen::CidStore& m_CidStore;
std::unique_ptr<UpstreamCache> m_UpstreamCache;
+ uint64_t m_LastScrubTime = 0;
};
} // namespace zen
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index 502ca6605..3d80bb14c 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -108,6 +108,13 @@ ZenCacheStore::Flush()
void
ZenCacheStore::Scrub(ScrubContext& Ctx)
{
+ if (m_LastScrubTime == Ctx.ScrubTimestamp())
+ {
+ return;
+ }
+
+ m_LastScrubTime = Ctx.ScrubTimestamp();
+
m_DiskLayer.Scrub(Ctx);
m_MemLayer.Scrub(Ctx);
}
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index fdf4a8cfe..2cc3abb53 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -112,6 +112,7 @@ private:
ZenCacheMemoryLayer m_MemLayer;
ZenCacheDiskLayer m_DiskLayer;
uint64_t m_DiskLayerSizeThreshold = 4 * 1024;
+ uint64_t m_LastScrubTime = 0;
};
/** Tracks cache entry access, stats and orchestrates cleanup activities
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp
index 3197eaee4..15d9e0141 100644
--- a/zenserver/compute/apply.cpp
+++ b/zenserver/compute/apply.cpp
@@ -375,7 +375,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
FunctionSpec.IterateAttachments([&](CbFieldView Field) {
const IoHash Hash = Field.AsHash();
- ChunkSet.AddChunk(Hash);
+ ChunkSet.AddChunkToSet(Hash);
});
// Note that we store executables uncompressed to make it
@@ -399,16 +399,15 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
CbObjectWriter ResponseWriter;
ResponseWriter.BeginArray("need");
- for (const IoHash& Hash : ChunkSet.GetChunkSet())
- {
+ ChunkSet.IterateChunks([&](const IoHash& Hash) {
ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash);
ResponseWriter.AddHash(Hash);
- }
+ });
ResponseWriter.EndArray();
- ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetChunkSet().size());
+ ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize());
return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save());
}
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index 164d2a792..c21638258 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -17,6 +17,8 @@
#include <zencore/logging.h>
#include <sol/sol.hpp>
+#include <conio.h>
+
#if ZEN_PLATFORM_WINDOWS
// Used for getting My Documents for default data directory
@@ -219,7 +221,8 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
if (result.count("help"))
{
zen::logging::ConsoleLog().info("{}", options.help());
-
+ zen::logging::ConsoleLog().info("Press any key to exit!");
+ _getch();
exit(0);
}
@@ -274,14 +277,24 @@ ParseServiceConfig(const std::filesystem::path& DataRoot, ZenServiceConfig& Serv
try
{
- sol::load_result config = lua.load(std::string_view((const char*)LuaScript.Data(), LuaScript.Size()), "zencfg");
+ sol::load_result config = lua.load(std::string_view((const char*)LuaScript.Data(), LuaScript.Size()), "zen_cfg");
+
+ if (!config.valid())
+ {
+ sol::error err = config;
+
+ std::string ErrorString = sol::to_string(config.status());
+
+ throw std::runtime_error("{} error: {}"_format(ErrorString, err.what()));
+ }
+
config();
}
catch (std::exception& e)
{
- ZEN_ERROR("config script failure: {}", e.what());
+ ZEN_ERROR("config failure: {}", e.what());
- throw std::runtime_error("fatal zen global config script ({}) failure: {}"_format(ConfigScript, e.what()).c_str());
+ throw std::runtime_error("failed to run global config script ('{}'): {}"_format(ConfigScript, e.what()).c_str());
}
ServiceConfig.MeshEnabled = lua["mesh"]["enable"].get_or(ServiceConfig.MeshEnabled);
diff --git a/zenserver/resource.h b/zenserver/resource.h
new file mode 100644
index 000000000..f2e3b471b
--- /dev/null
+++ b/zenserver/resource.h
@@ -0,0 +1,18 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+//{{NO_DEPENDENCIES}}
+// Microsoft Visual C++ generated include file.
+// Used by zenserver.rc
+//
+#define IDI_ICON1 101
+
+// Next default values for new objects
+//
+#ifdef APSTUDIO_INVOKED
+# ifndef APSTUDIO_READONLY_SYMBOLS
+# define _APS_NEXT_RESOURCE_VALUE 102
+# define _APS_NEXT_COMMAND_VALUE 40001
+# define _APS_NEXT_CONTROL_VALUE 1001
+# define _APS_NEXT_SYMED_VALUE 101
+# endif
+#endif
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 2e74602db..14da8cbcc 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -90,6 +90,11 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -121,6 +126,11 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -144,6 +154,11 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key)
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -171,6 +186,11 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
.Success = (Response.status_code == 200 || Response.status_code == 201)};
@@ -204,6 +224,11 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
.Success = (Response.status_code == 200 || Response.status_code == 201)};
@@ -227,6 +252,11 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
.Success = (Response.status_code == 200 || Response.status_code == 201)};
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index 21217387c..94e7e7680 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -42,10 +42,12 @@ private:
struct CloudCacheResult
{
- IoBuffer Response;
- int64_t Bytes = {};
- double ElapsedSeconds = {};
- bool Success = false;
+ IoBuffer Response;
+ int64_t Bytes = {};
+ double ElapsedSeconds = {};
+ int32_t ErrorCode = {};
+ std::string Reason;
+ bool Success = false;
};
/**
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index d6b6d44be..a889fb984 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -23,7 +23,6 @@
#include <algorithm>
#include <atomic>
#include <deque>
-#include <limits>
#include <thread>
#include <unordered_map>
@@ -106,11 +105,23 @@ namespace detail {
virtual ~JupiterUpstreamEndpoint() = default;
- virtual bool Initialize() override
+ virtual bool IsHealthy() const override { return m_HealthOk.load(); }
+
+ virtual UpstreamEndpointHealth CheckHealth() override
{
- CloudCacheSession Session(m_Client);
- const CloudCacheResult Result = Session.Authenticate();
- return Result.Success;
+ try
+ {
+ CloudCacheSession Session(m_Client);
+ const CloudCacheResult Result = Session.Authenticate();
+
+ m_HealthOk = Result.ErrorCode == 0;
+
+ return {.Reason = std::move(Result.Reason), .Ok = Result.Success};
+ }
+ catch (std::exception& Err)
+ {
+ return {.Reason = Err.what(), .Ok = false};
+ }
}
virtual std::string_view DisplayName() const override { return m_DisplayName; }
@@ -143,6 +154,7 @@ namespace detail {
CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) {
CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds;
+ Result.ErrorCode = AttachmentResult.ErrorCode;
if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response)))
{
@@ -169,14 +181,16 @@ namespace detail {
}
}
+ m_HealthOk = Result.ErrorCode == 0;
+
return {.Value = Result.Response,
.Bytes = Result.Bytes,
.ElapsedSeconds = Result.ElapsedSeconds,
.Success = Result.Success};
}
- catch (std::exception& e)
+ catch (std::exception& Err)
{
- return {.Reason = std::string(e.what()), .Success = false};
+ return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
}
}
@@ -187,14 +201,16 @@ namespace detail {
CloudCacheSession Session(m_Client);
const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
+ m_HealthOk = Result.ErrorCode == 0;
+
return {.Value = Result.Response,
.Bytes = Result.Bytes,
.ElapsedSeconds = Result.ElapsedSeconds,
.Success = Result.Success};
}
- catch (std::exception& e)
+ catch (std::exception& Err)
{
- return {.Reason = std::string(e.what()), .Success = false};
+ return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
}
}
@@ -277,6 +293,7 @@ namespace detail {
bool m_UseLegacyDdc;
std::string m_DisplayName;
RefPtr<CloudCacheClient> m_Client;
+ std::atomic_bool m_HealthOk{false};
};
class ZenUpstreamEndpoint final : public UpstreamEndpoint
@@ -285,27 +302,33 @@ namespace detail {
ZenUpstreamEndpoint(std::string_view ServiceUrl)
{
using namespace fmt::literals;
- m_DisplayName = "Zen - '{}'"_format(ServiceUrl);
+ m_DisplayName = "Zen - {}"_format(ServiceUrl);
m_Client = new ZenStructuredCacheClient(ServiceUrl);
}
~ZenUpstreamEndpoint() = default;
- virtual bool Initialize() override
+ virtual bool IsHealthy() const override { return m_HealthOk; }
+
+ virtual UpstreamEndpointHealth CheckHealth() override
{
try
{
ZenStructuredCacheSession Session(*m_Client);
ZenCacheResult Result;
+
for (int32_t Attempt = 0, MaxAttempts = 3; Attempt < MaxAttempts && !Result.Success; ++Attempt)
{
Result = Session.SayHello();
}
- return Result.Success;
+
+ m_HealthOk = Result.ErrorCode == 0;
+
+ return {.Reason = std::move(Result.Reason), .Ok = m_HealthOk};
}
- catch (std::exception&)
+ catch (std::exception& Err)
{
- return false;
+ return {.Reason = Err.what(), .Ok = false};
}
}
@@ -318,14 +341,16 @@ namespace detail {
ZenStructuredCacheSession Session(*m_Client);
const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type);
+ m_HealthOk = Result.ErrorCode == 0;
+
return {.Value = Result.Response,
.Bytes = Result.Bytes,
.ElapsedSeconds = Result.ElapsedSeconds,
.Success = Result.Success};
}
- catch (std::exception& e)
+ catch (std::exception& Err)
{
- return {.Reason = std::string(e.what()), .Success = false};
+ return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
}
}
@@ -337,14 +362,16 @@ namespace detail {
const ZenCacheResult Result =
Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId);
+ m_HealthOk = Result.ErrorCode == 0;
+
return {.Value = Result.Response,
.Bytes = Result.Bytes,
.ElapsedSeconds = Result.ElapsedSeconds,
.Success = Result.Success};
}
- catch (std::exception& e)
+ catch (std::exception& Err)
{
- return {.Reason = std::string(e.what()), .Success = false};
+ return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}};
}
}
@@ -390,6 +417,8 @@ namespace detail {
CacheRecord.CacheKey.Hash,
PackagePayload,
CacheRecord.Type);
+
+ m_HealthOk = Result.ErrorCode == 0;
}
TotalBytes = Result.Bytes;
@@ -406,6 +435,8 @@ namespace detail {
CacheRecord.CacheKey.Hash,
CacheRecord.PayloadIds[Idx],
Payloads[Idx]);
+
+ m_HealthOk = Result.ErrorCode == 0;
}
TotalBytes += Result.Bytes;
@@ -425,6 +456,8 @@ namespace detail {
{
Result =
Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type);
+
+ m_HealthOk = Result.ErrorCode == 0;
}
TotalBytes += Result.Bytes;
@@ -435,6 +468,7 @@ namespace detail {
}
catch (std::exception& e)
{
+ m_HealthOk = false;
return {.Reason = std::string(e.what()), .Success = false};
}
}
@@ -442,6 +476,7 @@ namespace detail {
private:
std::string m_DisplayName;
RefPtr<ZenStructuredCacheClient> m_Client;
+ std::atomic_bool m_HealthOk{false};
};
} // namespace detail
@@ -548,27 +583,35 @@ public:
virtual bool Initialize() override
{
- auto NewEnd = std::remove_if(std::begin(m_Endpoints), std::end(m_Endpoints), [this](auto& Endpoint) {
- const bool Ok = Endpoint->Initialize();
- ZEN_INFO("{} [{}]", Endpoint->DisplayName(), Ok ? "OK" : "FAILED");
- return !Ok;
- });
+ for (auto& Endpoint : m_Endpoints)
+ {
+ const UpstreamEndpointHealth Health = Endpoint->CheckHealth();
+ if (Health.Ok)
+ {
+ ZEN_INFO("initialize endpoint '{}' OK", Endpoint->DisplayName());
+ }
+ else
+ {
+ ZEN_WARN("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
+ }
+ }
- m_Endpoints.erase(NewEnd, std::end(m_Endpoints));
- m_IsRunning = !m_Endpoints.empty();
+ m_RunState.IsRunning = !m_Endpoints.empty();
- if (m_IsRunning)
+ if (m_RunState.IsRunning)
{
for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
{
m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this);
}
+
+ m_EndpointMonitorThread = std::thread(&DefaultUpstreamCache::MonitorEndpoints, this);
}
- return m_IsRunning;
+ return m_RunState.IsRunning;
}
- virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); }
+ virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); }
virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
{
@@ -576,10 +619,13 @@ public:
{
for (auto& Endpoint : m_Endpoints)
{
- if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success)
+ if (Endpoint->IsHealthy())
{
- m_Stats.Add(*Endpoint, Result);
- return Result;
+ if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success)
+ {
+ m_Stats.Add(*Endpoint, Result);
+ return Result;
+ }
}
}
}
@@ -593,10 +639,13 @@ public:
{
for (auto& Endpoint : m_Endpoints)
{
- if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success)
+ if (Endpoint->IsHealthy())
{
- m_Stats.Add(*Endpoint, Result);
- return Result;
+ if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success)
+ {
+ m_Stats.Add(*Endpoint, Result);
+ return Result;
+ }
}
}
}
@@ -606,7 +655,7 @@ public:
virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override
{
- if (m_IsRunning.load() && m_Options.WriteUpstream)
+ if (m_RunState.IsRunning && m_Options.WriteUpstream)
{
if (!m_UpstreamThreads.empty())
{
@@ -655,18 +704,21 @@ private:
for (auto& Endpoint : m_Endpoints)
{
- const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
- if (Result.Success)
+ if (Endpoint->IsHealthy())
{
- m_Stats.Add(*Endpoint, Result);
- }
- else
- {
- ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- Endpoint->DisplayName(),
- Result.Reason);
+ const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
+ if (Result.Success)
+ {
+ m_Stats.Add(*Endpoint, Result);
+ }
+ else
+ {
+ ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ Endpoint->DisplayName(),
+ Result.Reason);
+ }
}
}
}
@@ -688,33 +740,82 @@ private:
}
}
- if (!m_IsRunning.load())
+ if (!m_RunState.IsRunning)
{
break;
}
}
}
+ void MonitorEndpoints()
+ {
+ for (;;)
+ {
+ {
+ std::unique_lock lk(m_RunState.Mutex);
+ if (m_RunState.ExitSignal.wait_for(lk, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); }))
+ {
+ break;
+ }
+ }
+
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (!Endpoint->IsHealthy())
+ {
+ if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok)
+ {
+ ZEN_INFO("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason);
+ }
+ else
+ {
+ ZEN_WARN("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
+ }
+ }
+ }
+ }
+ }
+
void Shutdown()
{
- if (m_IsRunning.load())
+ if (m_RunState.Stop())
{
- m_IsRunning.store(false);
m_UpstreamQueue.CompleteAdding();
-
for (std::thread& Thread : m_UpstreamThreads)
{
Thread.join();
}
+ m_EndpointMonitorThread.join();
m_UpstreamThreads.clear();
m_Endpoints.clear();
}
}
+ spdlog::logger& Log() { return m_Log; }
+
using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>;
- spdlog::logger& Log() { return m_Log; }
+ struct RunState
+ {
+ std::mutex Mutex;
+ std::condition_variable ExitSignal;
+ std::atomic_bool IsRunning{false};
+
+ bool Stop()
+ {
+ bool Stopped = false;
+ {
+ std::lock_guard _(Mutex);
+ Stopped = IsRunning.exchange(false);
+ }
+ if (Stopped)
+ {
+ ExitSignal.notify_all();
+ }
+ return Stopped;
+ }
+ };
spdlog::logger& m_Log;
UpstreamCacheOptions m_Options;
@@ -724,7 +825,8 @@ private:
UpstreamStats m_Stats;
std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints;
std::vector<std::thread> m_UpstreamThreads;
- std::atomic_bool m_IsRunning{false};
+ std::thread m_EndpointMonitorThread;
+ RunState m_RunState;
};
//////////////////////////////////////////////////////////////////////////
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index 142fe260f..96ee8bddc 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -6,6 +6,7 @@
#include <zencore/iohash.h>
#include <zencore/zencore.h>
+#include <chrono>
#include <memory>
namespace zen {
@@ -35,18 +36,33 @@ struct UpstreamCacheRecord
struct UpstreamCacheOptions
{
- uint32_t ThreadCount = 4;
- bool ReadUpstream = true;
- bool WriteUpstream = true;
+ std::chrono::seconds HealthCheckInterval{5};
+ uint32_t ThreadCount = 4;
+ bool ReadUpstream = true;
+ bool WriteUpstream = true;
+};
+
+enum class UpstreamStatusCode : uint8_t
+{
+ Ok,
+ Error
+};
+
+struct UpstreamError
+{
+ UpstreamStatusCode StatusCode = UpstreamStatusCode::Ok;
+ std::string Reason;
+
+ explicit operator bool() const { return StatusCode != UpstreamStatusCode::Ok; }
};
struct GetUpstreamCacheResult
{
- IoBuffer Value;
- std::string Reason;
- int64_t Bytes = {};
- double ElapsedSeconds = {};
- bool Success = false;
+ IoBuffer Value;
+ UpstreamError Error;
+ int64_t Bytes = {};
+ double ElapsedSeconds = {};
+ bool Success = false;
};
struct PutUpstreamCacheResult
@@ -57,6 +73,12 @@ struct PutUpstreamCacheResult
bool Success = false;
};
+struct UpstreamEndpointHealth
+{
+ std::string Reason;
+ bool Ok = false;
+};
+
/**
* The upstream endpont is responsible for handling upload/downloading of cache records.
*/
@@ -65,7 +87,9 @@ class UpstreamEndpoint
public:
virtual ~UpstreamEndpoint() = default;
- virtual bool Initialize() = 0;
+ virtual bool IsHealthy() const = 0;
+
+ virtual UpstreamEndpointHealth CheckHealth() = 0;
virtual std::string_view DisplayName() const = 0;
@@ -88,7 +112,7 @@ public:
virtual bool Initialize() = 0;
- virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0;
+ virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0;
virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0;
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index 7f689d7f3..710d381c6 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -391,6 +391,11 @@ ZenStructuredCacheSession::SayHello()
Session.SetOption(cpr::Url{Uri.c_str()});
cpr::Response Response = Session.Get();
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
return {.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
}
@@ -411,6 +416,11 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -431,6 +441,11 @@ ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHa
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -455,6 +470,11 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
.Success = (Response.status_code == 200 || Response.status_code == 201)};
@@ -475,6 +495,11 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa
cpr::Response Response = Session.Put();
ZEN_DEBUG("PUT {}", Response);
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
return {.Bytes = Response.uploaded_bytes,
.ElapsedSeconds = Response.elapsed,
.Success = (Response.status_code == 200 || Response.status_code == 201)};
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index 36cfd1217..48886096d 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -91,10 +91,12 @@ namespace detail {
struct ZenCacheResult
{
- IoBuffer Response;
- int64_t Bytes = {};
- double ElapsedSeconds = {};
- bool Success = false;
+ IoBuffer Response;
+ int64_t Bytes = {};
+ double ElapsedSeconds = {};
+ int32_t ErrorCode = {};
+ std::string Reason;
+ bool Success = false;
};
/** Zen Structured Cache session
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index cf24dc224..e3b61568f 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -193,7 +193,7 @@ public:
if (!UpstreamConfig.ZenConfig.Url.empty())
{
std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Url);
- UpstreamCache->AddEndpoint(std::move(ZenEndpoint));
+ UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint));
}
{
@@ -221,7 +221,7 @@ public:
if (!Options.ServiceUrl.empty())
{
std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options);
- UpstreamCache->AddEndpoint(std::move(JupiterEndpoint));
+ UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint));
}
}
diff --git a/zenserver/zenserver.rc b/zenserver/zenserver.rc
new file mode 100644
index 000000000..c063436ef
--- /dev/null
+++ b/zenserver/zenserver.rc
@@ -0,0 +1,81 @@
+// Microsoft Visual C++ generated resource script.
+//
+#include "resource.h"
+
+#define APSTUDIO_READONLY_SYMBOLS
+/////////////////////////////////////////////////////////////////////////////
+//
+// Generated from the TEXTINCLUDE 2 resource.
+//
+#include "winres.h"
+
+/////////////////////////////////////////////////////////////////////////////
+#undef APSTUDIO_READONLY_SYMBOLS
+
+/////////////////////////////////////////////////////////////////////////////
+// English (United States) resources
+
+#if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_ENU)
+LANGUAGE LANG_ENGLISH, SUBLANG_ENGLISH_US
+#pragma code_page(1252)
+
+/////////////////////////////////////////////////////////////////////////////
+//
+// Icon
+//
+
+// Icon with lowest ID value placed first to ensure application icon
+// remains consistent on all systems.
+IDI_ICON1 ICON "..\\UnrealEngine.ico"
+
+#endif // English (United States) resources
+/////////////////////////////////////////////////////////////////////////////
+
+
+/////////////////////////////////////////////////////////////////////////////
+// English (United Kingdom) resources
+
+#if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_ENG)
+LANGUAGE LANG_ENGLISH, SUBLANG_ENGLISH_UK
+#pragma code_page(1252)
+
+#ifdef APSTUDIO_INVOKED
+/////////////////////////////////////////////////////////////////////////////
+//
+// TEXTINCLUDE
+//
+
+1 TEXTINCLUDE
+BEGIN
+ "resource.h\0"
+END
+
+2 TEXTINCLUDE
+BEGIN
+ "#include ""winres.h""\r\n"
+ "\0"
+END
+
+3 TEXTINCLUDE
+BEGIN
+ "\r\n"
+ "\0"
+END
+
+#endif // APSTUDIO_INVOKED
+
+#endif // English (United Kingdom) resources
+/////////////////////////////////////////////////////////////////////////////
+
+
+
+#ifndef APSTUDIO_INVOKED
+/////////////////////////////////////////////////////////////////////////////
+//
+// Generated from the TEXTINCLUDE 3 resource.
+//
+
+
+/////////////////////////////////////////////////////////////////////////////
+#endif // not APSTUDIO_INVOKED
+
diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj
index db657d192..1671d98a6 100644
--- a/zenserver/zenserver.vcxproj
+++ b/zenserver/zenserver.vcxproj
@@ -109,6 +109,7 @@
<ClInclude Include="compute\apply.h" />
<ClInclude Include="config.h" />
<ClInclude Include="diag\logging.h" />
+ <ClInclude Include="resource.h" />
<ClInclude Include="sos\sos.h" />
<ClInclude Include="testing\httptest.h" />
<ClInclude Include="upstream\jupiter.h" />
@@ -163,6 +164,12 @@
<ItemGroup>
<None Include="xmake.lua" />
</ItemGroup>
+ <ItemGroup>
+ <ResourceCompile Include="zenserver.rc" />
+ </ItemGroup>
+ <ItemGroup>
+ <Image Include="..\UnrealEngine.ico" />
+ </ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters
index 250c55812..c51a8eb76 100644
--- a/zenserver/zenserver.vcxproj.filters
+++ b/zenserver/zenserver.vcxproj.filters
@@ -40,6 +40,7 @@
</ClInclude>
<ClInclude Include="testing\httptest.h" />
<ClInclude Include="windows\service.h" />
+ <ClInclude Include="resource.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="zenserver.cpp" />
@@ -96,4 +97,10 @@
<ItemGroup>
<None Include="xmake.lua" />
</ItemGroup>
+ <ItemGroup>
+ <ResourceCompile Include="zenserver.rc" />
+ </ItemGroup>
+ <ItemGroup>
+ <Image Include="..\UnrealEngine.ico" />
+ </ItemGroup>
</Project> \ No newline at end of file
diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp
index 916e7f709..eaf72cb41 100644
--- a/zenstore/CAS.cpp
+++ b/zenstore/CAS.cpp
@@ -26,6 +26,39 @@
namespace zen {
void
+CasChunkSet::AddChunkToSet(const IoHash& HashToAdd)
+{
+ m_ChunkSet.insert(HashToAdd);
+}
+
+void
+CasChunkSet::RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate)
+{
+ for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;)
+ {
+ if (Predicate(*It))
+ {
+ It = m_ChunkSet.erase(It);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+}
+
+void
+CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback)
+{
+ for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;)
+ {
+ Callback(*It);
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+void
ScrubContext::ReportBadChunks(std::span<IoHash> BadChunks)
{
ZEN_UNUSED(BadChunks);
@@ -111,9 +144,6 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig)
m_TinyStrategy.Initialize("tobs", 16, IsNewStore);
m_SmallStrategy.Initialize("sobs", 4096, IsNewStore);
-
- ScrubContext Ctx;
- Scrub(Ctx);
}
CasStore::InsertResult
@@ -176,6 +206,13 @@ CasImpl::Flush()
void
CasImpl::Scrub(ScrubContext& Ctx)
{
+ if (m_LastScrubTime == Ctx.ScrubTimestamp())
+ {
+ return;
+ }
+
+ m_LastScrubTime = Ctx.ScrubTimestamp();
+
m_SmallStrategy.Scrub(Ctx);
m_TinyStrategy.Scrub(Ctx);
m_LargeStrategy.Scrub(Ctx);
@@ -222,11 +259,11 @@ TEST_CASE("CasStore")
CHECK(Result2.New);
CasChunkSet ChunkSet;
- ChunkSet.AddChunk(Hash1);
- ChunkSet.AddChunk(Hash2);
+ ChunkSet.AddChunkToSet(Hash1);
+ ChunkSet.AddChunkToSet(Hash2);
Store->FilterChunks(ChunkSet);
- CHECK(ChunkSet.GetChunkSet().size() == 0);
+ CHECK(ChunkSet.IsEmpty());
IoBuffer Lookup1 = Store->FindChunk(Hash1);
CHECK(Lookup1);
diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp
index 5e266f9d3..08a3192ff 100644
--- a/zenstore/cidstore.cpp
+++ b/zenstore/cidstore.cpp
@@ -12,9 +12,9 @@
namespace zen {
-struct CidStore::CidState
+struct CidStore::Impl
{
- CidState(CasStore& InCasStore) : m_CasStore(InCasStore) {}
+ Impl(CasStore& InCasStore) : m_CasStore(InCasStore) {}
struct IndexEntry
{
@@ -42,18 +42,26 @@ struct CidStore::CidState
void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed)
{
+ ZEN_ASSERT(Compressed != IoHash::Zero);
+
RwLock::ExclusiveLockScope _(m_Lock);
m_CidMap.insert_or_assign(DecompressedId, Compressed);
// TODO: it's pretty wasteful to log even idempotent updates
// however we can't simply use the boolean returned by insert_or_assign
// since there's not a 1:1 mapping between compressed and uncompressed
// so if we want a last-write-wins policy then we have to log each update
+ LogMapping(DecompressedId, Compressed);
+ }
+
+ void LogMapping(const IoHash& DecompressedId, const IoHash& Compressed)
+ {
m_LogFile.Append({.Uncompressed = DecompressedId, .Compressed = Compressed});
}
IoBuffer FindChunkByCid(const IoHash& DecompressedId)
{
IoHash CompressedHash;
+
{
RwLock::SharedLockScope _(m_Lock);
if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end())
@@ -62,12 +70,9 @@ struct CidStore::CidState
}
}
- if (CompressedHash != IoHash::Zero)
- {
- return m_CasStore.FindChunk(CompressedHash);
- }
+ ZEN_ASSERT(CompressedHash != IoHash::Zero);
- return IoBuffer();
+ return m_CasStore.FindChunk(CompressedHash);
}
bool ContainsChunk(const IoHash& DecompressedId)
@@ -75,7 +80,17 @@ struct CidStore::CidState
RwLock::SharedLockScope _(m_Lock);
// Note that we do not check CAS here. This is optimistic but usually
// what we want.
- return m_CidMap.find(DecompressedId) != m_CidMap.end();
+ auto It = m_CidMap.find(DecompressedId);
+
+ if (It == m_CidMap.end())
+ {
+ // Not in map, or tombstone
+ return false;
+ }
+
+ ZEN_ASSERT(It->second != IoHash::Zero);
+
+ return true;
}
void InitializeIndex(const std::filesystem::path& RootDir)
@@ -87,6 +102,8 @@ struct CidStore::CidState
m_LogFile.Open(SlogPath, IsNew);
+ uint64_t TombstoneCount = 0;
+
m_LogFile.Replay([&](const IndexEntry& Ie) {
if (Ie.Compressed != IoHash::Zero)
{
@@ -97,16 +114,24 @@ struct CidStore::CidState
{
// Tombstone
m_CidMap.erase(Ie.Uncompressed);
+ ++TombstoneCount;
}
});
- ZEN_DEBUG("CID index initialized: {} entries found", m_CidMap.size());
+ ZEN_INFO("CID index initialized: {} entries found ({} tombstones)", m_CidMap.size(), TombstoneCount);
}
void Flush() { m_LogFile.Flush(); }
void Scrub(ScrubContext& Ctx)
{
+ if (Ctx.ScrubTimestamp() == m_LastScrubTime)
+ {
+ return;
+ }
+
+ m_LastScrubTime = Ctx.ScrubTimestamp();
+
CasChunkSet ChunkSet;
{
@@ -114,7 +139,7 @@ struct CidStore::CidState
for (auto& Kv : m_CidMap)
{
- ChunkSet.AddChunk(Kv.second);
+ ChunkSet.AddChunkToSet(Kv.second);
}
}
@@ -126,20 +151,22 @@ struct CidStore::CidState
return;
}
- ZEN_ERROR("Scrubbing found that {} cid mappings mapped to non-existent CAS chunks", ChunkSet.GetChunkSet().size());
+ ZEN_ERROR("Scrubbing found that {} cid mappings (out of {}) mapped to non-existent CAS chunks. These mappings will be removed",
+ ChunkSet.GetSize(),
+ m_CidMap.size());
// Erase all mappings to chunks which are not present in the underlying CAS store
// we do this by removing mappings from the in-memory lookup structure and also
// by emitting tombstone records to the commit log
- const auto& MissingChunks = ChunkSet.GetChunkSet();
std::vector<IoHash> BadChunks;
+
{
RwLock::SharedLockScope _(m_Lock);
for (auto It = begin(m_CidMap), ItEnd = end(m_CidMap); It != ItEnd;)
{
- if (auto MissingIt = MissingChunks.find(It->second); MissingIt != MissingChunks.end())
+ if (ChunkSet.ContainsChunk(It->second))
{
const IoHash& BadHash = It->first;
@@ -163,11 +190,13 @@ struct CidStore::CidState
Ctx.ReportBadChunks(BadChunks);
}
+
+ uint64_t m_LastScrubTime = 0;
};
//////////////////////////////////////////////////////////////////////////
-CidStore::CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir) : m_Impl(std::make_unique<CidState>(InCasStore))
+CidStore::CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir) : m_Impl(std::make_unique<Impl>(InCasStore))
{
m_Impl->InitializeIndex(RootDir);
}
diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp
index fe38f0fde..5fc3ac356 100644
--- a/zenstore/compactcas.cpp
+++ b/zenstore/compactcas.cpp
@@ -5,6 +5,7 @@
#include "CompactCas.h"
#include <zencore/except.h>
+#include <zencore/logging.h>
#include <zencore/memory.h>
#include <zencore/string.h>
#include <zencore/thread.h>
@@ -32,7 +33,9 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint6
ZEN_ASSERT(IsPow2(Alignment));
ZEN_ASSERT(!m_IsInitialized);
- m_PayloadAlignment = Alignment;
+ m_ContainerBaseName = ContainerBaseName;
+ m_PayloadAlignment = Alignment;
+
std::string BaseName(ContainerBaseName);
std::filesystem::path SobsPath = m_Config.RootDirectory / (BaseName + ".ucas");
std::filesystem::path SidxPath = m_Config.RootDirectory / (BaseName + ".uidx");
@@ -144,20 +147,7 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks)
// we're likely to already have a large proportion of the
// chunks in the set
- std::unordered_set<IoHash> HaveSet;
-
- for (const IoHash& Hash : InOutChunks.GetChunkSet())
- {
- if (HaveChunk(Hash))
- {
- HaveSet.insert(Hash);
- }
- }
-
- for (const IoHash& Hash : HaveSet)
- {
- InOutChunks.RemoveIfPresent(Hash);
- }
+ InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); });
}
void
@@ -210,7 +200,8 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
continue;
}
- const IoHash ComputedHash = IoHash::HashBuffer(BufferBase, Entry.second.Size);
+ const IoHash ComputedHash =
+ IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart, Entry.second.Size);
if (Entry.first != ComputedHash)
{
@@ -242,6 +233,13 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx)
}
}
+ if (BadChunks.empty())
+ {
+ return;
+ }
+
+ ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_ContainerBaseName);
+
// Deal with bad chunks by removing them from our lookup map
std::vector<IoHash> BadChunkHashes;
diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h
index 101e6b1b7..a512c3d93 100644
--- a/zenstore/compactcas.h
+++ b/zenstore/compactcas.h
@@ -69,6 +69,7 @@ private:
BasicFile m_SmallObjectFile;
BasicFile m_SmallObjectIndex;
TCasLogFile<CasDiskIndexEntry> m_CasLog;
+ std::string m_ContainerBaseName;
RwLock m_LocationMapLock;
std::unordered_map<IoHash, CasDiskLocation, IoHash::Hasher> m_LocationMap;
diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp
index 968c9f3a0..c036efd35 100644
--- a/zenstore/filecas.cpp
+++ b/zenstore/filecas.cpp
@@ -31,18 +31,12 @@ namespace zen {
using namespace fmt::literals;
-FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config) : m_Config(Config)
+FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash)
{
-}
-
-FileCasStrategy::~FileCasStrategy()
-{
-}
+ ShardedPath.Append(RootPath.c_str());
+ ShardedPath.Append(std::filesystem::path::preferred_separator);
-WideStringBuilderBase&
-FileCasStrategy::MakeShardedPath(WideStringBuilderBase& ShardedPath, const IoHash& ChunkHash, size_t& OutShard2len)
-{
- ExtendableStringBuilder<96> HashString;
+ ExtendableStringBuilder<64> HashString;
ChunkHash.ToHexString(HashString);
const char* str = HashString.c_str();
@@ -53,20 +47,31 @@ FileCasStrategy::MakeShardedPath(WideStringBuilderBase& ShardedPath, const IoHas
// This results in a maximum of 4096 * 256 directories
//
// The numbers have been chosen somewhat arbitrarily but are large to scale
- // to very large chunk repositories. It may or may not make sense to make
- // this a configurable policy, and it would probably be a good idea to
- // measure performance for different policies and chunk counts
+ // to very large chunk repositories without creating too many directories
+ // on a single level since NTFS does not deal very well with this.
+ //
+ // It may or may not make sense to make this a configurable policy, and it
+ // would probably be a good idea to measure performance for different
+ // policies and chunk counts
ShardedPath.AppendAsciiRange(str, str + 3);
- ShardedPath.Append('\\');
+ ShardedPath.Append(std::filesystem::path::preferred_separator);
ShardedPath.AppendAsciiRange(str + 3, str + 5);
- OutShard2len = ShardedPath.Size();
+ Shard2len = ShardedPath.Size();
- ShardedPath.Append('\\');
+ ShardedPath.Append(std::filesystem::path::preferred_separator);
ShardedPath.AppendAsciiRange(str + 5, str + 40);
+}
+
+//////////////////////////////////////////////////////////////////////////
- return ShardedPath;
+FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config) : m_Config(Config), m_Log(logging::Get("filecas"))
+{
+}
+
+FileCasStrategy::~FileCasStrategy()
+{
}
CasStore::InsertResult
@@ -78,11 +83,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
IoBufferFileReference FileRef;
if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef))
{
- size_t Shard2len = 0;
- ExtendableWideStringBuilder<128> ShardedPath;
- ShardedPath.Append(m_Config.RootDirectory.c_str());
- ShardedPath.Append(std::filesystem::path::preferred_separator);
- MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len);
+ ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
auto DeletePayloadFileOnClose = [&] {
// This will cause the file to be deleted when the last handle to it is closed
@@ -105,7 +106,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
{
CAtlFile PayloadFile;
- if (HRESULT hRes = PayloadFile.Create(ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes))
+ if (HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes))
{
// If we succeeded in opening the target file then we don't need to do anything else because it already exists
// and should contain the content we were about to insert
@@ -118,7 +119,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
}
}
- std::filesystem::path FullPath(ShardedPath.c_str());
+ std::filesystem::path FullPath(Name.ShardedPath.c_str());
std::filesystem::path FilePath = FullPath.parent_path();
std::wstring FileName = FullPath.native();
@@ -194,11 +195,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash)
CasStore::InsertResult
FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash)
{
- size_t Shard2len = 0;
- ExtendableWideStringBuilder<128> ShardedPath;
- ShardedPath.Append(m_Config.RootDirectory.c_str());
- ShardedPath.Append(std::filesystem::path::preferred_separator);
- MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len);
+ ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
// See if file already exists
//
@@ -206,7 +203,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
CAtlFile PayloadFile;
- HRESULT hRes = PayloadFile.Create(ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
+ HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
if (SUCCEEDED(hRes))
{
@@ -221,7 +218,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
// For now, use double-checked locking to see if someone else was first
- hRes = PayloadFile.Create(ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
+ hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
if (SUCCEEDED(hRes))
{
@@ -235,7 +232,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
ZEN_WARN("Unexpected error code when opening shard file for read: {:#x}", uint32_t(hRes));
}
- auto InternalCreateFile = [&] { return PayloadFile.Create(ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); };
+ auto InternalCreateFile = [&] { return PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); };
hRes = InternalCreateFile();
@@ -243,14 +240,14 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
{
// Ensure parent directories exist and retry file creation
- std::filesystem::create_directories(std::wstring_view(ShardedPath.c_str(), Shard2len));
+ std::filesystem::create_directories(std::wstring_view(Name.ShardedPath.c_str(), Name.Shard2len));
hRes = InternalCreateFile();
}
if (FAILED(hRes))
{
- ThrowSystemException(hRes, "Failed to open shard file '{}'"_format(WideToUtf8(ShardedPath)));
+ ThrowSystemException(hRes, "Failed to open shard file '{}'"_format(WideToUtf8(Name.ShardedPath)));
}
size_t ChunkRemain = ChunkSize;
@@ -276,36 +273,37 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize
IoBuffer
FileCasStrategy::FindChunk(const IoHash& ChunkHash)
{
- size_t Shard2len = 0;
- ExtendableWideStringBuilder<128> ShardedPath;
- ShardedPath.Append(m_Config.RootDirectory.c_str());
- ShardedPath.Append(std::filesystem::path::preferred_separator);
- MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len);
+ ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
RwLock::SharedLockScope _(LockForHash(ChunkHash));
- return IoBufferBuilder::MakeFromFile(ShardedPath.c_str());
+ return IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str());
}
bool
FileCasStrategy::HaveChunk(const IoHash& ChunkHash)
{
- size_t Shard2len = 0;
- ExtendableWideStringBuilder<128> ShardedPath;
- ShardedPath.Append(m_Config.RootDirectory.c_str());
- ShardedPath.Append(std::filesystem::path::preferred_separator);
- MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len);
+ ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
RwLock::SharedLockScope _(LockForHash(ChunkHash));
std::error_code Ec;
- if (std::filesystem::exists(ShardedPath.c_str(), Ec))
+ if (std::filesystem::exists(Name.ShardedPath.c_str(), Ec))
{
return true;
}
return false;
}
+void
+FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec)
+{
+ ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash);
+
+ ZEN_DEBUG("deleting CAS payload file '{}'", WideToUtf8(Name.ShardedPath));
+
+ std::filesystem::remove(Name.ShardedPath.c_str(), Ec);
+}
void
FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks)
@@ -318,20 +316,7 @@ FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks)
// a caller, this is something which needs to be taken into account by anyone consuming
// this functionality in any case
- std::unordered_set<IoHash> HaveSet;
-
- for (const IoHash& Hash : InOutChunks.GetChunkSet())
- {
- if (HaveChunk(Hash))
- {
- HaveSet.insert(Hash);
- }
- }
-
- for (const IoHash& Hash : HaveSet)
- {
- InOutChunks.RemoveIfPresent(Hash);
- }
+ InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); });
}
void
@@ -421,6 +406,27 @@ FileCasStrategy::Scrub(ScrubContext& Ctx)
}
});
+ if (!BadHashes.empty())
+ {
+ ZEN_ERROR("file CAS scrubbing: {} bad chunks found", BadHashes.size());
+
+ if (Ctx.RunRecovery())
+ {
+ ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadHashes.size());
+
+ for (const IoHash& Hash : BadHashes)
+ {
+ std::error_code Ec;
+ DeleteChunk(Hash, Ec);
+
+ if (Ec)
+ {
+ ZEN_WARN("failed to delete file for chunk {}", Hash);
+ }
+ }
+ }
+ }
+
Ctx.ReportBadChunks(BadHashes);
}
diff --git a/zenstore/filecas.h b/zenstore/filecas.h
index 18102968a..db21502c6 100644
--- a/zenstore/filecas.h
+++ b/zenstore/filecas.h
@@ -12,6 +12,10 @@
#include <functional>
+namespace spdlog {
+class logger;
+}
+
namespace zen {
class BasicFile;
@@ -37,10 +41,20 @@ private:
const CasStoreConfiguration& m_Config;
RwLock m_Lock;
RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines
+ spdlog::logger& m_Log;
+ spdlog::logger& Log() { return m_Log; }
+
+ inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; }
+ void IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback);
+ void DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec);
+
+ struct ShardingHelper
+ {
+ ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash);
- inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; }
- static WideStringBuilderBase& MakeShardedPath(WideStringBuilderBase& ShardedPath, const IoHash& ChunkHash, size_t& OutShard2len);
- void IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback);
+ size_t Shard2len = 0;
+ ExtendableWideStringBuilder<128> ShardedPath;
+ };
};
} // namespace zen
diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h
index bb310b179..93454ca6f 100644
--- a/zenstore/include/zenstore/CAS.h
+++ b/zenstore/include/zenstore/CAS.h
@@ -8,8 +8,11 @@
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <zencore/refcount.h>
+#include <zencore/timer.h>
+
#include <atomic>
#include <filesystem>
+#include <functional>
#include <memory>
#include <string>
#include <unordered_set>
@@ -37,21 +40,33 @@ public:
private:
};
+/** Context object for data scrubbing
+ *
+ * Data scrubbing is when we traverse stored data to validate it and
+ * optionally correct/recover
+ */
+
class ScrubContext
{
public:
- virtual void ReportBadChunks(std::span<IoHash> BadChunks);
+ virtual void ReportBadChunks(std::span<IoHash> BadChunks);
+ inline uint64_t ScrubTimestamp() const { return m_ScrubTime; }
+ inline bool RunRecovery() const { return m_Recover; }
private:
+ uint64_t m_ScrubTime = GetHifreqTimerValue();
+ bool m_Recover = true;
};
class CasChunkSet
{
public:
- void AddChunk(const IoHash& HashToAdd) { m_ChunkSet.insert(HashToAdd); }
- bool RemoveIfPresent(const IoHash& HashToRemove) { return 0 != m_ChunkSet.erase(HashToRemove); }
- const std::unordered_set<IoHash>& GetChunkSet() const { return m_ChunkSet; }
- bool IsEmpty() const { return m_ChunkSet.empty(); }
+ void AddChunkToSet(const IoHash& HashToAdd);
+ void RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate);
+ void IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback);
+ inline [[nodiscard]] bool ContainsChunk(const IoHash& Hash) const { return m_ChunkSet.find(Hash) != m_ChunkSet.end(); }
+ inline [[nodiscard]] bool IsEmpty() const { return m_ChunkSet.empty(); }
+ inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); }
private:
std::unordered_set<IoHash> m_ChunkSet;
@@ -78,6 +93,7 @@ public:
protected:
CasStoreConfiguration m_Config;
+ uint64_t m_LastScrubTime = 0;
};
ZENCORE_API CasStore* CreateCasStore();
diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h
index 49f2bf99a..f4439e083 100644
--- a/zenstore/include/zenstore/cidstore.h
+++ b/zenstore/include/zenstore/cidstore.h
@@ -50,8 +50,8 @@ public:
// TODO: add batch filter support
private:
- struct CidState;
- std::unique_ptr<CidState> m_Impl;
+ struct Impl;
+ std::unique_ptr<Impl> m_Impl;
};
} // namespace zen