diff options
30 files changed, 1011 insertions, 638 deletions
@@ -3,6 +3,9 @@ This is the implementation of the local storage service for UE5. It is intended to be deployed on user machines either as a daemon or launched ad hoc as required during of editor/cooker/game startup +Zen can also be deployed as a shared instance for use as a shared cache. It also supports upstream +connectivity to cloud storage services as well as other Zen server instances. + We currently only support building and running the server on Windows. Linux and Mac support is in progress ## Building on Windows diff --git a/UnrealEngine.ico b/UnrealEngine.ico Binary files differnew file mode 100644 index 000000000..1cfa301a2 --- /dev/null +++ b/UnrealEngine.ico @@ -43,6 +43,7 @@ end if is_os("windows") then add_defines("_CRT_SECURE_NO_WARNINGS", "_UNICODE", "UNICODE", "_WIN32_WINNT=0x0A00") + -- add_ldflags("/MAP") end add_defines("USE_SENTRY=1") diff --git a/zencore/mpscqueue.cpp b/zencore/mpscqueue.cpp index e1841ef63..29c76c3ca 100644 --- a/zencore/mpscqueue.cpp +++ b/zencore/mpscqueue.cpp @@ -7,7 +7,7 @@ namespace zen { -#if ZEN_WITH_TESTS && 0 +#if ZEN_WITH_TESTS && 0 TEST_CASE("mpsc") { MpscQueue<std::string> Queue; diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp index 087d4c807..deaf95d5a 100644 --- a/zenhttp/httpsys.cpp +++ b/zenhttp/httpsys.cpp @@ -553,7 +553,7 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) CancelThreadpoolIo(Iocp); - ZEN_ERROR("failed to send HTTP response (error: '{}'), request URL: {}", SendResult, HttpReq->pRawUrl); + ZEN_ERROR("failed to send HTTP response (error: '{}'), request URL: '{}'", GetWindowsErrorAsString(SendResult), HttpReq->pRawUrl); ErrorCode = MakeErrorCode(SendResult); } @@ -1256,7 +1256,7 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT case ERROR_OPERATION_ABORTED: return nullptr; - case ERROR_MORE_DATA: // Insufficient buffer space + case ERROR_MORE_DATA: // Insufficient buffer space case NO_ERROR: break; } diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 7f1fe7b44..3ac1ec37f 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -173,10 +173,19 @@ HttpStructuredCacheService::Flush() { } -void +void HttpStructuredCacheService::Scrub(ScrubContext& Ctx) { - ZEN_UNUSED(Ctx); + if (m_LastScrubTime == Ctx.ScrubTimestamp()) + { + return; + } + + m_LastScrubTime = Ctx.ScrubTimestamp(); + + m_CasStore.Scrub(Ctx); + m_CidStore.Scrub(Ctx); + m_CacheStore.Scrub(Ctx); } void @@ -244,7 +253,7 @@ HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, } void -HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy) +HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) { switch (auto Verb = Request.RequestVerb()) { @@ -253,439 +262,380 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, case kHead: case kGet: { - const ZenContentType AcceptType = Request.AcceptContentType(); + HandleGetCacheRecord(Request, Ref, Policy); + if (Verb == kHead) + { + Request.SetSuppressResponseBody(); + } + } + break; + case kPut: + HandlePutCacheRecord(Request, Ref, Policy); + break; + default: + break; + } +} - ZenCacheValue Value; - bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); - bool InUpstreamCache = false; +void +HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +{ + const ZenContentType AcceptType = Request.AcceptContentType(); - const bool QueryUpstream = !Success && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote)); + ZenCacheValue Value; + bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); + bool InUpstreamCache = false; - if (QueryUpstream) - { - const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary - : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage - : ZenContentType::kCbObject; + const bool QueryUpstream = !Success && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote)); - if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType); - UpstreamResult.Success) - { - Value.Value = UpstreamResult.Value; - Success = true; - InUpstreamCache = true; + if (QueryUpstream) + { + const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary + : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage + : ZenContentType::kCbObject; - if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject) - { - if (CacheRecordType == ZenContentType::kCbObject) - { - const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); - - if (ValidationResult == CbValidateError::None) - { - CbObjectView CacheRecord(UpstreamResult.Value.Data()); - - CbObjectWriter IndexData; - IndexData.BeginArray("references"); - CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); }); - IndexData.EndArray(); - - Value.IndexData = IndexData.Save(); - } - else - { - Success = false; - ZEN_WARN("Get - cache record '{}/{}' FAILED, invalid compact binary object from upstream", - Ref.BucketSegment, - Ref.HashKey); - } - } + if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType); + UpstreamResult.Success) + { + Value.Value = UpstreamResult.Value; + Success = true; + InUpstreamCache = true; - if (Success) - { - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); - } - } - else - { - ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage); + if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject) + { + if (CacheRecordType == ZenContentType::kCbObject) + { + const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); - CbPackage Package; - if (Package.TryLoad(UpstreamResult.Value)) - { - uint32_t AttachmentCount = 0; - uint32_t FoundCount = 0; - CbObject CacheRecord = Package.GetObject(); - - CacheRecord.IterateAttachments( - [this, &Package, &Ref, &AttachmentCount, &FoundCount](CbFieldView AttachmentHash) { - if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) - { - if (CompressedBuffer Chunk = Attachment->AsCompressedBinary()) - { - m_CidStore.AddChunk(Chunk); - FoundCount++; - } - else - { - ZEN_WARN("GET - cache record '{}/{}' FAILED, upstream attachment not compressed", - Ref.BucketSegment, - Ref.HashKey); - } - } - AttachmentCount++; - }); - - if (FoundCount == AttachmentCount) - { - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); - - if (zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments)) - { - CbPackage PackageWithoutAttachments; - PackageWithoutAttachments.SetObject(CacheRecord); - - MemoryOutStream MemStream; - BinaryWriter Writer(MemStream); - PackageWithoutAttachments.Save(Writer); - - Value.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - } - } - else - { - Success = false; - ZEN_WARN("GET - cache record '{}/{}' FAILED, attachments missing in upstream package", - Ref.BucketSegment, - Ref.HashKey); - } - } - else - { - Success = false; - ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey); - } - } - } - } + if (ValidationResult == CbValidateError::None) + { + CbObjectView CacheRecord(UpstreamResult.Value.Data()); - if (!Success) - { - ZEN_DEBUG("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey); + CbObjectWriter IndexData; + IndexData.BeginArray("references"); + CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); }); + IndexData.EndArray(); - return Request.WriteResponse(HttpResponseCode::NotFound); + Value.IndexData = IndexData.Save(); + } + else + { + Success = false; + ZEN_WARN("Get - cache record '{}/{}' FAILED, invalid compact binary object from upstream", + Ref.BucketSegment, + Ref.HashKey); + } } - if (Verb == kHead) + if (Success) { - Request.SetSuppressResponseBody(); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); } + } + else + { + ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage); - if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache) + CbPackage Package; + if (Package.TryLoad(UpstreamResult.Value)) { - CbObjectView CacheRecord(Value.Value.Data()); + uint32_t AttachmentCount = 0; + uint32_t ValidCount = 0; + CbObject CacheRecord = Package.GetObject(); - const CbValidateError ValidationResult = ValidateCompactBinary(Value.Value, CbValidateMode::All); + CacheRecord.IterateAttachments([this, &Package, &Ref, &AttachmentCount, &ValidCount](CbFieldView AttachmentHash) { + if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) + { + if (CompressedBuffer Chunk = Attachment->AsCompressedBinary()) + { + m_CidStore.AddChunk(Chunk); + ValidCount++; + } + else + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, upstream attachment not compressed", + Ref.BucketSegment, + Ref.HashKey); + } + } + AttachmentCount++; + }); - if (ValidationResult != CbValidateError::None) + if (ValidCount == AttachmentCount) { - ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey); - - return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv); - } - - const bool SkipAttachments = zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments); - uint32_t AttachmentCount = 0; - uint32_t FoundCount = 0; - uint64_t AttachmentBytes = 0ull; + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); - CbPackage Package; - - if (!SkipAttachments) - { - CacheRecord.IterateAttachments( - [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) - { - Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); - AttachmentBytes += Chunk.Size(); - FoundCount++; - } - AttachmentCount++; - }); - - if (FoundCount != AttachmentCount) + if (zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments)) { - ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments", - Ref.BucketSegment, - Ref.HashKey, - FoundCount, - AttachmentCount); + CbPackage PackageWithoutAttachments; + PackageWithoutAttachments.SetObject(CacheRecord); + + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + PackageWithoutAttachments.Save(Writer); - return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); + Value.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); } } - - Package.SetObject(LoadCompactBinaryObject(Value.Value)); - - ZEN_DEBUG("HIT - '{}/{}' {}, {} attachments ({})", - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(AttachmentBytes + Value.Value.Size()), - AttachmentCount, - InUpstreamCache ? "UPSTREAM" : "LOCAL"); - - MemoryOutStream MemStream; - BinaryWriter Writer(MemStream); - Package.Save(Writer); - - IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - - return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response); + else + { + Success = false; + ZEN_WARN("GET - cache record '{}/{}' FAILED, attachments missing in upstream package", + Ref.BucketSegment, + Ref.HashKey); + } } else { - ZEN_DEBUG("HIT - '{}/{}' {} ({})", - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(Value.Value.Size()), - InUpstreamCache ? "UPSTREAM" : "LOCAL"); - - return Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); + Success = false; + ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid upstream package", Ref.BucketSegment, Ref.HashKey); } } - break; + } + } - case kPut: - { - IoBuffer Body = Request.ReadPayload(); + if (!Success) + { + ZEN_DEBUG("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey); - if (!Body || Body.Size() == 0) - { - return Request.WriteResponse(HttpResponseCode::BadRequest); - } + return Request.WriteResponse(HttpResponseCode::NotFound); + } - const HttpContentType ContentType = Request.RequestContentType(); + if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache) + { + CbObjectView CacheRecord(Value.Value.Data()); - const bool StoreUpstream = m_UpstreamCache && (CachePolicy::StoreRemote == (Policy & CachePolicy::StoreRemote)); + const CbValidateError ValidationResult = ValidateCompactBinary(Value.Value, CbValidateMode::All); - if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType) - { - // TODO: create a cache record and put value in CAS? - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); - ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size())); + if (ValidationResult != CbValidateError::None) + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey); + return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv); + } + + const bool SkipAttachments = zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments); + uint32_t AttachmentCount = 0; + uint32_t ValidCount = 0; + uint64_t AttachmentBytes = 0ull; - if (StoreUpstream) + CbPackage Package; + + if (!SkipAttachments) + { + CacheRecord.IterateAttachments( + [this, &Ref, &Package, &AttachmentCount, &ValidCount, &AttachmentBytes](CbFieldView AttachmentHash) { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) { - auto Result = m_UpstreamCache->EnqueueUpstream( - {.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}}); + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + AttachmentBytes += Chunk.Size(); + ValidCount++; } + AttachmentCount++; + }); - return Request.WriteResponse(HttpResponseCode::Created); - } - else if (ContentType == HttpContentType::kCbObject) - { - // Validate payload before accessing it - const CbValidateError ValidationResult = - ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), CbValidateMode::All); + if (ValidCount != AttachmentCount) + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments", + Ref.BucketSegment, + Ref.HashKey, + ValidCount, + AttachmentCount); - if (ValidationResult != CbValidateError::None) - { - ZEN_WARN("PUT - cache record '{}/{}' ({} bytes) FAILED, invalid compact binary", - Ref.BucketSegment, - Ref.HashKey, - Body.Size()); + return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); + } + } - // TODO: add details in response, kText || kCbObject? - return Request.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Compact binary validation failed"sv); - } + Package.SetObject(LoadCompactBinaryObject(Value.Value)); - // Extract referenced payload hashes - CbObjectView Cbo(Body.Data()); + ZEN_DEBUG("HIT - '{}/{}' {}, {} attachments (LOCAL)", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(AttachmentBytes + Value.Value.Size()), + AttachmentCount); - std::vector<IoHash> References; - std::vector<IoHash> MissingRefs; - Cbo.IterateAttachments([&](CbFieldView AttachmentView) { References.push_back(AttachmentView.AsHash()); }); + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + Package.Save(Writer); - ZenCacheValue CacheValue; - CacheValue.Value = Body; + IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - if (!References.empty()) - { - CbObjectWriter Idx; - Idx.BeginArray("references"); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response); + } + else + { + ZEN_DEBUG("HIT - '{}/{}' {} ({})", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(Value.Value.Size()), + InUpstreamCache ? "UPSTREAM" : "LOCAL"); - for (const IoHash& Hash : References) - { - Idx.AddHash(Hash); - if (!m_CidStore.ContainsChunk(Hash)) - { - MissingRefs.push_back(Hash); - } - } + Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); + } +} - Idx.EndArray(); +void +HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +{ + IoBuffer Body = Request.ReadPayload(); - CacheValue.IndexData = Idx.Save(); - } + if (!Body || Body.Size() == 0) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); + const HttpContentType ContentType = Request.RequestContentType(); + const bool StoreUpstream = m_UpstreamCache && (CachePolicy::StoreRemote == (Policy & CachePolicy::StoreRemote)); - ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} attachments missing", - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(CacheValue.Value.Size()), - MissingRefs.size(), - References.size()); + if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType) + { + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); + ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size())); - if (MissingRefs.empty() && StoreUpstream) - { - ZEN_ASSERT(m_UpstreamCache); - auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, - .CacheKey = {Ref.BucketSegment, Ref.HashKey}, - .PayloadIds = std::move(References)}); + if (StoreUpstream) + { + ZEN_ASSERT(m_UpstreamCache); + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}}); + } - return Request.WriteResponse(HttpResponseCode::Created); - } - else - { - // TODO: Binary attachments? - CbObjectWriter Response; - Response.BeginArray("needs"); - for (const IoHash& MissingRef : MissingRefs) - { - Response.AddHash(MissingRef); - ZEN_DEBUG("cache record '{}/{}' is missing reference '{}'", Ref.BucketSegment, Ref.HashKey, MissingRef); - } - Response.EndArray(); + Request.WriteResponse(HttpResponseCode::Created); + } + else if (ContentType == HttpContentType::kCbObject) + { + const CbValidateError ValidationResult = ValidateCompactBinary(MemoryView(Body.GetData(), Body.GetSize()), CbValidateMode::All); - // Return Created | BadRequest? - return Request.WriteResponse(HttpResponseCode::Created, Response.Save()); - } - } - else if (ContentType == HttpContentType::kCbPackage) - { - CbPackage Package; + if (ValidationResult != CbValidateError::None) + { + ZEN_WARN("PUT - cache record '{}/{}' ({} bytes) FAILED, invalid compact binary", Ref.BucketSegment, Ref.HashKey, Body.Size()); + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv); + } - if (!Package.TryLoad(Body)) - { - ZEN_WARN("PUT - cache record '{}/{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey); - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"); - } + CbObjectView CacheRecord(Body.Data()); + std::vector<IoHash> ValidAttachments; + uint32_t AttachmentCount = 0; - CbObject CacheRecord = Package.GetObject(); + CacheRecord.IterateAttachments([this, &AttachmentCount, &ValidAttachments](CbFieldView AttachmentHash) { + const IoHash Hash = AttachmentHash.AsHash(); + if (m_CidStore.ContainsChunk(Hash)) + { + ValidAttachments.emplace_back(Hash); + } + AttachmentCount++; + }); - struct AttachmentInsertResult - { - int32_t Count = 0; - int32_t NewCount = 0; - uint64_t Bytes = 0; - uint64_t NewBytes = 0; - bool Ok = false; - }; + const uint32_t ValidCount = static_cast<uint32_t>(ValidAttachments.size()); + const bool ValidCacheRecord = ValidCount == AttachmentCount; - AttachmentInsertResult AttachmentResult{.Ok = true}; - std::span<const CbAttachment> Attachments = Package.GetAttachments(); - std::vector<IoHash> PayloadIds; + if (ValidCacheRecord) + { + ZEN_DEBUG("PUT - cache record '{}/{}' {}, {} attachments", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size()), ValidCount); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); - PayloadIds.reserve(Attachments.size()); + if (StoreUpstream) + { + ZEN_ASSERT(m_UpstreamCache); + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + .CacheKey = {Ref.BucketSegment, Ref.HashKey}, + .PayloadIds = std::move(ValidAttachments)}); + } - CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentResult, &PayloadIds](CbFieldView AttachmentHash) { - if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) - { - if (Attachment->IsCompressedBinary()) - { - CompressedBuffer Chunk = Attachment->AsCompressedBinary(); - const uint64_t ChunkSize = Chunk.GetCompressed().GetSize(); - CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + Request.WriteResponse(HttpResponseCode::Created); + } + else + { + ZEN_WARN("PUT - cache record '{}/{}' FAILED, found {}/{} attachments", + Ref.BucketSegment, + Ref.HashKey, + ValidCount, + AttachmentCount); - PayloadIds.emplace_back(InsertResult.DecompressedId); + Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Missing attachments"sv); + } + } + else if (ContentType == HttpContentType::kCbPackage) + { + CbPackage Package; - if (InsertResult.New) - { - AttachmentResult.NewBytes += ChunkSize; - AttachmentResult.NewCount++; - } + if (!Package.TryLoad(Body)) + { + ZEN_WARN("PUT - cache record '{}/{}' FAILED, invalid package", Ref.BucketSegment, Ref.HashKey); + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv); + } - AttachmentResult.Bytes += ChunkSize; - AttachmentResult.Count++; - } - else - { - ZEN_WARN("PUT - cache record '{}/{}' FAILED, attachment '{}' is not compressed", - Ref.BucketSegment, - Ref.HashKey, - AttachmentHash.AsHash()); - AttachmentResult.Ok = false; - } - } - else - { - ZEN_WARN("PUT - cache record '{}/{}' FAILED, missing attachment '{}'", - Ref.BucketSegment, - Ref.HashKey, - AttachmentHash.AsHash()); - AttachmentResult.Ok = false; - } - }); + CbObject CacheRecord = Package.GetObject(); - if (!AttachmentResult.Ok) - { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"); - } + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + std::vector<IoHash> ValidAttachments; + int32_t NewAttachmentCount = 0; - IoBuffer CacheRecordChunk = CacheRecord.GetBuffer().AsIoBuffer(); - const uint64_t TotalPackageBytes = AttachmentResult.Bytes + CacheRecordChunk.Size(); + ValidAttachments.reserve(Attachments.size()); - ZenCacheValue CacheValue{.Value = CacheRecordChunk}; - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); + CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &NewAttachmentCount](CbFieldView AttachmentHash) { + if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) + { + if (Attachment->IsCompressedBinary()) + { + CompressedBuffer Chunk = Attachment->AsCompressedBinary(); + CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk); + + ValidAttachments.emplace_back(InsertResult.DecompressedId); - if (StoreUpstream) + if (InsertResult.New) { - ZEN_ASSERT(m_UpstreamCache); - auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage, - .CacheKey = {Ref.BucketSegment, Ref.HashKey}, - .PayloadIds = std::move(PayloadIds)}); + NewAttachmentCount++; } - - ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} ({}/{}) new attachments", - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(TotalPackageBytes), - AttachmentResult.NewCount, - AttachmentResult.Count, - NiceBytes(AttachmentResult.NewBytes), - NiceBytes(AttachmentResult.Bytes)); - - return Request.WriteResponse(HttpResponseCode::Created); } else { - return Request.WriteResponse(HttpResponseCode::BadRequest); + ZEN_WARN("PUT - cache record '{}/{}' FAILED, attachment '{}' is not compressed", + Ref.BucketSegment, + Ref.HashKey, + AttachmentHash.AsHash()); } } - break; + else + { + ZEN_WARN("PUT - cache record '{}/{}' FAILED, missing attachment '{}'", + Ref.BucketSegment, + Ref.HashKey, + AttachmentHash.AsHash()); + } + }); - case kPost: - break; + const bool AttachmentsValid = ValidAttachments.size() == Attachments.size(); - default: - break; + if (!AttachmentsValid) + { + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"sv); + } + + ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} new attachments", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(Body.GetSize()), + NewAttachmentCount, + Attachments.size()); + + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); + + if (StoreUpstream) + { + ZEN_ASSERT(m_UpstreamCache); + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage, + .CacheKey = {Ref.BucketSegment, Ref.HashKey}, + .PayloadIds = std::move(ValidAttachments)}); + } + + Request.WriteResponse(HttpResponseCode::Created); + } + else + { + Request.WriteResponse(HttpResponseCode::BadRequest); } } void -HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy) +HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) { - // Note: the URL references the uncompressed payload hash - so this maintains the mapping - // from uncompressed CAS identity (aka CID/Content ID) to the stored payload hash - // - // this is a PITA but a consequence of the fact that the client side code is not able to - // address data by compressed hash - - ZEN_UNUSED(Policy); - switch (auto Verb = Request.RequestVerb()) { using enum HttpVerb; @@ -693,112 +643,107 @@ HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request case kHead: case kGet: { - IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); - bool InUpstreamCache = false; - - if (!Payload && m_UpstreamCache) - { - if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); - UpstreamResult.Success) - { - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) - { - Payload = UpstreamResult.Value; - IoHash ChunkHash = IoHash::HashBuffer(Payload); - CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); - InUpstreamCache = true; - - m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); - } - else - { - ZEN_WARN("got uncompressed upstream cache payload"); - } - } - } - - if (!Payload) - { - ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId); - return Request.WriteResponse(HttpResponseCode::NotFound); - } - - ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})", - Ref.BucketSegment, - Ref.HashKey, - Ref.PayloadId, - NiceBytes(Payload.Size()), - Payload.GetContentType(), - InUpstreamCache ? "UPSTREAM" : "LOCAL"); - + HandleGetCachePayload(Request, Ref, Policy); if (Verb == kHead) { Request.SetSuppressResponseBody(); } - - return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); } break; - case kPut: + HandlePutCachePayload(Request, Ref, Policy); + break; + default: + break; + } +} + +void +HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +{ + IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); + bool InUpstreamCache = false; + const bool QueryUpstream = !Payload && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote)); + + if (QueryUpstream) + { + if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); + UpstreamResult.Success) + { + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { - if (IoBuffer Body = Request.ReadPayload()) - { - if (Body.Size() == 0) - { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Empty payload not permitted"); - } + Payload = UpstreamResult.Value; + IoHash ChunkHash = IoHash::HashBuffer(Payload); + CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); + InUpstreamCache = true; - IoHash ChunkHash = IoHash::HashBuffer(Body); + m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + } + else + { + ZEN_WARN("got uncompressed upstream cache payload"); + } + } + } + + if (!Payload) + { + ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId); + return Request.WriteResponse(HttpResponseCode::NotFound); + } - CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body)); + ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})", + Ref.BucketSegment, + Ref.HashKey, + Ref.PayloadId, + NiceBytes(Payload.Size()), + Payload.GetContentType(), + InUpstreamCache ? "UPSTREAM" : "LOCAL"); - if (!Compressed) - { - // All attachment payloads need to be in compressed buffer format - return Request.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "Attachments must be compressed"); - } - else - { - if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId) - { - // the URL specified content id and content hashes don't match! - return Request.WriteResponse(HttpResponseCode::BadRequest); - } + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); +} - CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); +void +HttpStructuredCacheService::HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) +{ + // Note: Individual cache payloads are not propagated upstream until a valid cache record has been stored + ZEN_UNUSED(Policy); - m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + IoBuffer Body = Request.ReadPayload(); - ZEN_DEBUG("PUT - payload '{}/{}/{}' {} (type: {}) {}", - Ref.BucketSegment, - Ref.HashKey, - Ref.PayloadId, - NiceBytes(Body.Size()), - Body.GetContentType(), - Result.New ? "NEW" : "OLD"); + if (!Body || Body.Size() == 0) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } - if (Result.New) - { - return Request.WriteResponse(HttpResponseCode::Created); - } - else - { - return Request.WriteResponse(HttpResponseCode::OK); - } - } - } - } - break; + IoHash ChunkHash = IoHash::HashBuffer(Body); + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body)); - case kPost: - break; + if (!Compressed) + { + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"sv); + } - default: - break; + if (IoHash::FromBLAKE3(Compressed.GetRawHash()) != Ref.PayloadId) + { + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Payload ID does not match attachment hash"sv); } + + CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); + + m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); + + ZEN_DEBUG("PUT - payload '{}/{}/{}' {} (type: {}) {}", + Ref.BucketSegment, + Ref.HashKey, + Ref.PayloadId, + NiceBytes(Body.Size()), + Body.GetContentType(), + Result.New ? "NEW" : "OLD"); + + const HttpResponseCode ResponseCode = Result.New ? HttpResponseCode::Created : HttpResponseCode::OK; + + Request.WriteResponse(ResponseCode); } bool diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index bd163dd1d..3fdaa1236 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -71,8 +71,12 @@ private: }; [[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef); - void HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy); - void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy); + void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); + void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); + void HandlePutCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); + void HandleCachePayloadRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); + void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); + void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); spdlog::logger& Log() { return m_Log; } @@ -81,6 +85,7 @@ private: zen::CasStore& m_CasStore; zen::CidStore& m_CidStore; std::unique_ptr<UpstreamCache> m_UpstreamCache; + uint64_t m_LastScrubTime = 0; }; } // namespace zen diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 502ca6605..3d80bb14c 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -108,6 +108,13 @@ ZenCacheStore::Flush() void ZenCacheStore::Scrub(ScrubContext& Ctx) { + if (m_LastScrubTime == Ctx.ScrubTimestamp()) + { + return; + } + + m_LastScrubTime = Ctx.ScrubTimestamp(); + m_DiskLayer.Scrub(Ctx); m_MemLayer.Scrub(Ctx); } diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index fdf4a8cfe..2cc3abb53 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -112,6 +112,7 @@ private: ZenCacheMemoryLayer m_MemLayer; ZenCacheDiskLayer m_DiskLayer; uint64_t m_DiskLayerSizeThreshold = 4 * 1024; + uint64_t m_LastScrubTime = 0; }; /** Tracks cache entry access, stats and orchestrates cleanup activities diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp index 3197eaee4..15d9e0141 100644 --- a/zenserver/compute/apply.cpp +++ b/zenserver/compute/apply.cpp @@ -375,7 +375,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, FunctionSpec.IterateAttachments([&](CbFieldView Field) { const IoHash Hash = Field.AsHash(); - ChunkSet.AddChunk(Hash); + ChunkSet.AddChunkToSet(Hash); }); // Note that we store executables uncompressed to make it @@ -399,16 +399,15 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, CbObjectWriter ResponseWriter; ResponseWriter.BeginArray("need"); - for (const IoHash& Hash : ChunkSet.GetChunkSet()) - { + ChunkSet.IterateChunks([&](const IoHash& Hash) { ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash); ResponseWriter.AddHash(Hash); - } + }); ResponseWriter.EndArray(); - ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetChunkSet().size()); + ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize()); return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save()); } diff --git a/zenserver/config.cpp b/zenserver/config.cpp index 164d2a792..c21638258 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -17,6 +17,8 @@ #include <zencore/logging.h> #include <sol/sol.hpp> +#include <conio.h> + #if ZEN_PLATFORM_WINDOWS // Used for getting My Documents for default data directory @@ -219,7 +221,8 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z if (result.count("help")) { zen::logging::ConsoleLog().info("{}", options.help()); - + zen::logging::ConsoleLog().info("Press any key to exit!"); + _getch(); exit(0); } @@ -274,14 +277,24 @@ ParseServiceConfig(const std::filesystem::path& DataRoot, ZenServiceConfig& Serv try { - sol::load_result config = lua.load(std::string_view((const char*)LuaScript.Data(), LuaScript.Size()), "zencfg"); + sol::load_result config = lua.load(std::string_view((const char*)LuaScript.Data(), LuaScript.Size()), "zen_cfg"); + + if (!config.valid()) + { + sol::error err = config; + + std::string ErrorString = sol::to_string(config.status()); + + throw std::runtime_error("{} error: {}"_format(ErrorString, err.what())); + } + config(); } catch (std::exception& e) { - ZEN_ERROR("config script failure: {}", e.what()); + ZEN_ERROR("config failure: {}", e.what()); - throw std::runtime_error("fatal zen global config script ({}) failure: {}"_format(ConfigScript, e.what()).c_str()); + throw std::runtime_error("failed to run global config script ('{}'): {}"_format(ConfigScript, e.what()).c_str()); } ServiceConfig.MeshEnabled = lua["mesh"]["enable"].get_or(ServiceConfig.MeshEnabled); diff --git a/zenserver/resource.h b/zenserver/resource.h new file mode 100644 index 000000000..f2e3b471b --- /dev/null +++ b/zenserver/resource.h @@ -0,0 +1,18 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +//{{NO_DEPENDENCIES}} +// Microsoft Visual C++ generated include file. +// Used by zenserver.rc +// +#define IDI_ICON1 101 + +// Next default values for new objects +// +#ifdef APSTUDIO_INVOKED +# ifndef APSTUDIO_READONLY_SYMBOLS +# define _APS_NEXT_RESOURCE_VALUE 102 +# define _APS_NEXT_COMMAND_VALUE 40001 +# define _APS_NEXT_CONTROL_VALUE 1001 +# define _APS_NEXT_SYMED_VALUE 101 +# endif +#endif diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 2e74602db..14da8cbcc 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -90,6 +90,11 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -121,6 +126,11 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -144,6 +154,11 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -171,6 +186,11 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; @@ -204,6 +224,11 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; @@ -227,6 +252,11 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 21217387c..94e7e7680 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -42,10 +42,12 @@ private: struct CloudCacheResult { - IoBuffer Response; - int64_t Bytes = {}; - double ElapsedSeconds = {}; - bool Success = false; + IoBuffer Response; + int64_t Bytes = {}; + double ElapsedSeconds = {}; + int32_t ErrorCode = {}; + std::string Reason; + bool Success = false; }; /** diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index d6b6d44be..a889fb984 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -23,7 +23,6 @@ #include <algorithm> #include <atomic> #include <deque> -#include <limits> #include <thread> #include <unordered_map> @@ -106,11 +105,23 @@ namespace detail { virtual ~JupiterUpstreamEndpoint() = default; - virtual bool Initialize() override + virtual bool IsHealthy() const override { return m_HealthOk.load(); } + + virtual UpstreamEndpointHealth CheckHealth() override { - CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.Authenticate(); - return Result.Success; + try + { + CloudCacheSession Session(m_Client); + const CloudCacheResult Result = Session.Authenticate(); + + m_HealthOk = Result.ErrorCode == 0; + + return {.Reason = std::move(Result.Reason), .Ok = Result.Success}; + } + catch (std::exception& Err) + { + return {.Reason = Err.what(), .Ok = false}; + } } virtual std::string_view DisplayName() const override { return m_DisplayName; } @@ -143,6 +154,7 @@ namespace detail { CacheRecord.IterateAttachments([&Session, &Result, &Package](CbFieldView AttachmentHash) { CloudCacheResult AttachmentResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; + Result.ErrorCode = AttachmentResult.ErrorCode; if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(AttachmentResult.Response))) { @@ -169,14 +181,16 @@ namespace detail { } } + m_HealthOk = Result.ErrorCode == 0; + return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } - catch (std::exception& e) + catch (std::exception& Err) { - return {.Reason = std::string(e.what()), .Success = false}; + return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; } } @@ -187,14 +201,16 @@ namespace detail { CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); + m_HealthOk = Result.ErrorCode == 0; + return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } - catch (std::exception& e) + catch (std::exception& Err) { - return {.Reason = std::string(e.what()), .Success = false}; + return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; } } @@ -277,6 +293,7 @@ namespace detail { bool m_UseLegacyDdc; std::string m_DisplayName; RefPtr<CloudCacheClient> m_Client; + std::atomic_bool m_HealthOk{false}; }; class ZenUpstreamEndpoint final : public UpstreamEndpoint @@ -285,27 +302,33 @@ namespace detail { ZenUpstreamEndpoint(std::string_view ServiceUrl) { using namespace fmt::literals; - m_DisplayName = "Zen - '{}'"_format(ServiceUrl); + m_DisplayName = "Zen - {}"_format(ServiceUrl); m_Client = new ZenStructuredCacheClient(ServiceUrl); } ~ZenUpstreamEndpoint() = default; - virtual bool Initialize() override + virtual bool IsHealthy() const override { return m_HealthOk; } + + virtual UpstreamEndpointHealth CheckHealth() override { try { ZenStructuredCacheSession Session(*m_Client); ZenCacheResult Result; + for (int32_t Attempt = 0, MaxAttempts = 3; Attempt < MaxAttempts && !Result.Success; ++Attempt) { Result = Session.SayHello(); } - return Result.Success; + + m_HealthOk = Result.ErrorCode == 0; + + return {.Reason = std::move(Result.Reason), .Ok = m_HealthOk}; } - catch (std::exception&) + catch (std::exception& Err) { - return false; + return {.Reason = Err.what(), .Ok = false}; } } @@ -318,14 +341,16 @@ namespace detail { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); + m_HealthOk = Result.ErrorCode == 0; + return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } - catch (std::exception& e) + catch (std::exception& Err) { - return {.Reason = std::string(e.what()), .Success = false}; + return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; } } @@ -337,14 +362,16 @@ namespace detail { const ZenCacheResult Result = Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); + m_HealthOk = Result.ErrorCode == 0; + return {.Value = Result.Response, .Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; } - catch (std::exception& e) + catch (std::exception& Err) { - return {.Reason = std::string(e.what()), .Success = false}; + return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; } } @@ -390,6 +417,8 @@ namespace detail { CacheRecord.CacheKey.Hash, PackagePayload, CacheRecord.Type); + + m_HealthOk = Result.ErrorCode == 0; } TotalBytes = Result.Bytes; @@ -406,6 +435,8 @@ namespace detail { CacheRecord.CacheKey.Hash, CacheRecord.PayloadIds[Idx], Payloads[Idx]); + + m_HealthOk = Result.ErrorCode == 0; } TotalBytes += Result.Bytes; @@ -425,6 +456,8 @@ namespace detail { { Result = Session.PutCacheRecord(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, CacheRecord.Type); + + m_HealthOk = Result.ErrorCode == 0; } TotalBytes += Result.Bytes; @@ -435,6 +468,7 @@ namespace detail { } catch (std::exception& e) { + m_HealthOk = false; return {.Reason = std::string(e.what()), .Success = false}; } } @@ -442,6 +476,7 @@ namespace detail { private: std::string m_DisplayName; RefPtr<ZenStructuredCacheClient> m_Client; + std::atomic_bool m_HealthOk{false}; }; } // namespace detail @@ -548,27 +583,35 @@ public: virtual bool Initialize() override { - auto NewEnd = std::remove_if(std::begin(m_Endpoints), std::end(m_Endpoints), [this](auto& Endpoint) { - const bool Ok = Endpoint->Initialize(); - ZEN_INFO("{} [{}]", Endpoint->DisplayName(), Ok ? "OK" : "FAILED"); - return !Ok; - }); + for (auto& Endpoint : m_Endpoints) + { + const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); + if (Health.Ok) + { + ZEN_INFO("initialize endpoint '{}' OK", Endpoint->DisplayName()); + } + else + { + ZEN_WARN("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + } + } - m_Endpoints.erase(NewEnd, std::end(m_Endpoints)); - m_IsRunning = !m_Endpoints.empty(); + m_RunState.IsRunning = !m_Endpoints.empty(); - if (m_IsRunning) + if (m_RunState.IsRunning) { for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) { m_UpstreamThreads.emplace_back(&DefaultUpstreamCache::ProcessUpstreamQueue, this); } + + m_EndpointMonitorThread = std::thread(&DefaultUpstreamCache::MonitorEndpoints, this); } - return m_IsRunning; + return m_RunState.IsRunning; } - virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } + virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override { @@ -576,10 +619,13 @@ public: { for (auto& Endpoint : m_Endpoints) { - if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + if (Endpoint->IsHealthy()) { - m_Stats.Add(*Endpoint, Result); - return Result; + if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + { + m_Stats.Add(*Endpoint, Result); + return Result; + } } } } @@ -593,10 +639,13 @@ public: { for (auto& Endpoint : m_Endpoints) { - if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + if (Endpoint->IsHealthy()) { - m_Stats.Add(*Endpoint, Result); - return Result; + if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + { + m_Stats.Add(*Endpoint, Result); + return Result; + } } } } @@ -606,7 +655,7 @@ public: virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override { - if (m_IsRunning.load() && m_Options.WriteUpstream) + if (m_RunState.IsRunning && m_Options.WriteUpstream) { if (!m_UpstreamThreads.empty()) { @@ -655,18 +704,21 @@ private: for (auto& Endpoint : m_Endpoints) { - const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); - if (Result.Success) + if (Endpoint->IsHealthy()) { - m_Stats.Add(*Endpoint, Result); - } - else - { - ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - Endpoint->DisplayName(), - Result.Reason); + const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); + if (Result.Success) + { + m_Stats.Add(*Endpoint, Result); + } + else + { + ZEN_WARN("process upstream FAILED, '{}/{}' FAILED, endpoint '{}', reason: '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Endpoint->DisplayName(), + Result.Reason); + } } } } @@ -688,33 +740,82 @@ private: } } - if (!m_IsRunning.load()) + if (!m_RunState.IsRunning) { break; } } } + void MonitorEndpoints() + { + for (;;) + { + { + std::unique_lock lk(m_RunState.Mutex); + if (m_RunState.ExitSignal.wait_for(lk, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); })) + { + break; + } + } + + for (auto& Endpoint : m_Endpoints) + { + if (!Endpoint->IsHealthy()) + { + if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) + { + ZEN_INFO("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason); + } + else + { + ZEN_WARN("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + } + } + } + } + } + void Shutdown() { - if (m_IsRunning.load()) + if (m_RunState.Stop()) { - m_IsRunning.store(false); m_UpstreamQueue.CompleteAdding(); - for (std::thread& Thread : m_UpstreamThreads) { Thread.join(); } + m_EndpointMonitorThread.join(); m_UpstreamThreads.clear(); m_Endpoints.clear(); } } + spdlog::logger& Log() { return m_Log; } + using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>; - spdlog::logger& Log() { return m_Log; } + struct RunState + { + std::mutex Mutex; + std::condition_variable ExitSignal; + std::atomic_bool IsRunning{false}; + + bool Stop() + { + bool Stopped = false; + { + std::lock_guard _(Mutex); + Stopped = IsRunning.exchange(false); + } + if (Stopped) + { + ExitSignal.notify_all(); + } + return Stopped; + } + }; spdlog::logger& m_Log; UpstreamCacheOptions m_Options; @@ -724,7 +825,8 @@ private: UpstreamStats m_Stats; std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints; std::vector<std::thread> m_UpstreamThreads; - std::atomic_bool m_IsRunning{false}; + std::thread m_EndpointMonitorThread; + RunState m_RunState; }; ////////////////////////////////////////////////////////////////////////// diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 142fe260f..96ee8bddc 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -6,6 +6,7 @@ #include <zencore/iohash.h> #include <zencore/zencore.h> +#include <chrono> #include <memory> namespace zen { @@ -35,18 +36,33 @@ struct UpstreamCacheRecord struct UpstreamCacheOptions { - uint32_t ThreadCount = 4; - bool ReadUpstream = true; - bool WriteUpstream = true; + std::chrono::seconds HealthCheckInterval{5}; + uint32_t ThreadCount = 4; + bool ReadUpstream = true; + bool WriteUpstream = true; +}; + +enum class UpstreamStatusCode : uint8_t +{ + Ok, + Error +}; + +struct UpstreamError +{ + UpstreamStatusCode StatusCode = UpstreamStatusCode::Ok; + std::string Reason; + + explicit operator bool() const { return StatusCode != UpstreamStatusCode::Ok; } }; struct GetUpstreamCacheResult { - IoBuffer Value; - std::string Reason; - int64_t Bytes = {}; - double ElapsedSeconds = {}; - bool Success = false; + IoBuffer Value; + UpstreamError Error; + int64_t Bytes = {}; + double ElapsedSeconds = {}; + bool Success = false; }; struct PutUpstreamCacheResult @@ -57,6 +73,12 @@ struct PutUpstreamCacheResult bool Success = false; }; +struct UpstreamEndpointHealth +{ + std::string Reason; + bool Ok = false; +}; + /** * The upstream endpont is responsible for handling upload/downloading of cache records. */ @@ -65,7 +87,9 @@ class UpstreamEndpoint public: virtual ~UpstreamEndpoint() = default; - virtual bool Initialize() = 0; + virtual bool IsHealthy() const = 0; + + virtual UpstreamEndpointHealth CheckHealth() = 0; virtual std::string_view DisplayName() const = 0; @@ -88,7 +112,7 @@ public: virtual bool Initialize() = 0; - virtual void AddEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0; + virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0; virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0; diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index 7f689d7f3..710d381c6 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -391,6 +391,11 @@ ZenStructuredCacheSession::SayHello() Session.SetOption(cpr::Url{Uri.c_str()}); cpr::Response Response = Session.Get(); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + return {.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; } @@ -411,6 +416,11 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -431,6 +441,11 @@ ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHa cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -455,6 +470,11 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; @@ -475,6 +495,11 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index 36cfd1217..48886096d 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -91,10 +91,12 @@ namespace detail { struct ZenCacheResult { - IoBuffer Response; - int64_t Bytes = {}; - double ElapsedSeconds = {}; - bool Success = false; + IoBuffer Response; + int64_t Bytes = {}; + double ElapsedSeconds = {}; + int32_t ErrorCode = {}; + std::string Reason; + bool Success = false; }; /** Zen Structured Cache session diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index cf24dc224..e3b61568f 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -193,7 +193,7 @@ public: if (!UpstreamConfig.ZenConfig.Url.empty()) { std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Url); - UpstreamCache->AddEndpoint(std::move(ZenEndpoint)); + UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); } { @@ -221,7 +221,7 @@ public: if (!Options.ServiceUrl.empty()) { std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options); - UpstreamCache->AddEndpoint(std::move(JupiterEndpoint)); + UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint)); } } diff --git a/zenserver/zenserver.rc b/zenserver/zenserver.rc new file mode 100644 index 000000000..c063436ef --- /dev/null +++ b/zenserver/zenserver.rc @@ -0,0 +1,81 @@ +// Microsoft Visual C++ generated resource script. +// +#include "resource.h" + +#define APSTUDIO_READONLY_SYMBOLS +///////////////////////////////////////////////////////////////////////////// +// +// Generated from the TEXTINCLUDE 2 resource. +// +#include "winres.h" + +///////////////////////////////////////////////////////////////////////////// +#undef APSTUDIO_READONLY_SYMBOLS + +///////////////////////////////////////////////////////////////////////////// +// English (United States) resources + +#if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_ENU) +LANGUAGE LANG_ENGLISH, SUBLANG_ENGLISH_US +#pragma code_page(1252) + +///////////////////////////////////////////////////////////////////////////// +// +// Icon +// + +// Icon with lowest ID value placed first to ensure application icon +// remains consistent on all systems. +IDI_ICON1 ICON "..\\UnrealEngine.ico" + +#endif // English (United States) resources +///////////////////////////////////////////////////////////////////////////// + + +///////////////////////////////////////////////////////////////////////////// +// English (United Kingdom) resources + +#if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_ENG) +LANGUAGE LANG_ENGLISH, SUBLANG_ENGLISH_UK +#pragma code_page(1252) + +#ifdef APSTUDIO_INVOKED +///////////////////////////////////////////////////////////////////////////// +// +// TEXTINCLUDE +// + +1 TEXTINCLUDE +BEGIN + "resource.h\0" +END + +2 TEXTINCLUDE +BEGIN + "#include ""winres.h""\r\n" + "\0" +END + +3 TEXTINCLUDE +BEGIN + "\r\n" + "\0" +END + +#endif // APSTUDIO_INVOKED + +#endif // English (United Kingdom) resources +///////////////////////////////////////////////////////////////////////////// + + + +#ifndef APSTUDIO_INVOKED +///////////////////////////////////////////////////////////////////////////// +// +// Generated from the TEXTINCLUDE 3 resource. +// + + +///////////////////////////////////////////////////////////////////////////// +#endif // not APSTUDIO_INVOKED + diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj index db657d192..1671d98a6 100644 --- a/zenserver/zenserver.vcxproj +++ b/zenserver/zenserver.vcxproj @@ -109,6 +109,7 @@ <ClInclude Include="compute\apply.h" /> <ClInclude Include="config.h" /> <ClInclude Include="diag\logging.h" /> + <ClInclude Include="resource.h" /> <ClInclude Include="sos\sos.h" /> <ClInclude Include="testing\httptest.h" /> <ClInclude Include="upstream\jupiter.h" /> @@ -163,6 +164,12 @@ <ItemGroup> <None Include="xmake.lua" /> </ItemGroup> + <ItemGroup> + <ResourceCompile Include="zenserver.rc" /> + </ItemGroup> + <ItemGroup> + <Image Include="..\UnrealEngine.ico" /> + </ItemGroup> <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> <ImportGroup Label="ExtensionTargets"> </ImportGroup> diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters index 250c55812..c51a8eb76 100644 --- a/zenserver/zenserver.vcxproj.filters +++ b/zenserver/zenserver.vcxproj.filters @@ -40,6 +40,7 @@ </ClInclude> <ClInclude Include="testing\httptest.h" /> <ClInclude Include="windows\service.h" /> + <ClInclude Include="resource.h" /> </ItemGroup> <ItemGroup> <ClCompile Include="zenserver.cpp" /> @@ -96,4 +97,10 @@ <ItemGroup> <None Include="xmake.lua" /> </ItemGroup> + <ItemGroup> + <ResourceCompile Include="zenserver.rc" /> + </ItemGroup> + <ItemGroup> + <Image Include="..\UnrealEngine.ico" /> + </ItemGroup> </Project>
\ No newline at end of file diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp index 916e7f709..eaf72cb41 100644 --- a/zenstore/CAS.cpp +++ b/zenstore/CAS.cpp @@ -26,6 +26,39 @@ namespace zen { void +CasChunkSet::AddChunkToSet(const IoHash& HashToAdd) +{ + m_ChunkSet.insert(HashToAdd); +} + +void +CasChunkSet::RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate) +{ + for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;) + { + if (Predicate(*It)) + { + It = m_ChunkSet.erase(It); + } + else + { + ++It; + } + } +} + +void +CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback) +{ + for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;) + { + Callback(*It); + } +} + +////////////////////////////////////////////////////////////////////////// + +void ScrubContext::ReportBadChunks(std::span<IoHash> BadChunks) { ZEN_UNUSED(BadChunks); @@ -111,9 +144,6 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig) m_TinyStrategy.Initialize("tobs", 16, IsNewStore); m_SmallStrategy.Initialize("sobs", 4096, IsNewStore); - - ScrubContext Ctx; - Scrub(Ctx); } CasStore::InsertResult @@ -176,6 +206,13 @@ CasImpl::Flush() void CasImpl::Scrub(ScrubContext& Ctx) { + if (m_LastScrubTime == Ctx.ScrubTimestamp()) + { + return; + } + + m_LastScrubTime = Ctx.ScrubTimestamp(); + m_SmallStrategy.Scrub(Ctx); m_TinyStrategy.Scrub(Ctx); m_LargeStrategy.Scrub(Ctx); @@ -222,11 +259,11 @@ TEST_CASE("CasStore") CHECK(Result2.New); CasChunkSet ChunkSet; - ChunkSet.AddChunk(Hash1); - ChunkSet.AddChunk(Hash2); + ChunkSet.AddChunkToSet(Hash1); + ChunkSet.AddChunkToSet(Hash2); Store->FilterChunks(ChunkSet); - CHECK(ChunkSet.GetChunkSet().size() == 0); + CHECK(ChunkSet.IsEmpty()); IoBuffer Lookup1 = Store->FindChunk(Hash1); CHECK(Lookup1); diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp index 5e266f9d3..08a3192ff 100644 --- a/zenstore/cidstore.cpp +++ b/zenstore/cidstore.cpp @@ -12,9 +12,9 @@ namespace zen { -struct CidStore::CidState +struct CidStore::Impl { - CidState(CasStore& InCasStore) : m_CasStore(InCasStore) {} + Impl(CasStore& InCasStore) : m_CasStore(InCasStore) {} struct IndexEntry { @@ -42,18 +42,26 @@ struct CidStore::CidState void AddCompressedCid(const IoHash& DecompressedId, const IoHash& Compressed) { + ZEN_ASSERT(Compressed != IoHash::Zero); + RwLock::ExclusiveLockScope _(m_Lock); m_CidMap.insert_or_assign(DecompressedId, Compressed); // TODO: it's pretty wasteful to log even idempotent updates // however we can't simply use the boolean returned by insert_or_assign // since there's not a 1:1 mapping between compressed and uncompressed // so if we want a last-write-wins policy then we have to log each update + LogMapping(DecompressedId, Compressed); + } + + void LogMapping(const IoHash& DecompressedId, const IoHash& Compressed) + { m_LogFile.Append({.Uncompressed = DecompressedId, .Compressed = Compressed}); } IoBuffer FindChunkByCid(const IoHash& DecompressedId) { IoHash CompressedHash; + { RwLock::SharedLockScope _(m_Lock); if (auto It = m_CidMap.find(DecompressedId); It != m_CidMap.end()) @@ -62,12 +70,9 @@ struct CidStore::CidState } } - if (CompressedHash != IoHash::Zero) - { - return m_CasStore.FindChunk(CompressedHash); - } + ZEN_ASSERT(CompressedHash != IoHash::Zero); - return IoBuffer(); + return m_CasStore.FindChunk(CompressedHash); } bool ContainsChunk(const IoHash& DecompressedId) @@ -75,7 +80,17 @@ struct CidStore::CidState RwLock::SharedLockScope _(m_Lock); // Note that we do not check CAS here. This is optimistic but usually // what we want. - return m_CidMap.find(DecompressedId) != m_CidMap.end(); + auto It = m_CidMap.find(DecompressedId); + + if (It == m_CidMap.end()) + { + // Not in map, or tombstone + return false; + } + + ZEN_ASSERT(It->second != IoHash::Zero); + + return true; } void InitializeIndex(const std::filesystem::path& RootDir) @@ -87,6 +102,8 @@ struct CidStore::CidState m_LogFile.Open(SlogPath, IsNew); + uint64_t TombstoneCount = 0; + m_LogFile.Replay([&](const IndexEntry& Ie) { if (Ie.Compressed != IoHash::Zero) { @@ -97,16 +114,24 @@ struct CidStore::CidState { // Tombstone m_CidMap.erase(Ie.Uncompressed); + ++TombstoneCount; } }); - ZEN_DEBUG("CID index initialized: {} entries found", m_CidMap.size()); + ZEN_INFO("CID index initialized: {} entries found ({} tombstones)", m_CidMap.size(), TombstoneCount); } void Flush() { m_LogFile.Flush(); } void Scrub(ScrubContext& Ctx) { + if (Ctx.ScrubTimestamp() == m_LastScrubTime) + { + return; + } + + m_LastScrubTime = Ctx.ScrubTimestamp(); + CasChunkSet ChunkSet; { @@ -114,7 +139,7 @@ struct CidStore::CidState for (auto& Kv : m_CidMap) { - ChunkSet.AddChunk(Kv.second); + ChunkSet.AddChunkToSet(Kv.second); } } @@ -126,20 +151,22 @@ struct CidStore::CidState return; } - ZEN_ERROR("Scrubbing found that {} cid mappings mapped to non-existent CAS chunks", ChunkSet.GetChunkSet().size()); + ZEN_ERROR("Scrubbing found that {} cid mappings (out of {}) mapped to non-existent CAS chunks. These mappings will be removed", + ChunkSet.GetSize(), + m_CidMap.size()); // Erase all mappings to chunks which are not present in the underlying CAS store // we do this by removing mappings from the in-memory lookup structure and also // by emitting tombstone records to the commit log - const auto& MissingChunks = ChunkSet.GetChunkSet(); std::vector<IoHash> BadChunks; + { RwLock::SharedLockScope _(m_Lock); for (auto It = begin(m_CidMap), ItEnd = end(m_CidMap); It != ItEnd;) { - if (auto MissingIt = MissingChunks.find(It->second); MissingIt != MissingChunks.end()) + if (ChunkSet.ContainsChunk(It->second)) { const IoHash& BadHash = It->first; @@ -163,11 +190,13 @@ struct CidStore::CidState Ctx.ReportBadChunks(BadChunks); } + + uint64_t m_LastScrubTime = 0; }; ////////////////////////////////////////////////////////////////////////// -CidStore::CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir) : m_Impl(std::make_unique<CidState>(InCasStore)) +CidStore::CidStore(CasStore& InCasStore, const std::filesystem::path& RootDir) : m_Impl(std::make_unique<Impl>(InCasStore)) { m_Impl->InitializeIndex(RootDir); } diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index fe38f0fde..5fc3ac356 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -5,6 +5,7 @@ #include "CompactCas.h" #include <zencore/except.h> +#include <zencore/logging.h> #include <zencore/memory.h> #include <zencore/string.h> #include <zencore/thread.h> @@ -32,7 +33,9 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint6 ZEN_ASSERT(IsPow2(Alignment)); ZEN_ASSERT(!m_IsInitialized); - m_PayloadAlignment = Alignment; + m_ContainerBaseName = ContainerBaseName; + m_PayloadAlignment = Alignment; + std::string BaseName(ContainerBaseName); std::filesystem::path SobsPath = m_Config.RootDirectory / (BaseName + ".ucas"); std::filesystem::path SidxPath = m_Config.RootDirectory / (BaseName + ".uidx"); @@ -144,20 +147,7 @@ CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks) // we're likely to already have a large proportion of the // chunks in the set - std::unordered_set<IoHash> HaveSet; - - for (const IoHash& Hash : InOutChunks.GetChunkSet()) - { - if (HaveChunk(Hash)) - { - HaveSet.insert(Hash); - } - } - - for (const IoHash& Hash : HaveSet) - { - InOutChunks.RemoveIfPresent(Hash); - } + InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); } void @@ -210,7 +200,8 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) continue; } - const IoHash ComputedHash = IoHash::HashBuffer(BufferBase, Entry.second.Size); + const IoHash ComputedHash = + IoHash::HashBuffer(reinterpret_cast<uint8_t*>(BufferBase) + Entry.second.Offset - WindowStart, Entry.second.Size); if (Entry.first != ComputedHash) { @@ -242,6 +233,13 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) } } + if (BadChunks.empty()) + { + return; + } + + ZEN_ERROR("Scrubbing found {} bad chunks in '{}'", BadChunks.size(), m_ContainerBaseName); + // Deal with bad chunks by removing them from our lookup map std::vector<IoHash> BadChunkHashes; diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h index 101e6b1b7..a512c3d93 100644 --- a/zenstore/compactcas.h +++ b/zenstore/compactcas.h @@ -69,6 +69,7 @@ private: BasicFile m_SmallObjectFile; BasicFile m_SmallObjectIndex; TCasLogFile<CasDiskIndexEntry> m_CasLog; + std::string m_ContainerBaseName; RwLock m_LocationMapLock; std::unordered_map<IoHash, CasDiskLocation, IoHash::Hasher> m_LocationMap; diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index 968c9f3a0..c036efd35 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -31,18 +31,12 @@ namespace zen { using namespace fmt::literals; -FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config) : m_Config(Config) +FileCasStrategy::ShardingHelper::ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash) { -} - -FileCasStrategy::~FileCasStrategy() -{ -} + ShardedPath.Append(RootPath.c_str()); + ShardedPath.Append(std::filesystem::path::preferred_separator); -WideStringBuilderBase& -FileCasStrategy::MakeShardedPath(WideStringBuilderBase& ShardedPath, const IoHash& ChunkHash, size_t& OutShard2len) -{ - ExtendableStringBuilder<96> HashString; + ExtendableStringBuilder<64> HashString; ChunkHash.ToHexString(HashString); const char* str = HashString.c_str(); @@ -53,20 +47,31 @@ FileCasStrategy::MakeShardedPath(WideStringBuilderBase& ShardedPath, const IoHas // This results in a maximum of 4096 * 256 directories // // The numbers have been chosen somewhat arbitrarily but are large to scale - // to very large chunk repositories. It may or may not make sense to make - // this a configurable policy, and it would probably be a good idea to - // measure performance for different policies and chunk counts + // to very large chunk repositories without creating too many directories + // on a single level since NTFS does not deal very well with this. + // + // It may or may not make sense to make this a configurable policy, and it + // would probably be a good idea to measure performance for different + // policies and chunk counts ShardedPath.AppendAsciiRange(str, str + 3); - ShardedPath.Append('\\'); + ShardedPath.Append(std::filesystem::path::preferred_separator); ShardedPath.AppendAsciiRange(str + 3, str + 5); - OutShard2len = ShardedPath.Size(); + Shard2len = ShardedPath.Size(); - ShardedPath.Append('\\'); + ShardedPath.Append(std::filesystem::path::preferred_separator); ShardedPath.AppendAsciiRange(str + 5, str + 40); +} + +////////////////////////////////////////////////////////////////////////// - return ShardedPath; +FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config) : m_Config(Config), m_Log(logging::Get("filecas")) +{ +} + +FileCasStrategy::~FileCasStrategy() +{ } CasStore::InsertResult @@ -78,11 +83,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) IoBufferFileReference FileRef; if (Chunk.IsWholeFile() && Chunk.GetFileReference(/* out */ FileRef)) { - size_t Shard2len = 0; - ExtendableWideStringBuilder<128> ShardedPath; - ShardedPath.Append(m_Config.RootDirectory.c_str()); - ShardedPath.Append(std::filesystem::path::preferred_separator); - MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); auto DeletePayloadFileOnClose = [&] { // This will cause the file to be deleted when the last handle to it is closed @@ -105,7 +106,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) { CAtlFile PayloadFile; - if (HRESULT hRes = PayloadFile.Create(ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes)) + if (HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes)) { // If we succeeded in opening the target file then we don't need to do anything else because it already exists // and should contain the content we were about to insert @@ -118,7 +119,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) } } - std::filesystem::path FullPath(ShardedPath.c_str()); + std::filesystem::path FullPath(Name.ShardedPath.c_str()); std::filesystem::path FilePath = FullPath.parent_path(); std::wstring FileName = FullPath.native(); @@ -194,11 +195,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) CasStore::InsertResult FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize, const IoHash& ChunkHash) { - size_t Shard2len = 0; - ExtendableWideStringBuilder<128> ShardedPath; - ShardedPath.Append(m_Config.RootDirectory.c_str()); - ShardedPath.Append(std::filesystem::path::preferred_separator); - MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); // See if file already exists // @@ -206,7 +203,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize CAtlFile PayloadFile; - HRESULT hRes = PayloadFile.Create(ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); + HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); if (SUCCEEDED(hRes)) { @@ -221,7 +218,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize // For now, use double-checked locking to see if someone else was first - hRes = PayloadFile.Create(ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); + hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); if (SUCCEEDED(hRes)) { @@ -235,7 +232,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize ZEN_WARN("Unexpected error code when opening shard file for read: {:#x}", uint32_t(hRes)); } - auto InternalCreateFile = [&] { return PayloadFile.Create(ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); }; + auto InternalCreateFile = [&] { return PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_WRITE, FILE_SHARE_DELETE, CREATE_ALWAYS); }; hRes = InternalCreateFile(); @@ -243,14 +240,14 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize { // Ensure parent directories exist and retry file creation - std::filesystem::create_directories(std::wstring_view(ShardedPath.c_str(), Shard2len)); + std::filesystem::create_directories(std::wstring_view(Name.ShardedPath.c_str(), Name.Shard2len)); hRes = InternalCreateFile(); } if (FAILED(hRes)) { - ThrowSystemException(hRes, "Failed to open shard file '{}'"_format(WideToUtf8(ShardedPath))); + ThrowSystemException(hRes, "Failed to open shard file '{}'"_format(WideToUtf8(Name.ShardedPath))); } size_t ChunkRemain = ChunkSize; @@ -276,36 +273,37 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize IoBuffer FileCasStrategy::FindChunk(const IoHash& ChunkHash) { - size_t Shard2len = 0; - ExtendableWideStringBuilder<128> ShardedPath; - ShardedPath.Append(m_Config.RootDirectory.c_str()); - ShardedPath.Append(std::filesystem::path::preferred_separator); - MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); - return IoBufferBuilder::MakeFromFile(ShardedPath.c_str()); + return IoBufferBuilder::MakeFromFile(Name.ShardedPath.c_str()); } bool FileCasStrategy::HaveChunk(const IoHash& ChunkHash) { - size_t Shard2len = 0; - ExtendableWideStringBuilder<128> ShardedPath; - ShardedPath.Append(m_Config.RootDirectory.c_str()); - ShardedPath.Append(std::filesystem::path::preferred_separator); - MakeShardedPath(ShardedPath, ChunkHash, /* out */ Shard2len); + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); RwLock::SharedLockScope _(LockForHash(ChunkHash)); std::error_code Ec; - if (std::filesystem::exists(ShardedPath.c_str(), Ec)) + if (std::filesystem::exists(Name.ShardedPath.c_str(), Ec)) { return true; } return false; } +void +FileCasStrategy::DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec) +{ + ShardingHelper Name(m_Config.RootDirectory.c_str(), ChunkHash); + + ZEN_DEBUG("deleting CAS payload file '{}'", WideToUtf8(Name.ShardedPath)); + + std::filesystem::remove(Name.ShardedPath.c_str(), Ec); +} void FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) @@ -318,20 +316,7 @@ FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) // a caller, this is something which needs to be taken into account by anyone consuming // this functionality in any case - std::unordered_set<IoHash> HaveSet; - - for (const IoHash& Hash : InOutChunks.GetChunkSet()) - { - if (HaveChunk(Hash)) - { - HaveSet.insert(Hash); - } - } - - for (const IoHash& Hash : HaveSet) - { - InOutChunks.RemoveIfPresent(Hash); - } + InOutChunks.RemoveChunksIf([&](const IoHash& Hash) { return HaveChunk(Hash); }); } void @@ -421,6 +406,27 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) } }); + if (!BadHashes.empty()) + { + ZEN_ERROR("file CAS scrubbing: {} bad chunks found", BadHashes.size()); + + if (Ctx.RunRecovery()) + { + ZEN_WARN("recovery: deleting backing files for {} bad chunks which were identified as bad", BadHashes.size()); + + for (const IoHash& Hash : BadHashes) + { + std::error_code Ec; + DeleteChunk(Hash, Ec); + + if (Ec) + { + ZEN_WARN("failed to delete file for chunk {}", Hash); + } + } + } + } + Ctx.ReportBadChunks(BadHashes); } diff --git a/zenstore/filecas.h b/zenstore/filecas.h index 18102968a..db21502c6 100644 --- a/zenstore/filecas.h +++ b/zenstore/filecas.h @@ -12,6 +12,10 @@ #include <functional> +namespace spdlog { +class logger; +} + namespace zen { class BasicFile; @@ -37,10 +41,20 @@ private: const CasStoreConfiguration& m_Config; RwLock m_Lock; RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines + spdlog::logger& m_Log; + spdlog::logger& Log() { return m_Log; } + + inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; } + void IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback); + void DeleteChunk(const IoHash& ChunkHash, std::error_code& Ec); + + struct ShardingHelper + { + ShardingHelper(const std::filesystem::path& RootPath, const IoHash& ChunkHash); - inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; } - static WideStringBuilderBase& MakeShardedPath(WideStringBuilderBase& ShardedPath, const IoHash& ChunkHash, size_t& OutShard2len); - void IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback); + size_t Shard2len = 0; + ExtendableWideStringBuilder<128> ShardedPath; + }; }; } // namespace zen diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h index bb310b179..93454ca6f 100644 --- a/zenstore/include/zenstore/CAS.h +++ b/zenstore/include/zenstore/CAS.h @@ -8,8 +8,11 @@ #include <zencore/iobuffer.h> #include <zencore/iohash.h> #include <zencore/refcount.h> +#include <zencore/timer.h> + #include <atomic> #include <filesystem> +#include <functional> #include <memory> #include <string> #include <unordered_set> @@ -37,21 +40,33 @@ public: private: }; +/** Context object for data scrubbing + * + * Data scrubbing is when we traverse stored data to validate it and + * optionally correct/recover + */ + class ScrubContext { public: - virtual void ReportBadChunks(std::span<IoHash> BadChunks); + virtual void ReportBadChunks(std::span<IoHash> BadChunks); + inline uint64_t ScrubTimestamp() const { return m_ScrubTime; } + inline bool RunRecovery() const { return m_Recover; } private: + uint64_t m_ScrubTime = GetHifreqTimerValue(); + bool m_Recover = true; }; class CasChunkSet { public: - void AddChunk(const IoHash& HashToAdd) { m_ChunkSet.insert(HashToAdd); } - bool RemoveIfPresent(const IoHash& HashToRemove) { return 0 != m_ChunkSet.erase(HashToRemove); } - const std::unordered_set<IoHash>& GetChunkSet() const { return m_ChunkSet; } - bool IsEmpty() const { return m_ChunkSet.empty(); } + void AddChunkToSet(const IoHash& HashToAdd); + void RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate); + void IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback); + inline [[nodiscard]] bool ContainsChunk(const IoHash& Hash) const { return m_ChunkSet.find(Hash) != m_ChunkSet.end(); } + inline [[nodiscard]] bool IsEmpty() const { return m_ChunkSet.empty(); } + inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); } private: std::unordered_set<IoHash> m_ChunkSet; @@ -78,6 +93,7 @@ public: protected: CasStoreConfiguration m_Config; + uint64_t m_LastScrubTime = 0; }; ZENCORE_API CasStore* CreateCasStore(); diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h index 49f2bf99a..f4439e083 100644 --- a/zenstore/include/zenstore/cidstore.h +++ b/zenstore/include/zenstore/cidstore.h @@ -50,8 +50,8 @@ public: // TODO: add batch filter support private: - struct CidState; - std::unique_ptr<CidState> m_Impl; + struct Impl; + std::unique_ptr<Impl> m_Impl; }; } // namespace zen |