diff options
| author | zousar <[email protected]> | 2025-06-24 16:26:29 -0600 |
|---|---|---|
| committer | zousar <[email protected]> | 2025-06-24 16:26:29 -0600 |
| commit | bb298631ba35a323827dda0b8cd6158e276b5f61 (patch) | |
| tree | 7ba8db91c44ce83f2c518f80f80ab14910eefa6f /src/zenserver | |
| parent | Change to PutResult structure (diff) | |
| parent | 5.6.14 (diff) | |
| download | zen-bb298631ba35a323827dda0b8cd6158e276b5f61.tar.xz zen-bb298631ba35a323827dda0b8cd6158e276b5f61.zip | |
Merge branch 'main' into zs/put-overwrite-policy
Diffstat (limited to 'src/zenserver')
47 files changed, 2048 insertions, 1252 deletions
diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp index 2888f5450..73166e608 100644 --- a/src/zenserver/admin/admin.cpp +++ b/src/zenserver/admin/admin.cpp @@ -20,6 +20,7 @@ #include <zenstore/cidstore.h> #include <zenstore/gc.h> +#include <zenstore/buildstore/buildstore.h> #include <zenstore/cache/structuredcachestore.h> #include <zenutil/workerpools.h> #include "config.h" @@ -39,7 +40,7 @@ struct DirStats DirStats GetStatsForDirectory(std::filesystem::path Dir) { - if (!std::filesystem::exists(Dir)) + if (!IsDir(Dir)) return {}; struct StatsTraversal : public GetDirectoryContentVisitor @@ -105,6 +106,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, ZenCacheStore* CacheStore, CidStore* CidStore, ProjectStore* ProjectStore, + BuildStore* BuildStore, const LogPaths& LogPaths, const ZenServerOptions& ServerOptions) : m_GcScheduler(Scheduler) @@ -112,6 +114,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, , m_CacheStore(CacheStore) , m_CidStore(CidStore) , m_ProjectStore(ProjectStore) +, m_BuildStore(BuildStore) , m_LogPaths(LogPaths) , m_ServerOptions(ServerOptions) { @@ -306,6 +309,7 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, Response << "Interval" << ToTimeSpan(State.Config.Interval); Response << "MaxCacheDuration" << ToTimeSpan(State.Config.MaxCacheDuration); Response << "MaxProjectStoreDuration" << ToTimeSpan(State.Config.MaxProjectStoreDuration); + Response << "MaxBuildStoreDuration" << ToTimeSpan(State.Config.MaxBuildStoreDuration); Response << "CollectSmallObjects" << State.Config.CollectSmallObjects; Response << "Enabled" << State.Config.Enabled; Response << "DiskReserveSize" << NiceBytes(State.Config.DiskReserveSize); @@ -401,6 +405,14 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, } } + if (auto Param = Params.GetValue("maxbuildstoreduration"); Param.empty() == false) + { + if (auto Value = ParseInt<uint64_t>(Param)) + { + GcParams.MaxBuildStoreDuration = std::chrono::seconds(Value.value()); + } + } + if (auto Param = Params.GetValue("disksizesoftlimit"); Param.empty() == false) { if (auto Value = ParseInt<uint64_t>(Param)) @@ -782,6 +794,10 @@ HttpAdminService::HttpAdminService(GcScheduler& Scheduler, { m_ProjectStore->Flush(); } + if (m_BuildStore) + { + m_BuildStore->Flush(); + } HttpReq.WriteResponse(HttpResponseCode::OK); }, HttpVerb::kPost); diff --git a/src/zenserver/admin/admin.h b/src/zenserver/admin/admin.h index 563c4f536..e7821dead 100644 --- a/src/zenserver/admin/admin.h +++ b/src/zenserver/admin/admin.h @@ -12,6 +12,7 @@ class JobQueue; class ZenCacheStore; class CidStore; class ProjectStore; +class BuildStore; struct ZenServerOptions; class HttpAdminService : public zen::HttpService @@ -28,6 +29,7 @@ public: ZenCacheStore* CacheStore, CidStore* CidStore, ProjectStore* ProjectStore, + BuildStore* BuildStore, const LogPaths& LogPaths, const ZenServerOptions& ServerOptions); ~HttpAdminService(); @@ -42,6 +44,7 @@ private: ZenCacheStore* m_CacheStore; CidStore* m_CidStore; ProjectStore* m_ProjectStore; + BuildStore* m_BuildStore; LogPaths m_LogPaths; const ZenServerOptions& m_ServerOptions; }; diff --git a/src/zenserver/buildstore/httpbuildstore.cpp b/src/zenserver/buildstore/httpbuildstore.cpp new file mode 100644 index 000000000..bcec74ce6 --- /dev/null +++ b/src/zenserver/buildstore/httpbuildstore.cpp @@ -0,0 +1,573 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "httpbuildstore.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/compactbinaryvalue.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/trace.h> +#include <zenhttp/packageformat.h> +#include <zenstore/buildstore/buildstore.h> +#include <zenutil/workerpools.h> + +#include <numeric> + +namespace zen { +using namespace std::literals; + +ZEN_DEFINE_LOG_CATEGORY_STATIC(LogBuilds, "builds"sv); + +HttpBuildStoreService::HttpBuildStoreService(HttpStatusService& StatusService, HttpStatsService& StatsService, BuildStore& Store) +: m_Log(logging::Get("builds")) +, m_StatusService(StatusService) +, m_StatsService(StatsService) +, m_BuildStore(Store) +{ + Initialize(); + + m_StatusService.RegisterHandler("builds", *this); + m_StatsService.RegisterHandler("builds", *this); +} + +HttpBuildStoreService::~HttpBuildStoreService() +{ + m_StatsService.UnregisterHandler("builds", *this); + m_StatusService.UnregisterHandler("builds", *this); +} + +const char* +HttpBuildStoreService::BaseUri() const +{ + return "/builds/"; +} + +void +HttpBuildStoreService::Initialize() +{ + ZEN_LOG_INFO(LogBuilds, "Initializing Builds Service"); + + m_Router.AddPattern("namespace", "([[:alnum:]-_.]+)"); + m_Router.AddPattern("bucket", "([[:alnum:]-_.]+)"); + m_Router.AddPattern("buildid", "([[:xdigit:]]{24})"); + m_Router.AddPattern("hash", "([[:xdigit:]]{40})"); + + m_Router.RegisterRoute( + "{namespace}/{bucket}/{buildid}/blobs/{hash}", + [this](HttpRouterRequest& Req) { PutBlobRequest(Req); }, + HttpVerb::kPut); + + m_Router.RegisterRoute( + "{namespace}/{bucket}/{buildid}/blobs/{hash}", + [this](HttpRouterRequest& Req) { GetBlobRequest(Req); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{namespace}/{bucket}/{buildid}/blobs/putBlobMetadata", + [this](HttpRouterRequest& Req) { PutMetadataRequest(Req); }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "{namespace}/{bucket}/{buildid}/blobs/getBlobMetadata", + [this](HttpRouterRequest& Req) { GetMetadatasRequest(Req); }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "{namespace}/{bucket}/{buildid}/blobs/exists", + [this](HttpRouterRequest& Req) { BlobsExistsRequest(Req); }, + HttpVerb::kPost); +} + +void +HttpBuildStoreService::HandleRequest(zen::HttpServerRequest& Request) +{ + ZEN_TRACE_CPU("HttpBuildStoreService::HandleRequest"); + metrics::OperationTiming::Scope $(m_HttpRequests); + + m_BuildStoreStats.RequestCount++; + if (m_Router.HandleRequest(Request) == false) + { + ZEN_LOG_WARN(LogBuilds, "No route found for {0}", Request.RelativeUri()); + return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Not found"sv); + } +} + +void +HttpBuildStoreService::PutBlobRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("HttpBuildStoreService::PutBlobRequest"); + HttpServerRequest& ServerRequest = Req.ServerRequest(); + const std::string_view Namespace = Req.GetCapture(1); + const std::string_view Bucket = Req.GetCapture(2); + const std::string_view BuildId = Req.GetCapture(3); + const std::string_view Hash = Req.GetCapture(4); + ZEN_UNUSED(Namespace, Bucket, BuildId); + IoHash BlobHash; + if (!IoHash::TryParse(Hash, BlobHash)) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid blob hash '{}'", Hash)); + } + m_BuildStoreStats.BlobWriteCount++; + IoBuffer Payload = ServerRequest.ReadPayload(); + if (!Payload) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Payload blob {} is empty", Hash)); + } + if (Payload.GetContentType() != HttpContentType::kCompressedBinary) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Payload blob {} content type {} is invalid", Hash, ToString(Payload.GetContentType()))); + } + m_BuildStore.PutBlob(BlobHash, ServerRequest.ReadPayload()); + // ZEN_INFO("Stored blob {}. Size: {}", BlobHash, ServerRequest.ReadPayload().GetSize()); + return ServerRequest.WriteResponse(HttpResponseCode::OK); +} + +void +HttpBuildStoreService::GetBlobRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("HttpBuildStoreService::GetBlobRequest"); + HttpServerRequest& ServerRequest = Req.ServerRequest(); + std::string_view Namespace = Req.GetCapture(1); + std::string_view Bucket = Req.GetCapture(2); + std::string_view BuildId = Req.GetCapture(3); + std::string_view Hash = Req.GetCapture(4); + ZEN_UNUSED(Namespace, Bucket, BuildId); + IoHash BlobHash; + if (!IoHash::TryParse(Hash, BlobHash)) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid blob hash '{}'", Hash)); + } + zen::HttpRanges Ranges; + bool HasRange = ServerRequest.TryGetRanges(Ranges); + if (Ranges.size() > 1) + { + // Only a single range is supported + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Multiple ranges in blob request is not supported"); + } + + m_BuildStoreStats.BlobReadCount++; + IoBuffer Blob = m_BuildStore.GetBlob(BlobHash); + if (!Blob) + { + return ServerRequest.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Blob with hash '{}' could not be found", Hash)); + } + // ZEN_INFO("Fetched blob {}. Size: {}", BlobHash, Blob.GetSize()); + m_BuildStoreStats.BlobHitCount++; + if (HasRange) + { + const HttpRange& Range = Ranges.front(); + const uint64_t BlobSize = Blob.GetSize(); + const uint64_t MaxBlobSize = Range.Start < BlobSize ? Range.Start - BlobSize : 0; + const uint64_t RangeSize = Min(Range.End - Range.Start + 1, MaxBlobSize); + if (Range.Start + RangeSize > BlobSize) + { + return ServerRequest.WriteResponse(HttpResponseCode::NoContent); + } + Blob = IoBuffer(Blob, Range.Start, RangeSize); + return ServerRequest.WriteResponse(HttpResponseCode::OK, ZenContentType::kBinary, Blob); + } + else + { + return ServerRequest.WriteResponse(HttpResponseCode::OK, Blob.GetContentType(), Blob); + } +} + +void +HttpBuildStoreService::PutMetadataRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("HttpBuildStoreService::PutMetadataRequest"); + HttpServerRequest& ServerRequest = Req.ServerRequest(); + std::string_view Namespace = Req.GetCapture(1); + std::string_view Bucket = Req.GetCapture(2); + std::string_view BuildId = Req.GetCapture(3); + + IoBuffer MetaPayload = ServerRequest.ReadPayload(); + if (MetaPayload.GetContentType() != ZenContentType::kCbPackage) + { + throw std::runtime_error(fmt::format("PutMetadataRequest payload has unexpected payload type '{}', expected '{}'", + ToString(MetaPayload.GetContentType()), + ToString(ZenContentType::kCbPackage))); + } + CbPackage Message = ParsePackageMessage(MetaPayload); + + CbObjectView MessageObject = Message.GetObject(); + if (!MessageObject) + { + throw std::runtime_error("PutMetadataRequest payload object is missing"); + } + CbArrayView BlobsArray = MessageObject["blobHashes"sv].AsArrayView(); + CbArrayView MetadataArray = MessageObject["metadatas"sv].AsArrayView(); + + const uint64_t BlobCount = BlobsArray.Num(); + if (BlobCount == 0) + { + throw std::runtime_error("PutMetadataRequest blobs array is empty"); + } + if (BlobCount != MetadataArray.Num()) + { + throw std::runtime_error( + fmt::format("PutMetadataRequest metadata array size {} does not match blobs array size {}", MetadataArray.Num(), BlobCount)); + } + + std::vector<IoHash> BlobHashes; + std::vector<IoBuffer> MetadataPayloads; + + BlobHashes.reserve(BlobCount); + MetadataPayloads.reserve(BlobCount); + + auto BlobsArrayIt = begin(BlobsArray); + auto MetadataArrayIt = begin(MetadataArray); + while (BlobsArrayIt != end(BlobsArray)) + { + const IoHash BlobHash = (*BlobsArrayIt).AsHash(); + const IoHash MetadataHash = (*MetadataArrayIt).AsAttachment(); + + const CbAttachment* Attachment = Message.FindAttachment(MetadataHash); + if (Attachment == nullptr) + { + throw std::runtime_error(fmt::format("Blob metadata attachment {} is missing", MetadataHash)); + } + BlobHashes.push_back(BlobHash); + if (Attachment->IsObject()) + { + MetadataPayloads.push_back(Attachment->AsObject().GetBuffer().MakeOwned().AsIoBuffer()); + MetadataPayloads.back().SetContentType(ZenContentType::kCbObject); + } + else if (Attachment->IsCompressedBinary()) + { + MetadataPayloads.push_back(Attachment->AsCompressedBinary().GetCompressed().Flatten().AsIoBuffer()); + MetadataPayloads.back().SetContentType(ZenContentType::kCompressedBinary); + } + else + { + ZEN_ASSERT(Attachment->IsBinary()); + MetadataPayloads.push_back(Attachment->AsBinary().AsIoBuffer()); + MetadataPayloads.back().SetContentType(ZenContentType::kBinary); + } + + BlobsArrayIt++; + MetadataArrayIt++; + } + m_BuildStore.PutMetadatas(BlobHashes, MetadataPayloads); + return ServerRequest.WriteResponse(HttpResponseCode::OK); +} + +void +HttpBuildStoreService::GetMetadatasRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("HttpBuildStoreService::GetMetadatasRequest"); + HttpServerRequest& ServerRequest = Req.ServerRequest(); + std::string_view Namespace = Req.GetCapture(1); + std::string_view Bucket = Req.GetCapture(2); + std::string_view BuildId = Req.GetCapture(3); + ZEN_UNUSED(Namespace, Bucket, BuildId); + IoBuffer RequestPayload = ServerRequest.ReadPayload(); + if (!RequestPayload) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Expected compact binary body for metadata request, body is missing"); + } + if (RequestPayload.GetContentType() != HttpContentType::kCbObject) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Expected compact binary body for metadata request, got {}", ToString(RequestPayload.GetContentType()))); + } + if (CbValidateError ValidateError = ValidateCompactBinary(RequestPayload.GetView(), CbValidateMode::Default); + ValidateError != CbValidateError::None) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Compact binary body for metadata request is not valid, reason: {}", ToString(ValidateError))); + } + CbObject RequestObject = LoadCompactBinaryObject(RequestPayload); + CbArrayView BlobsArray = RequestObject["blobHashes"sv].AsArrayView(); + if (!BlobsArray) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Compact binary body for metadata request is missing 'blobHashes' array"); + } + const uint64_t BlobCount = BlobsArray.Num(); + + std::vector<IoHash> BlobRawHashes; + BlobRawHashes.reserve(BlobCount); + for (CbFieldView BlockHashView : BlobsArray) + { + BlobRawHashes.push_back(BlockHashView.AsHash()); + if (BlobRawHashes.back() == IoHash::Zero) + { + const uint8_t Type = (uint8_t)BlockHashView.GetValue().GetType(); + return ServerRequest.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Compact binary body for metadata 'blobHashes' array contains invalid field type: {}", Type)); + } + } + m_BuildStoreStats.BlobMetaReadCount += BlobRawHashes.size(); + std::vector<IoBuffer> BlockMetadatas = m_BuildStore.GetMetadatas(BlobRawHashes, &GetSmallWorkerPool(EWorkloadType::Burst)); + + CbPackage ResponsePackage; + std::vector<CbAttachment> Attachments; + tsl::robin_set<IoHash, IoHash::Hasher> AttachmentHashes; + Attachments.reserve(BlobCount); + AttachmentHashes.reserve(BlobCount); + { + CbObjectWriter ResponseWriter; + + ResponseWriter.BeginArray("blobHashes"); + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) + { + if (BlockMetadatas[BlockHashIndex]) + { + const IoHash& BlockHash = BlobRawHashes[BlockHashIndex]; + ResponseWriter.AddHash(BlockHash); + } + } + ResponseWriter.EndArray(); // blobHashes + + ResponseWriter.BeginArray("metadatas"); + + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) + { + if (IoBuffer Metadata = BlockMetadatas[BlockHashIndex]; Metadata) + { + switch (Metadata.GetContentType()) + { + case ZenContentType::kCbObject: + { + CbObject Object = CbObject(SharedBuffer(std::move(Metadata)).MakeOwned()); + const IoHash ObjectHash = Object.GetHash(); + ResponseWriter.AddBinaryAttachment(ObjectHash); + if (!AttachmentHashes.contains(ObjectHash)) + { + Attachments.push_back(CbAttachment(Object, ObjectHash)); + AttachmentHashes.insert(ObjectHash); + } + } + break; + case ZenContentType::kCompressedBinary: + { + IoHash RawHash; + uint64_t _; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Metadata)), RawHash, _); + ResponseWriter.AddBinaryAttachment(RawHash); + if (!AttachmentHashes.contains(RawHash)) + { + Attachments.push_back(CbAttachment(Compressed, RawHash)); + AttachmentHashes.insert(RawHash); + } + } + break; + default: + { + const IoHash RawHash = IoHash::HashBuffer(Metadata); + ResponseWriter.AddBinaryAttachment(RawHash); + if (!AttachmentHashes.contains(RawHash)) + { + Attachments.push_back(CbAttachment(SharedBuffer(Metadata), RawHash)); + AttachmentHashes.insert(RawHash); + } + } + break; + } + } + } + + ResponseWriter.EndArray(); // metadatas + + ResponsePackage.SetObject(ResponseWriter.Save()); + } + ResponsePackage.AddAttachments(Attachments); + + CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage); + ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer); +} + +void +HttpBuildStoreService::BlobsExistsRequest(HttpRouterRequest& Req) +{ + ZEN_TRACE_CPU("HttpBuildStoreService::BlobsExistsRequest"); + HttpServerRequest& ServerRequest = Req.ServerRequest(); + std::string_view Namespace = Req.GetCapture(1); + std::string_view Bucket = Req.GetCapture(2); + std::string_view BuildId = Req.GetCapture(3); + ZEN_UNUSED(Namespace, Bucket, BuildId); + IoBuffer RequestPayload = ServerRequest.ReadPayload(); + if (!RequestPayload) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Expected compact binary body for blob exists request, body is missing"); + } + if (RequestPayload.GetContentType() != HttpContentType::kCbObject) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Expected compact binary body for blob exists request, got {}", ToString(RequestPayload.GetContentType()))); + } + if (CbValidateError ValidateError = ValidateCompactBinary(RequestPayload.GetView(), CbValidateMode::Default); + ValidateError != CbValidateError::None) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Compact binary body for blob exists request is not valid, reason: {}", ToString(ValidateError))); + } + CbObject RequestObject = LoadCompactBinaryObject(RequestPayload); + CbArrayView BlobsArray = RequestObject["blobHashes"sv].AsArrayView(); + if (!BlobsArray) + { + m_BuildStoreStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Compact binary body for blob exists request is missing 'blobHashes' array"); + } + + std::vector<IoHash> BlobRawHashes; + BlobRawHashes.reserve(BlobsArray.Num()); + for (CbFieldView BlockHashView : BlobsArray) + { + BlobRawHashes.push_back(BlockHashView.AsHash()); + if (BlobRawHashes.back() == IoHash::Zero) + { + const uint8_t Type = (uint8_t)BlockHashView.GetValue().GetType(); + return ServerRequest.WriteResponse( + HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Compact binary body for blob exists request 'blobHashes' array contains invalid field type: {}", Type)); + } + } + + m_BuildStoreStats.BlobExistsCount += BlobRawHashes.size(); + std::vector<BuildStore::BlobExistsResult> BlobsExists = m_BuildStore.BlobsExists(BlobRawHashes); + CbObjectWriter ResponseWriter(9 * BlobsExists.size()); + ResponseWriter.BeginArray("blobExists"sv); + for (const BuildStore::BlobExistsResult& BlobExists : BlobsExists) + { + ResponseWriter.AddBool(BlobExists.HasBody); + if (BlobExists.HasBody) + { + m_BuildStoreStats.BlobExistsBodyHitCount++; + } + } + ResponseWriter.EndArray(); // blobExist + ResponseWriter.BeginArray("metadataExists"sv); + for (const BuildStore::BlobExistsResult& BlobExists : BlobsExists) + { + ResponseWriter.AddBool(BlobExists.HasBody); + if (BlobExists.HasMetadata) + { + m_BuildStoreStats.BlobExistsMetaHitCount++; + } + } + ResponseWriter.EndArray(); // metadataExists + CbObject ResponseObject = ResponseWriter.Save(); + return ServerRequest.WriteResponse(HttpResponseCode::OK, ResponseObject); +} + +void +HttpBuildStoreService::HandleStatsRequest(HttpServerRequest& Request) +{ + ZEN_TRACE_CPU("HttpBuildStoreService::Stats"); + CbObjectWriter Cbo; + + EmitSnapshot("requests", m_HttpRequests, Cbo); + + Cbo.BeginObject("builds"); + { + Cbo.BeginObject("blobs"); + { + Cbo << "readcount" << m_BuildStoreStats.BlobReadCount << "writecount" << m_BuildStoreStats.BlobWriteCount << "hitcount" + << m_BuildStoreStats.BlobHitCount; + } + Cbo.EndObject(); + + Cbo.BeginObject("metadata"); + { + Cbo << "readcount" << m_BuildStoreStats.BlobMetaReadCount << "writecount" << m_BuildStoreStats.BlobMetaWriteCount << "hitcount" + << m_BuildStoreStats.BlobMetaHitCount; + } + Cbo.EndObject(); + + Cbo << "requestcount" << m_BuildStoreStats.RequestCount; + Cbo << "badrequestcount" << m_BuildStoreStats.BadRequestCount; + } + Cbo.EndObject(); + + Cbo.BeginObject("size"); + { + BuildStore::StorageStats StorageStats = m_BuildStore.GetStorageStats(); + + Cbo << "count" << StorageStats.EntryCount; + Cbo << "bytes" << StorageStats.LargeBlobBytes + StorageStats.SmallBlobBytes + StorageStats.MetadataByteCount; + Cbo.BeginObject("blobs"); + { + Cbo << "count" << (StorageStats.LargeBlobCount + StorageStats.SmallBlobCount); + Cbo << "bytes" << (StorageStats.LargeBlobBytes + StorageStats.SmallBlobBytes); + Cbo.BeginObject("large"); + { + Cbo << "count" << StorageStats.LargeBlobCount; + Cbo << "bytes" << StorageStats.LargeBlobBytes; + } + Cbo.EndObject(); // large + Cbo.BeginObject("small"); + { + Cbo << "count" << StorageStats.SmallBlobCount; + Cbo << "bytes" << StorageStats.SmallBlobBytes; + } + Cbo.EndObject(); // small + } + Cbo.EndObject(); // blobs + + Cbo.BeginObject("metadata"); + { + Cbo << "count" << StorageStats.MetadataCount; + Cbo << "bytes" << StorageStats.MetadataByteCount; + } + Cbo.EndObject(); // metadata + } + Cbo.EndObject(); // size + + return Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +void +HttpBuildStoreService::HandleStatusRequest(HttpServerRequest& Request) +{ + ZEN_TRACE_CPU("HttpBuildStoreService::Status"); + CbObjectWriter Cbo; + Cbo << "ok" << true; + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +} // namespace zen diff --git a/src/zenserver/buildstore/httpbuildstore.h b/src/zenserver/buildstore/httpbuildstore.h new file mode 100644 index 000000000..50cb5db12 --- /dev/null +++ b/src/zenserver/buildstore/httpbuildstore.h @@ -0,0 +1,68 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/stats.h> +#include <zenhttp/httpserver.h> +#include <zenhttp/httpstats.h> +#include <zenhttp/httpstatus.h> + +#include <filesystem> + +namespace zen { + +class BuildStore; + +class HttpBuildStoreService final : public zen::HttpService, public IHttpStatusProvider, public IHttpStatsProvider +{ +public: + HttpBuildStoreService(HttpStatusService& StatusService, HttpStatsService& StatsService, BuildStore& Store); + virtual ~HttpBuildStoreService(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(zen::HttpServerRequest& Request) override; + + virtual void HandleStatsRequest(HttpServerRequest& Request) override; + virtual void HandleStatusRequest(HttpServerRequest& Request) override; + +private: + struct BuildStoreStats + { + std::atomic_uint64_t BlobReadCount{}; + std::atomic_uint64_t BlobHitCount{}; + std::atomic_uint64_t BlobWriteCount{}; + std::atomic_uint64_t BlobMetaReadCount{}; + std::atomic_uint64_t BlobMetaHitCount{}; + std::atomic_uint64_t BlobMetaWriteCount{}; + std::atomic_uint64_t BlobExistsCount{}; + std::atomic_uint64_t BlobExistsBodyHitCount{}; + std::atomic_uint64_t BlobExistsMetaHitCount{}; + std::atomic_uint64_t RequestCount{}; + std::atomic_uint64_t BadRequestCount{}; + }; + + void Initialize(); + + inline LoggerRef Log() { return m_Log; } + + LoggerRef m_Log; + + void PutBlobRequest(HttpRouterRequest& Req); + void GetBlobRequest(HttpRouterRequest& Req); + + void PutMetadataRequest(HttpRouterRequest& Req); + void GetMetadatasRequest(HttpRouterRequest& Req); + + void BlobsExistsRequest(HttpRouterRequest& Req); + + HttpRequestRouter m_Router; + + HttpStatusService& m_StatusService; + HttpStatsService& m_StatsService; + + BuildStore& m_BuildStore; + BuildStoreStats m_BuildStoreStats; + metrics::OperationTiming m_HttpRequests; +}; + +} // namespace zen diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 224cc6678..acb6053d9 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -25,6 +25,7 @@ #include <zenutil/cache/cacherequests.h> #include <zenutil/cache/rpcrecording.h> #include <zenutil/jupiter/jupiterclient.h> +#include <zenutil/parallelwork.h> #include <zenutil/workerpools.h> #include "upstream/upstreamcache.h" @@ -1643,15 +1644,26 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co WorkerThreadPool WorkerPool(ThreadCount); uint64_t RequestCount = Replayer.GetRequestCount(); Stopwatch Timer; - auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); - Latch JobLatch(RequestCount); + auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); ZEN_INFO("Replaying {} requests", RequestCount); for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { - WorkerPool.ScheduleWork([this, &Context, &JobLatch, &Replayer, RequestIndex]() { + if (AbortFlag) + { + break; + } + Work.ScheduleWork(WorkerPool, [this, &Context, &Replayer, RequestIndex](std::atomic<bool>& AbortFlag) { IoBuffer Body; zen::cache::RecordedRequestInfo RequestInfo = Replayer.GetRequest(RequestIndex, /* out */ Body); + if (AbortFlag) + { + return; + } + if (Body) { uint32_t AcceptMagic = 0; @@ -1692,16 +1704,15 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co } } } - JobLatch.CountDown(); }); } - while (!JobLatch.Wait(10000)) - { + Work.Wait(10000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, IsPaused); ZEN_INFO("Replayed {} of {} requests, elapsed {}", - RequestCount - JobLatch.Remaining(), + RequestCount - PendingWork, RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); - } + }); } void diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index c8949f5fd..d53bedad0 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -15,12 +15,15 @@ #include <zencore/logging.h> #include <zencore/string.h> #include <zenhttp/zenhttp.h> +#include <zenutil/commandlineoptions.h> +#include <zenutil/environmentoptions.h> ZEN_THIRD_PARTY_INCLUDES_START #include <fmt/format.h> #include <fmt/ranges.h> #include <zencore/logging.h> #include <cxxopts.hpp> +#include <json11.hpp> #include <sol/sol.hpp> ZEN_THIRD_PARTY_INCLUDES_END @@ -178,27 +181,6 @@ ParseBucketConfigs(std::span<std::string> Buckets) return Cfg; } -static std::string -MakeSafePath(const std::string_view Path) -{ -#if ZEN_PLATFORM_WINDOWS - if (Path.empty()) - { - return std::string(Path); - } - - std::string FixedPath(Path); - std::replace(FixedPath.begin(), FixedPath.end(), '/', '\\'); - if (!FixedPath.starts_with("\\\\?\\")) - { - FixedPath.insert(0, "\\\\?\\"); - } - return FixedPath; -#else - return std::string(Path); -#endif -}; - class CachePolicyOption : public LuaConfig::OptionValue { public: @@ -324,7 +306,7 @@ public: std::string Name = Bucket.value().get_or("name", std::string("Default")); std::string Directory = Bucket.value().get_or("directory", std::string()); - Value.Buckets.push_back({.Name = std::move(Name), .Directory = LuaConfig::MakeSafePath(Directory)}); + Value.Buckets.push_back({.Name = std::move(Name), .Directory = MakeSafeAbsolutePath(Directory)}); } } } @@ -332,6 +314,97 @@ public: ZenObjectStoreConfig& Value; }; +class ZenStructuredCacheBucketsConfigOption : public LuaConfig::OptionValue +{ +public: + ZenStructuredCacheBucketsConfigOption(std::vector<std::pair<std::string, ZenStructuredCacheBucketConfig>>& Value) : Value(Value) {} + virtual void Print(std::string_view Indent, zen::StringBuilderBase& StringBuilder) override + { + if (Value.empty()) + { + StringBuilder.Append("{}"); + return; + } + LuaConfig::LuaContainerWriter Writer(StringBuilder, Indent); + for (const std::pair<std::string, ZenStructuredCacheBucketConfig>& Bucket : Value) + { + Writer.BeginContainer(""); + { + Writer.WriteValue("name", Bucket.first); + const ZenStructuredCacheBucketConfig& BucketConfig = Bucket.second; + + Writer.WriteValue("maxblocksize", fmt::format("{}", BucketConfig.MaxBlockSize)); + Writer.BeginContainer("memlayer"); + { + Writer.WriteValue("sizethreshold", fmt::format("{}", BucketConfig.MemCacheSizeThreshold)); + } + Writer.EndContainer(); + + Writer.WriteValue("payloadalignment", fmt::format("{}", BucketConfig.PayloadAlignment)); + Writer.WriteValue("largeobjectthreshold", fmt::format("{}", BucketConfig.PayloadAlignment)); + } + Writer.EndContainer(); + } + } + virtual void Parse(sol::object Object) override + { + if (sol::optional<sol::table> Buckets = Object.as<sol::table>()) + { + for (const auto& Kv : Buckets.value()) + { + if (sol::optional<sol::table> Bucket = Kv.second.as<sol::table>()) + { + ZenStructuredCacheBucketConfig BucketConfig; + std::string Name = Kv.first.as<std::string>(); + if (Name.empty()) + { + throw zen::OptionParseException(fmt::format("cache bucket option must have a name.")); + } + + const uint64_t MaxBlockSize = Bucket.value().get_or("maxblocksize", BucketConfig.MaxBlockSize); + if (MaxBlockSize == 0) + { + throw zen::OptionParseException( + fmt::format("maxblocksize option for cache bucket '{}' is invalid. It must be non-zero.", Name)); + } + BucketConfig.MaxBlockSize = MaxBlockSize; + + if (sol::optional<sol::table> Memlayer = Bucket.value().get_or("memlayer", sol::table())) + { + const uint64_t MemCacheSizeThreshold = Bucket.value().get_or("sizethreshold", BucketConfig.MemCacheSizeThreshold); + if (MemCacheSizeThreshold == 0) + { + throw zen::OptionParseException( + fmt::format("memlayer.sizethreshold option for cache bucket '{}' is invalid. It must be non-zero.", Name)); + } + BucketConfig.MemCacheSizeThreshold = Bucket.value().get_or("sizethreshold", BucketConfig.MemCacheSizeThreshold); + } + + const uint32_t PayloadAlignment = Bucket.value().get_or("payloadalignment", BucketConfig.PayloadAlignment); + if (PayloadAlignment == 0 || !IsPow2(PayloadAlignment)) + { + throw zen::OptionParseException(fmt::format( + "payloadalignment option for cache bucket '{}' is invalid. It needs to be non-zero and a power of two.", + Name)); + } + BucketConfig.PayloadAlignment = PayloadAlignment; + + const uint64_t LargeObjectThreshold = Bucket.value().get_or("largeobjectthreshold", BucketConfig.LargeObjectThreshold); + if (LargeObjectThreshold == 0) + { + throw zen::OptionParseException( + fmt::format("largeobjectthreshold option for cache bucket '{}' is invalid. It must be non-zero.", Name)); + } + BucketConfig.LargeObjectThreshold = LargeObjectThreshold; + + Value.push_back(std::make_pair(std::move(Name), BucketConfig)); + } + } + } + } + std::vector<std::pair<std::string, ZenStructuredCacheBucketConfig>>& Value; +}; + std::shared_ptr<LuaConfig::OptionValue> MakeOption(zen::UpstreamCachePolicy& Value) { @@ -350,6 +423,35 @@ MakeOption(zen::ZenObjectStoreConfig& Value) return std::make_shared<ZenObjectStoreConfigOption>(Value); }; +std::shared_ptr<LuaConfig::OptionValue> +MakeOption(std::vector<std::pair<std::string, ZenStructuredCacheBucketConfig>>& Value) +{ + return std::make_shared<ZenStructuredCacheBucketsConfigOption>(Value); +}; + +void +ParseEnvVariables(ZenServerOptions& ServerOptions, const cxxopts::ParseResult& CmdLineResult) +{ + using namespace std::literals; + + EnvironmentOptions Options; + Options.AddOption("UE_ZEN_SENTRY_ALLOWPERSONALINFO"sv, ServerOptions.SentryConfig.AllowPII, "sentry-allow-personal-info"sv); + Options.AddOption("UE_ZEN_SENTRY_DSN"sv, ServerOptions.SentryConfig.Dsn, "sentry-dsn"sv); + Options.AddOption("UE_ZEN_SENTRY_ENVIRONMENT"sv, ServerOptions.SentryConfig.Environment, "sentry-environment"sv); + + bool EnvEnableSentry = !ServerOptions.SentryConfig.Disable; + Options.AddOption("UE_ZEN_SENTRY_ENABLED"sv, EnvEnableSentry, "no-sentry"sv); + + Options.AddOption("UE_ZEN_SENTRY_DEBUG"sv, ServerOptions.SentryConfig.Debug, "sentry-debug"sv); + + Options.Parse(CmdLineResult); + + if (EnvEnableSentry != !ServerOptions.SentryConfig.Disable) + { + ServerOptions.SentryConfig.Disable = !EnvEnableSentry; + } +} + void ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptions, @@ -363,12 +465,16 @@ ParseConfigFile(const std::filesystem::path& Path, ////// server LuaOptions.AddOption("server.dedicated"sv, ServerOptions.IsDedicated, "dedicated"sv); LuaOptions.AddOption("server.logid"sv, ServerOptions.LogId, "log-id"sv); - LuaOptions.AddOption("server.sentry.disable"sv, ServerOptions.NoSentry, "no-sentry"sv); - LuaOptions.AddOption("server.sentry.allowpersonalinfo"sv, ServerOptions.SentryAllowPII, "sentry-allow-personal-info"sv); + LuaOptions.AddOption("server.sentry.disable"sv, ServerOptions.SentryConfig.Disable, "no-sentry"sv); + LuaOptions.AddOption("server.sentry.allowpersonalinfo"sv, ServerOptions.SentryConfig.AllowPII, "sentry-allow-personal-info"sv); + LuaOptions.AddOption("server.sentry.dsn"sv, ServerOptions.SentryConfig.Dsn, "sentry-dsn"sv); + LuaOptions.AddOption("server.sentry.environment"sv, ServerOptions.SentryConfig.Environment, "sentry-environment"sv); + LuaOptions.AddOption("server.sentry.debug"sv, ServerOptions.SentryConfig.Debug, "sentry-debug"sv); LuaOptions.AddOption("server.systemrootdir"sv, ServerOptions.SystemRootDir, "system-dir"sv); LuaOptions.AddOption("server.datadir"sv, ServerOptions.DataDir, "data-dir"sv); LuaOptions.AddOption("server.contentdir"sv, ServerOptions.ContentDir, "content-dir"sv); LuaOptions.AddOption("server.abslog"sv, ServerOptions.AbsLogFile, "abslog"sv); + LuaOptions.AddOption("server.pluginsconfigfile"sv, ServerOptions.PluginsConfigFile, "plugins-config"sv); LuaOptions.AddOption("server.debug"sv, ServerOptions.IsDebug, "debug"sv); LuaOptions.AddOption("server.clean"sv, ServerOptions.IsCleanStart, "clean"sv); LuaOptions.AddOption("server.noconsole"sv, ServerOptions.NoConsoleOutput, "quiet"sv); @@ -377,6 +483,10 @@ ParseConfigFile(const std::filesystem::path& Path, LuaOptions.AddOption("server.objectstore.enabled"sv, ServerOptions.ObjectStoreEnabled, "objectstore-enabled"sv); LuaOptions.AddOption("server.objectstore.buckets"sv, ServerOptions.ObjectStoreConfig); + ////// buildsstore + LuaOptions.AddOption("server.buildstore.enabled"sv, ServerOptions.BuildStoreConfig.Enabled, "buildstore-enabled"sv); + LuaOptions.AddOption("server.buildstore.disksizelimit"sv, ServerOptions.BuildStoreConfig.MaxDiskSpaceLimit, "buildstore-disksizelimit"); + ////// network LuaOptions.AddOption("network.httpserverclass"sv, ServerOptions.HttpServerConfig.ServerClass, "http"sv); LuaOptions.AddOption("network.httpserverthreads"sv, ServerOptions.HttpServerConfig.ThreadCount, "http-threads"sv); @@ -411,7 +521,7 @@ ParseConfigFile(const std::filesystem::path& Path, LuaOptions.AddOption("cache.limitoverwrites"sv, ServerOptions.StructuredCacheConfig.LimitOverwrites, "cache-limit-overwrites"sv); LuaOptions.AddOption("cache.memlayer.sizethreshold"sv, - ServerOptions.StructuredCacheConfig.MemCacheSizeThreshold, + ServerOptions.StructuredCacheConfig.BucketConfig.MemCacheSizeThreshold, "cache-memlayer-sizethreshold"sv); LuaOptions.AddOption("cache.memlayer.targetfootprint"sv, ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes, @@ -421,6 +531,19 @@ ParseConfigFile(const std::filesystem::path& Path, "cache-memlayer-triminterval"sv); LuaOptions.AddOption("cache.memlayer.maxage"sv, ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds, "cache-memlayer-maxage"sv); + LuaOptions.AddOption("cache.bucket.maxblocksize"sv, + ServerOptions.StructuredCacheConfig.BucketConfig.MaxBlockSize, + "cache-bucket-maxblocksize"sv); + LuaOptions.AddOption("cache.bucket.memlayer.sizethreshold"sv, + ServerOptions.StructuredCacheConfig.BucketConfig.MemCacheSizeThreshold, + "cache-bucket-memlayer-sizethreshold"sv); + LuaOptions.AddOption("cache.bucket.payloadalignment"sv, + ServerOptions.StructuredCacheConfig.BucketConfig.PayloadAlignment, + "cache-bucket-payloadalignment"sv); + LuaOptions.AddOption("cache.bucket.largeobjectthreshold"sv, + ServerOptions.StructuredCacheConfig.BucketConfig.LargeObjectThreshold, + "cache-bucket-largeobjectthreshold"sv); + ////// cache.upstream LuaOptions.AddOption("cache.upstream.policy"sv, ServerOptions.UpstreamCacheConfig.CachePolicy, "upstream-cache-policy"sv); LuaOptions.AddOption("cache.upstream.upstreamthreadcount"sv, @@ -493,6 +616,9 @@ ParseConfigFile(const std::filesystem::path& Path, LuaOptions.AddOption("gc.projectstore.duration.seconds"sv, ServerOptions.GcConfig.ProjectStore.MaxDurationSeconds, "gc-projectstore-duration-seconds"); + LuaOptions.AddOption("gc.buildstore.duration.seconds"sv, + ServerOptions.GcConfig.BuildStore.MaxDurationSeconds, + "gc-buildstore-duration-seconds"); ////// security LuaOptions.AddOption("security.encryptionaeskey"sv, ServerOptions.EncryptionKey, "encryption-aes-key"sv); @@ -505,6 +631,8 @@ ParseConfigFile(const std::filesystem::path& Path, ServerOptions.WorksSpacesConfig.AllowConfigurationChanges, "workspaces-allow-changes"sv); + LuaOptions.AddOption("cache.buckets"sv, ServerOptions.StructuredCacheConfig.PerBucketConfigs, "cache.buckets"sv); + LuaOptions.Parse(Path, CmdLineResult); // These have special command line processing so we make sure we export them if they were configured on command line @@ -516,10 +644,14 @@ ParseConfigFile(const std::filesystem::path& Path, { LuaOptions.Touch("server.objectstore.buckets"sv); } + if (!ServerOptions.StructuredCacheConfig.PerBucketConfigs.empty()) + { + LuaOptions.Touch("cache.buckets"sv); + } if (!OutputConfigFile.empty()) { - std::filesystem::path WritePath(MakeSafePath(OutputConfigFile)); + std::filesystem::path WritePath(MakeSafeAbsolutePath(OutputConfigFile)); zen::ExtendableStringBuilder<512> ConfigStringBuilder; LuaOptions.Print(ConfigStringBuilder, CmdLineResult); zen::BasicFile Output; @@ -529,6 +661,68 @@ ParseConfigFile(const std::filesystem::path& Path, } void +ParsePluginsConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptions, int BasePort) +{ + using namespace std::literals; + + IoBuffer Body = IoBufferBuilder::MakeFromFile(Path); + std::string JsonText(reinterpret_cast<const char*>(Body.GetData()), Body.GetSize()); + std::string JsonError; + json11::Json PluginsInfo = json11::Json::parse(JsonText, JsonError); + if (!JsonError.empty()) + { + ZEN_WARN("failed parsing plugins config file '{}'. Reason: '{}'", Path, JsonError); + return; + } + for (const json11::Json& PluginInfo : PluginsInfo.array_items()) + { + if (!PluginInfo.is_object()) + { + ZEN_WARN("the json file '{}' does not contain a valid plugin definition, object expected, got '{}'", Path, PluginInfo.dump()); + continue; + } + + HttpServerPluginConfig Config = {}; + + bool bNeedsPort = true; + + for (const std::pair<const std::string, json11::Json>& Items : PluginInfo.object_items()) + { + if (!Items.second.is_string()) + { + ZEN_WARN("the json file '{}' does not contain a valid plugins definition, string expected, got '{}'", + Path, + Items.second.dump()); + continue; + } + + const std::string& Name = Items.first; + const std::string& Value = Items.second.string_value(); + + if (Name == "name"sv) + Config.PluginName = Value; + else + { + Config.PluginOptions.push_back({Name, Value}); + + if (Name == "port"sv) + { + bNeedsPort = false; + } + } + } + + // add a default base port in case if json config didn't provide one + if (bNeedsPort) + { + Config.PluginOptions.push_back({"port", std::to_string(BasePort)}); + } + + ServerOptions.HttpServerConfig.PluginConfigs.push_back(Config); + } +} + +void ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) { const char* DefaultHttp = "asio"; @@ -560,6 +754,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) std::string ContentDir; std::string AbsLogFile; std::string ConfigFile; + std::string PluginsConfigFile; std::string OutputConfigFile; std::string BaseSnapshotDir; @@ -587,13 +782,19 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) "Exit immediately after initialization is complete", cxxopts::value<bool>(ServerOptions.IsPowerCycle)); options.add_options()("config", "Path to Lua config file", cxxopts::value<std::string>(ConfigFile)); + options.add_options()("plugins-config", "Path to plugins config file", cxxopts::value<std::string>(PluginsConfigFile)); options.add_options()("write-config", "Path to output Lua config file", cxxopts::value<std::string>(OutputConfigFile)); options.add_options()("no-sentry", "Disable Sentry crash handler", - cxxopts::value<bool>(ServerOptions.NoSentry)->default_value("false")); + cxxopts::value<bool>(ServerOptions.SentryConfig.Disable)->default_value("false")); options.add_options()("sentry-allow-personal-info", "Allow personally identifiable information in sentry crash reports", - cxxopts::value<bool>(ServerOptions.SentryAllowPII)->default_value("false")); + cxxopts::value<bool>(ServerOptions.SentryConfig.AllowPII)->default_value("false")); + options.add_options()("sentry-dsn", "Sentry DSN to send events to", cxxopts::value<std::string>(ServerOptions.SentryConfig.Dsn)); + options.add_options()("sentry-environment", "Sentry environment", cxxopts::value<std::string>(ServerOptions.SentryConfig.Environment)); + options.add_options()("sentry-debug", + "Enable debug mode for Sentry", + cxxopts::value<bool>(ServerOptions.SentryConfig.Debug)->default_value("false")); options.add_options()("detach", "Indicate whether zenserver should detach from parent process group", cxxopts::value<bool>(ServerOptions.Detach)->default_value("true")); @@ -871,8 +1072,9 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) "cache", "", "cache-memlayer-sizethreshold", - "The largest size of a cache entry that may be cached in memory. Default set to 1024 (1 Kb). Set to 0 to disable memory caching.", - cxxopts::value<uint64_t>(ServerOptions.StructuredCacheConfig.MemCacheSizeThreshold)->default_value("1024"), + "The largest size of a cache entry that may be cached in memory. Default set to 1024 (1 Kb). Set to 0 to disable memory caching. " + "Obsolete, replaced by `--cache-bucket-memlayer-sizethreshold`", + cxxopts::value<uint64_t>(ServerOptions.StructuredCacheConfig.BucketConfig.MemCacheSizeThreshold)->default_value("1024"), ""); options.add_option("cache", @@ -896,6 +1098,36 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) cxxopts::value<uint64_t>(ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds)->default_value("86400"), ""); + options.add_option("cache", + "", + "cache-bucket-maxblocksize", + "Max size of cache bucket blocks. Default set to 1073741824 (1GB).", + cxxopts::value<uint64_t>(ServerOptions.StructuredCacheConfig.BucketConfig.MaxBlockSize)->default_value("1073741824"), + ""); + + options.add_option("cache", + "", + "cache-bucket-payloadalignment", + "Payload alignement for cache bucket blocks. Default set to 16.", + cxxopts::value<uint32_t>(ServerOptions.StructuredCacheConfig.BucketConfig.PayloadAlignment)->default_value("16"), + ""); + + options.add_option( + "cache", + "", + "cache-bucket-largeobjectthreshold", + "Threshold for storing cache bucket values as loose files. Default set to 131072 (128 KB).", + cxxopts::value<uint64_t>(ServerOptions.StructuredCacheConfig.BucketConfig.LargeObjectThreshold)->default_value("131072"), + ""); + + options.add_option( + "cache", + "", + "cache-bucket-memlayer-sizethreshold", + "The largest size of a cache entry that may be cached in memory. Default set to 1024 (1 Kb). Set to 0 to disable memory caching.", + cxxopts::value<uint64_t>(ServerOptions.StructuredCacheConfig.BucketConfig.MemCacheSizeThreshold)->default_value("1024"), + ""); + options.add_option("gc", "", "gc-cache-attachment-store", @@ -968,6 +1200,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) options.add_option("gc", "", + "gc-buildstore-duration-seconds", + "Max duration in seconds before build store entries get evicted. Default set to 604800 (1 week)", + cxxopts::value<int32_t>(ServerOptions.GcConfig.BuildStore.MaxDurationSeconds)->default_value("604800"), + ""); + + options.add_option("gc", + "", "disk-reserve-size", "Size of gc disk reserve in bytes. Default set to 268435456 (256 Mb). Set to zero to disable.", cxxopts::value<uint64_t>(ServerOptions.GcConfig.DiskReserveSize)->default_value("268435456"), @@ -1039,6 +1278,19 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) cxxopts::value<std::vector<std::string>>(BucketConfigs), ""); + options.add_option("buildstore", + "", + "buildstore-enabled", + "Whether the builds store is enabled or not.", + cxxopts::value<bool>(ServerOptions.BuildStoreConfig.Enabled)->default_value("false"), + ""); + options.add_option("buildstore", + "", + "buildstore-disksizelimit", + "Max number of bytes before build store entries get evicted. Default set to 1099511627776 (1TB week)", + cxxopts::value<uint64_t>(ServerOptions.BuildStoreConfig.MaxDiskSpaceLimit)->default_value("1099511627776"), + ""); + options.add_option("stats", "", "statsd", @@ -1091,12 +1343,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) } logging::RefreshLogLevels(); - ServerOptions.SystemRootDir = MakeSafePath(SystemRootDir); - ServerOptions.DataDir = MakeSafePath(DataDir); - ServerOptions.BaseSnapshotDir = MakeSafePath(BaseSnapshotDir); - ServerOptions.ContentDir = MakeSafePath(ContentDir); - ServerOptions.AbsLogFile = MakeSafePath(AbsLogFile); - ServerOptions.ConfigFile = MakeSafePath(ConfigFile); + ServerOptions.SystemRootDir = MakeSafeAbsolutePath(SystemRootDir); + ServerOptions.DataDir = MakeSafeAbsolutePath(DataDir); + ServerOptions.BaseSnapshotDir = MakeSafeAbsolutePath(BaseSnapshotDir); + ServerOptions.ContentDir = MakeSafeAbsolutePath(ContentDir); + ServerOptions.AbsLogFile = MakeSafeAbsolutePath(AbsLogFile); + ServerOptions.ConfigFile = MakeSafeAbsolutePath(ConfigFile); + ServerOptions.PluginsConfigFile = MakeSafeAbsolutePath(PluginsConfigFile); ServerOptions.UpstreamCacheConfig.CachePolicy = ParseUpstreamCachePolicy(UpstreamCachePolicyOptions); if (!BaseSnapshotDir.empty()) @@ -1104,7 +1357,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) if (DataDir.empty()) throw zen::OptionParseException("You must explicitly specify a data directory when specifying a base snapshot"); - if (!std::filesystem::is_directory(ServerOptions.BaseSnapshotDir)) + if (!IsDir(ServerOptions.BaseSnapshotDir)) throw OptionParseException(fmt::format("Snapshot directory must be a directory: '{}", BaseSnapshotDir)); } @@ -1121,6 +1374,8 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) ServerOptions.ObjectStoreConfig = ParseBucketConfigs(BucketConfigs); + ParseEnvVariables(ServerOptions, Result); + if (!ServerOptions.ConfigFile.empty()) { ParseConfigFile(ServerOptions.ConfigFile, ServerOptions, Result, OutputConfigFile); @@ -1130,6 +1385,11 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) ParseConfigFile(ServerOptions.DataDir / "zen_cfg.lua", ServerOptions, Result, OutputConfigFile); } + if (!ServerOptions.PluginsConfigFile.empty()) + { + ParsePluginsConfigFile(ServerOptions.PluginsConfigFile, ServerOptions, ServerOptions.BasePort); + } + ValidateOptions(ServerOptions); } catch (const zen::OptionParseException& e) diff --git a/src/zenserver/config.h b/src/zenserver/config.h index 4ce265c78..7d90aa4c1 100644 --- a/src/zenserver/config.h +++ b/src/zenserver/config.h @@ -59,11 +59,17 @@ struct ZenProjectStoreEvictionPolicy int32_t MaxDurationSeconds = 7 * 24 * 60 * 60; }; +struct ZenBuildStoreEvictionPolicy +{ + int32_t MaxDurationSeconds = 3 * 24 * 60 * 60; +}; + struct ZenGcConfig { // ZenCasEvictionPolicy Cas; ZenCacheEvictionPolicy Cache; ZenProjectStoreEvictionPolicy ProjectStore; + ZenBuildStoreEvictionPolicy BuildStore; int32_t MonitorIntervalSeconds = 30; int32_t IntervalSeconds = 0; bool CollectSmallObjects = true; @@ -113,16 +119,25 @@ struct ZenStatsConfig int StatsdPort = 8125; }; +struct ZenStructuredCacheBucketConfig +{ + uint64_t MaxBlockSize = 1ull << 30; + uint32_t PayloadAlignment = 1u << 4; + uint64_t MemCacheSizeThreshold = 1 * 1024; + uint64_t LargeObjectThreshold = 128 * 1024; +}; + struct ZenStructuredCacheConfig { - bool Enabled = true; - bool WriteLogEnabled = false; - bool AccessLogEnabled = false; - bool LimitOverwrites = false; - uint64_t MemCacheSizeThreshold = 1 * 1024; - uint64_t MemTargetFootprintBytes = 512 * 1024 * 1024; - uint64_t MemTrimIntervalSeconds = 60; - uint64_t MemMaxAgeSeconds = gsl::narrow<uint64_t>(std::chrono::seconds(std::chrono::days(1)).count()); + bool Enabled = true; + bool WriteLogEnabled = false; + bool AccessLogEnabled = false; + bool LimitOverwrites = false; + std::vector<std::pair<std::string, ZenStructuredCacheBucketConfig>> PerBucketConfigs; + ZenStructuredCacheBucketConfig BucketConfig; + uint64_t MemTargetFootprintBytes = 512 * 1024 * 1024; + uint64_t MemTrimIntervalSeconds = 60; + uint64_t MemMaxAgeSeconds = gsl::narrow<uint64_t>(std::chrono::seconds(std::chrono::days(1)).count()); }; struct ZenProjectStoreConfig @@ -131,12 +146,27 @@ struct ZenProjectStoreConfig bool StoreProjectAttachmentMetaData = false; }; +struct ZenBuildStoreConfig +{ + bool Enabled = false; + uint64_t MaxDiskSpaceLimit = 1u * 1024u * 1024u * 1024u * 1024u; // 1TB +}; + struct ZenWorkspacesConfig { bool Enabled = false; bool AllowConfigurationChanges = false; }; +struct ZenSentryConfig +{ + bool Disable = false; + bool AllowPII = false; // Allow personally identifiable information in sentry crash reports + std::string Dsn; + std::string Environment; + bool Debug = false; // Enable debug mode for Sentry +}; + struct ZenServerOptions { ZenUpstreamCacheConfig UpstreamCacheConfig; @@ -146,13 +176,16 @@ struct ZenServerOptions zen::HttpServerConfig HttpServerConfig; ZenStructuredCacheConfig StructuredCacheConfig; ZenProjectStoreConfig ProjectStoreConfig; + ZenBuildStoreConfig BuildStoreConfig; ZenStatsConfig StatsConfig; ZenWorkspacesConfig WorksSpacesConfig; + ZenSentryConfig SentryConfig; std::filesystem::path SystemRootDir; // System root directory (used for machine level config) std::filesystem::path DataDir; // Root directory for state (used for testing) std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental) std::filesystem::path AbsLogFile; // Absolute path to main log file std::filesystem::path ConfigFile; // Path to Lua config file + std::filesystem::path PluginsConfigFile; // Path to plugins config file std::filesystem::path BaseSnapshotDir; // Path to server state snapshot (will be copied into data dir on start) std::string ChildId; // Id assigned by parent process (used for lifetime management) std::string LogId; // Id for tagging log output @@ -169,9 +202,7 @@ struct ZenServerOptions bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements bool ShouldCrash = false; // Option for testing crash handling bool IsFirstRun = false; - bool NoSentry = false; - bool SentryAllowPII = false; // Allow personally identifiable information in sentry crash reports - bool Detach = true; // Whether zenserver should detach from existing process group (Mac/Linux) + bool Detach = true; // Whether zenserver should detach from existing process group (Mac/Linux) bool ObjectStoreEnabled = false; bool NoConsoleOutput = false; // Control default use of stdout for diagnostics std::string Loggers[zen::logging::level::LogLevelCount]; diff --git a/src/zenserver/config/luaconfig.cpp b/src/zenserver/config/luaconfig.cpp index f742fa34a..2c54de29e 100644 --- a/src/zenserver/config/luaconfig.cpp +++ b/src/zenserver/config/luaconfig.cpp @@ -4,27 +4,6 @@ namespace zen::LuaConfig { -std::string -MakeSafePath(const std::string_view Path) -{ -#if ZEN_PLATFORM_WINDOWS - if (Path.empty()) - { - return std::string(Path); - } - - std::string FixedPath(Path); - std::replace(FixedPath.begin(), FixedPath.end(), '/', '\\'); - if (!FixedPath.starts_with("\\\\?\\")) - { - FixedPath.insert(0, "\\\\?\\"); - } - return FixedPath; -#else - return std::string(Path); -#endif -}; - void EscapeBackslash(std::string& InOutString) { @@ -101,7 +80,7 @@ FilePathOption::Parse(sol::object Object) std::string Str = Object.as<std::string>(); if (!Str.empty()) { - Value = MakeSafePath(Str); + Value = MakeSafeAbsolutePath(Str); } } diff --git a/src/zenserver/config/luaconfig.h b/src/zenserver/config/luaconfig.h index 76b3088a3..ce7013a9a 100644 --- a/src/zenserver/config/luaconfig.h +++ b/src/zenserver/config/luaconfig.h @@ -4,10 +4,10 @@ #include <zenbase/concepts.h> #include <zencore/fmtutils.h> +#include <zenutil/commandlineoptions.h> ZEN_THIRD_PARTY_INCLUDES_START #include <fmt/format.h> -#include <cxxopts.hpp> #include <sol/sol.hpp> ZEN_THIRD_PARTY_INCLUDES_END @@ -20,8 +20,7 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen::LuaConfig { -std::string MakeSafePath(const std::string_view Path); -void EscapeBackslash(std::string& InOutString); +void EscapeBackslash(std::string& InOutString); class OptionValue { diff --git a/src/zenserver/frontend/frontend.cpp b/src/zenserver/frontend/frontend.cpp index 31d9e1c94..dfa710ae0 100644 --- a/src/zenserver/frontend/frontend.cpp +++ b/src/zenserver/frontend/frontend.cpp @@ -2,6 +2,7 @@ #include "frontend.h" +#include <zencore/compactbinarybuilder.h> #include <zencore/endian.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> @@ -26,7 +27,9 @@ static unsigned char gHtmlZipData[] = { namespace zen { //////////////////////////////////////////////////////////////////////////////// -HttpFrontendService::HttpFrontendService(std::filesystem::path Directory) : m_Directory(Directory) +HttpFrontendService::HttpFrontendService(std::filesystem::path Directory, HttpStatusService& StatusService) +: m_Directory(Directory) +, m_StatusService(StatusService) { std::filesystem::path SelfPath = GetRunningExecutablePath(); @@ -50,7 +53,7 @@ HttpFrontendService::HttpFrontendService(std::filesystem::path Directory) : m_Di { break; } - if (std::filesystem::is_regular_file(ParentPath / "xmake.lua", ErrorCode)) + if (IsFile(ParentPath / "xmake.lua", ErrorCode)) { if (ErrorCode) { @@ -59,7 +62,7 @@ HttpFrontendService::HttpFrontendService(std::filesystem::path Directory) : m_Di std::filesystem::path HtmlDir = ParentPath / "src" / "zenserver" / "frontend" / "html"; - if (std::filesystem::is_directory(HtmlDir, ErrorCode)) + if (IsDir(HtmlDir, ErrorCode)) { m_Directory = HtmlDir; } @@ -81,10 +84,12 @@ HttpFrontendService::HttpFrontendService(std::filesystem::path Directory) : m_Di { ZEN_INFO("front-end is NOT AVAILABLE"); } + m_StatusService.RegisterHandler("dashboard", *this); } HttpFrontendService::~HttpFrontendService() { + m_StatusService.UnregisterHandler("dashboard", *this); } const char* @@ -95,6 +100,14 @@ HttpFrontendService::BaseUri() const //////////////////////////////////////////////////////////////////////////////// void +HttpFrontendService::HandleStatusRequest(zen::HttpServerRequest& Request) +{ + CbObjectWriter Cbo; + Cbo << "ok" << true; + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +void HttpFrontendService::HandleRequest(zen::HttpServerRequest& Request) { using namespace std::literals; diff --git a/src/zenserver/frontend/frontend.h b/src/zenserver/frontend/frontend.h index 6eac20620..84ffaac42 100644 --- a/src/zenserver/frontend/frontend.h +++ b/src/zenserver/frontend/frontend.h @@ -3,23 +3,26 @@ #pragma once #include <zenhttp/httpserver.h> +#include <zenhttp/httpstatus.h> #include "zipfs.h" #include <filesystem> namespace zen { -class HttpFrontendService final : public zen::HttpService +class HttpFrontendService final : public zen::HttpService, public IHttpStatusProvider { public: - HttpFrontendService(std::filesystem::path Directory); + HttpFrontendService(std::filesystem::path Directory, HttpStatusService& StatusService); virtual ~HttpFrontendService(); virtual const char* BaseUri() const override; virtual void HandleRequest(zen::HttpServerRequest& Request) override; + virtual void HandleStatusRequest(HttpServerRequest& Request) override; private: ZipFs m_ZipFs; std::filesystem::path m_Directory; + HttpStatusService& m_StatusService; }; } // namespace zen diff --git a/src/zenserver/frontend/html.zip b/src/zenserver/frontend/html.zip Binary files differindex e0a1888e6..5778fa3d2 100644 --- a/src/zenserver/frontend/html.zip +++ b/src/zenserver/frontend/html.zip diff --git a/src/zenserver/frontend/html/indexer/cache.js b/src/zenserver/frontend/html/indexer/cache.js index 390aa948d..b90194855 100644 --- a/src/zenserver/frontend/html/indexer/cache.js +++ b/src/zenserver/frontend/html/indexer/cache.js @@ -9,7 +9,7 @@ export class Cache { this._db_name = db_name; this._store_names = store_names; - this._version = 1; + this._version = 2; this._db = this._open(); } diff --git a/src/zenserver/frontend/html/indexer/indexer.js b/src/zenserver/frontend/html/indexer/indexer.js index 4412e3a57..688bc71b0 100644 --- a/src/zenserver/frontend/html/indexer/indexer.js +++ b/src/zenserver/frontend/html/indexer/indexer.js @@ -43,9 +43,10 @@ class Indexer *search(needle) { + var needleLwr = needle.toLowerCase(); for (const page of this._pages) for (const [_, name] of page) - if (name.indexOf(needle) >= 0) + if (name.toLowerCase().indexOf(needleLwr) >= 0) yield name; } @@ -60,7 +61,7 @@ class Indexer { for (const page of this._pages) for (const [_, name, size, raw_size] of page) - yield [name, size|0, raw_size|0]; + yield [name, size|0n, raw_size|0n]; } } diff --git a/src/zenserver/frontend/html/indexer/worker.js b/src/zenserver/frontend/html/indexer/worker.js index 25c8d7671..69ee234fa 100644 --- a/src/zenserver/frontend/html/indexer/worker.js +++ b/src/zenserver/frontend/html/indexer/worker.js @@ -77,11 +77,11 @@ async function map_id_to_key(project_id, oplog, start, end, page_size, stride) continue; var id = 0n; - var size = 0; - var raw_size = 0; + var size = 0n; + var raw_size = 0n; for (const item of pkg_data.as_array()) { - var found = 0, pkg_id; + var found = 0, pkg_id = undefined; for (const field of item.as_object()) { if (!id && field.is_named("id")) pkg_id = field.as_value(); diff --git a/src/zenserver/frontend/html/pages/entry.js b/src/zenserver/frontend/html/pages/entry.js index 65a3ef39b..54fb11c18 100644 --- a/src/zenserver/frontend/html/pages/entry.js +++ b/src/zenserver/frontend/html/pages/entry.js @@ -59,6 +59,81 @@ export class Page extends ZenPage } } + _find_iohash_field(container, name) + { + const found_field = container.find(name); + if (found_field != undefined) + { + var found_value = found_field.as_value(); + if (found_value instanceof Uint8Array) + { + var ret = ""; + for (var x of found_value) + ret += x.toString(16).padStart(2, "0"); + return ret; + } + } + return null; + } + + async _build_meta(section, entry) + { + var tree = {} + + for (const field of entry) + { + var visibleKey = undefined; + const name = field.get_name(); + if (name == "CookPackageArtifacts") + { + visibleKey = name; + } + else if (name.startsWith("meta.")) + { + visibleKey = name.slice(5); + } + + if (visibleKey != undefined) + { + var found_value = field.as_value(); + if (found_value instanceof Uint8Array) + { + var ret = ""; + for (var x of found_value) + ret += x.toString(16).padStart(2, "0"); + tree[visibleKey] = ret; + } + } + + } + + if (Object.keys(tree).length == 0) + return; + + const sub_section = section.add_section("meta"); + + const table = sub_section.add_widget( + Table, + ["name", "actions"], Table.Flag_PackRight + ); + for (const key in tree) + { + const row = table.add_row(key); + const value = tree[key]; + + const project = this.get_param("project"); + const oplog = this.get_param("oplog"); + const link = row.get_cell(0).link( + "/" + ["prj", project, "oplog", oplog, value+".json"].join("/") + ); + + const action_tb = new Toolbar(row.get_cell(-1), true); + action_tb.left().add("copy-hash").on_click(async (v) => { + await navigator.clipboard.writeText(v); + }, value); + } + } + async _build_page() { var entry = await this._entry; @@ -78,8 +153,16 @@ export class Page extends ZenPage delete tree["$id"]; - const sub_section = section.add_section("deps"); - this._build_deps(sub_section, tree); + if (Object.keys(tree).length != 0) + { + const sub_section = section.add_section("deps"); + this._build_deps(sub_section, tree); + } + } + + // meta + { + this._build_meta(section, entry); } // data @@ -128,7 +211,6 @@ export class Page extends ZenPage ); link.first_child().attr("download", `${io_hash}_${base_name}`); - const do_nothing = () => void(0); const action_tb = new Toolbar(row.get_cell(-1), true); action_tb.left().add("copy-hash").on_click(async (v) => { await navigator.clipboard.writeText(v); @@ -147,8 +229,11 @@ export class Page extends ZenPage _display_unsupported(section, entry) { + const replacer = (key, value) => + typeof value === "bigint" ? { $bigint: value.toString() } : value; + const object = entry.to_js_object(); - const text = JSON.stringify(object, null, " "); + const text = JSON.stringify(object, replacer, " "); section.tag("pre").text(text); } diff --git a/src/zenserver/frontend/html/pages/oplog.js b/src/zenserver/frontend/html/pages/oplog.js index f22c2a58f..bef5bacce 100644 --- a/src/zenserver/frontend/html/pages/oplog.js +++ b/src/zenserver/frontend/html/pages/oplog.js @@ -142,9 +142,12 @@ export class Page extends ZenPage async _search(needle) { - needle = needle.trim(); - if (needle.length < 3) + if (needle.length == 0) + { + this._build_table(this._index_start); return; + } + needle = needle.trim(); this._entry_table.clear(this._index_start); diff --git a/src/zenserver/frontend/html/pages/start.js b/src/zenserver/frontend/html/pages/start.js index 8c9df62f9..d1c13ccc7 100644 --- a/src/zenserver/frontend/html/pages/start.js +++ b/src/zenserver/frontend/html/pages/start.js @@ -5,6 +5,7 @@ import { ZenPage } from "./page.js" import { Fetcher } from "../util/fetcher.js" import { Friendly } from "../util/friendly.js" +import { Modal } from "../util/modal.js" import { Table, Toolbar } from "../util/widgets.js" //////////////////////////////////////////////////////////////////////////////// @@ -40,6 +41,41 @@ export class Page extends ZenPage action_tb.left().add("drop").on_click((x) => this.drop_project(x), project.Id); } + // cache + var section = this.add_section("z$"); + columns = [ + "namespace", + "dir", + "buckets", + "entries", + "size disk", + "size mem", + "actions", + ] + var zcache_info = new Fetcher().resource("/z$/").json(); + const cache_table = section.add_widget(Table, columns, Table.Flag_FitLeft|Table.Flag_PackRight); + for (const namespace of (await zcache_info)["Namespaces"]) + { + new Fetcher().resource(`/z$/${namespace}/`).json().then((data) => { + const row = cache_table.add_row( + "", + data["Configuration"]["RootDir"], + data["Buckets"].length, + data["EntryCount"], + Friendly.kib(data["StorageSize"].DiskSize), + Friendly.kib(data["StorageSize"].MemorySize) + ); + var cell = row.get_cell(0); + cell.tag().text(namespace).on_click(() => this.view_zcache(namespace)); + row.get_cell(1).tag().text(namespace); + + cell = row.get_cell(-1); + const action_tb = new Toolbar(cell, true); + action_tb.left().add("view").on_click(() => this.view_zcache(namespace)); + action_tb.left().add("drop").on_click(() => this.drop_zcache(namespace)); + }); + } + // stats section = this.add_section("stats"); columns = [ @@ -91,4 +127,23 @@ export class Page extends ZenPage .option("Yes", () => drop()) .option("No"); } + + view_zcache(namespace) + { + window.location = "?page=zcache&namespace=" + namespace; + } + + drop_zcache(namespace) + { + const drop = async () => { + await new Fetcher().resource("z$", namespace).delete(); + this.reload(); + }; + + new Modal() + .title("Confirmation") + .message(`Drop zcache '${namespace}'?`) + .option("Yes", () => drop()) + .option("No"); + } } diff --git a/src/zenserver/frontend/html/pages/zcache.js b/src/zenserver/frontend/html/pages/zcache.js new file mode 100644 index 000000000..974893b21 --- /dev/null +++ b/src/zenserver/frontend/html/pages/zcache.js @@ -0,0 +1,70 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +"use strict"; + +import { ZenPage } from "./page.js" +import { Fetcher } from "../util/fetcher.js" +import { Friendly } from "../util/friendly.js" +import { Modal } from "../util/modal.js" +import { Table, PropTable, Toolbar } from "../util/widgets.js" + +//////////////////////////////////////////////////////////////////////////////// +export class Page extends ZenPage +{ + async main() + { + const namespace = this.get_param("namespace"); + + var info = new Fetcher().resource(`/z$/${namespace}/`).json(); + + this.set_title("cache - " + namespace); + + var section = this.add_section("info"); + var cfg_table = section.add_section("config").add_widget(PropTable); + var storage_table = section.add_section("storage").add_widget(PropTable); + + info = await info; + + cfg_table.add_object(info["Configuration"], true); + + storage_table.add_property("disk", Friendly.kib(info["StorageSize"]["DiskSize"])); + storage_table.add_property("mem", Friendly.kib(info["StorageSize"]["MemorySize"])); + storage_table.add_property("entries", Friendly.sep(info["EntryCount"])); + + var column_names = ["name", "disk", "mem", "entries", "actions"]; + var bucket_table = this.add_section("buckets").add_widget( + Table, + column_names, + Table.Flag_BiasLeft + ); + for (const bucket of info["Buckets"]) + { + const row = bucket_table.add_row(bucket); + new Fetcher().resource(`/z$/${namespace}/${bucket}`).json().then((data) => { + row.get_cell(1).text(Friendly.kib(data["StorageSize"]["DiskSize"])); + row.get_cell(2).text(Friendly.kib(data["StorageSize"]["MemorySize"])); + row.get_cell(3).text(Friendly.sep(data["DiskEntryCount"])); + + const cell = row.get_cell(-1); + const action_tb = new Toolbar(cell, true); + action_tb.left().add("view") + action_tb.left().add("drop").on_click(() => this.drop_bucket(bucket)); + }); + } + } + + drop_bucket(bucket) + { + const drop = async () => { + const namespace = this.get_param("namespace"); + await new Fetcher().resource("z$", namespace, bucket).delete(); + this.reload(); + }; + + new Modal() + .title("Confirmation") + .message(`Drop bucket '${bucket}'?`) + .option("Yes", () => drop()) + .option("No"); + } +} diff --git a/src/zenserver/frontend/html/util/compactbinary.js b/src/zenserver/frontend/html/util/compactbinary.js index 366ec6aff..90e4249f6 100644 --- a/src/zenserver/frontend/html/util/compactbinary.js +++ b/src/zenserver/frontend/html/util/compactbinary.js @@ -284,7 +284,7 @@ CbFieldView.prototype.as_array = function() } //////////////////////////////////////////////////////////////////////////////// -CbFieldView.prototype.as_value = function(int_type=Number) +CbFieldView.prototype.as_value = function(int_type=BigInt) { switch (CbFieldTypeOps.get_type(this.get_type())) { @@ -388,8 +388,8 @@ CbObjectView.prototype.to_js_object = function() } if (node.is_string()) return node.as_value(); - if (node.is_float()) return node.as_value(); if (node.is_integer()) return node.as_value(); + if (node.is_float()) return node.as_value(); var ret = node.as_value(); if (ret instanceof Uint8Array) diff --git a/src/zenserver/frontend/html/util/friendly.js b/src/zenserver/frontend/html/util/friendly.js index b27721964..a15252faf 100644 --- a/src/zenserver/frontend/html/util/friendly.js +++ b/src/zenserver/frontend/html/util/friendly.js @@ -7,17 +7,17 @@ export class Friendly { static sep(value, prec=0) { - return (+value).toLocaleString("en", { + return (+Number(value)).toLocaleString("en", { style: "decimal", minimumFractionDigits : prec, maximumFractionDigits : prec, }); } - static k(x, p=0) { return Friendly.sep((x + 999) / Math.pow(10, 3)|0, p) + "K"; } - static m(x, p=1) { return Friendly.sep( x / Math.pow(10, 6), p) + "M"; } - static g(x, p=2) { return Friendly.sep( x / Math.pow(10, 9), p) + "G"; } - static kib(x, p=0) { return Friendly.sep((x + 1023) / (1 << 10)|0, p) + " KiB"; } - static mib(x, p=1) { return Friendly.sep( x / (1 << 20), p) + " MiB"; } - static gib(x, p=2) { return Friendly.sep( x / (1 << 30), p) + " GiB"; } + static k(x, p=0) { return Friendly.sep((BigInt(x) + 999n) / BigInt(Math.pow(10, 3))|0n, p) + "K"; } + static m(x, p=1) { return Friendly.sep( BigInt(x) / BigInt(Math.pow(10, 6)), p) + "M"; } + static g(x, p=2) { return Friendly.sep( BigInt(x) / BigInt(Math.pow(10, 9)), p) + "G"; } + static kib(x, p=0) { return Friendly.sep((BigInt(x) + 1023n) / (1n << 10n)|0n, p) + " KiB"; } + static mib(x, p=1) { return Friendly.sep( BigInt(x) / (1n << 20n), p) + " MiB"; } + static gib(x, p=2) { return Friendly.sep( BigInt(x) / (1n << 30n), p) + " GiB"; } } diff --git a/src/zenserver/frontend/html/util/widgets.js b/src/zenserver/frontend/html/util/widgets.js index d4f9875cd..32a3f4d28 100644 --- a/src/zenserver/frontend/html/util/widgets.js +++ b/src/zenserver/frontend/html/util/widgets.js @@ -173,7 +173,7 @@ export class PropTable extends Table continue; } - if (friendly && typeof value == "number") + if (friendly && ((typeof value == "number") || (typeof value == "bigint"))) { if (key.indexOf("memory") >= 0) value = Friendly.kib(value); else if (key.indexOf("disk") >= 0) value = Friendly.kib(value); diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp index d5419d342..b0d945814 100644 --- a/src/zenserver/main.cpp +++ b/src/zenserver/main.cpp @@ -10,6 +10,7 @@ #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> +#include <zencore/sentryintegration.h> #include <zencore/session.h> #include <zencore/string.h> #include <zencore/thread.h> @@ -25,7 +26,6 @@ #include "config.h" #include "diag/logging.h" -#include "sentryintegration.h" #if ZEN_PLATFORM_WINDOWS # include <zencore/windows.h> @@ -96,12 +96,18 @@ ZenEntryPoint::Run() #if ZEN_USE_SENTRY SentryIntegration Sentry; - if (m_ServerOptions.NoSentry == false) + if (m_ServerOptions.SentryConfig.Disable == false) { std::string SentryDatabasePath = (m_ServerOptions.DataDir / ".sentry-native").string(); std::string SentryAttachmentPath = m_ServerOptions.AbsLogFile.string(); - Sentry.Initialize(SentryDatabasePath, SentryAttachmentPath, m_ServerOptions.SentryAllowPII, m_ServerOptions.CommandLine); + Sentry.Initialize({.DatabasePath = SentryDatabasePath, + .AttachmentsPath = SentryAttachmentPath, + .Dsn = m_ServerOptions.SentryConfig.Dsn, + .Environment = m_ServerOptions.SentryConfig.Environment, + .AllowPII = m_ServerOptions.SentryConfig.AllowPII, + .Debug = m_ServerOptions.SentryConfig.Debug}, + m_ServerOptions.CommandLine); } #endif try @@ -406,17 +412,17 @@ main(int argc, char* argv[]) if (!DeleteReason.empty()) { - if (std::filesystem::exists(ServerOptions.DataDir)) + if (IsDir(ServerOptions.DataDir)) { ZEN_CONSOLE_INFO("deleting files from '{}' ({})", ServerOptions.DataDir, DeleteReason); DeleteDirectories(ServerOptions.DataDir); } } - if (!std::filesystem::exists(ServerOptions.DataDir)) + if (!IsDir(ServerOptions.DataDir)) { ServerOptions.IsFirstRun = true; - std::filesystem::create_directories(ServerOptions.DataDir); + CreateDirectories(ServerOptions.DataDir); } if (!ServerOptions.BaseSnapshotDir.empty()) diff --git a/src/zenserver/objectstore/objectstore.cpp b/src/zenserver/objectstore/objectstore.cpp index b0212ab07..8faf12165 100644 --- a/src/zenserver/objectstore/objectstore.cpp +++ b/src/zenserver/objectstore/objectstore.cpp @@ -219,13 +219,17 @@ private: StringBuilderBase& Builder; }; -HttpObjectStoreService::HttpObjectStoreService(ObjectStoreConfig Cfg) : m_Cfg(std::move(Cfg)) +HttpObjectStoreService::HttpObjectStoreService(HttpStatusService& StatusService, ObjectStoreConfig Cfg) +: m_StatusService(StatusService) +, m_Cfg(std::move(Cfg)) { Inititalize(); + m_StatusService.RegisterHandler("obj", *this); } HttpObjectStoreService::~HttpObjectStoreService() { + m_StatusService.UnregisterHandler("obj", *this); } const char* @@ -245,13 +249,21 @@ HttpObjectStoreService::HandleRequest(zen::HttpServerRequest& Request) } void +HttpObjectStoreService::HandleStatusRequest(HttpServerRequest& Request) +{ + CbObjectWriter Cbo; + Cbo << "ok" << true; + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +void HttpObjectStoreService::Inititalize() { namespace fs = std::filesystem; ZEN_LOG_INFO(LogObj, "Initialzing Object Store in '{}'", m_Cfg.RootDirectory); const fs::path BucketsPath = m_Cfg.RootDirectory / "buckets"; - if (!fs::exists(BucketsPath)) + if (!IsDir(BucketsPath)) { CreateDirectories(BucketsPath); } @@ -269,9 +281,9 @@ HttpObjectStoreService::Inititalize() m_Router.RegisterRoute( "bucket/{path}", [this](zen::HttpRouterRequest& Request) { - const std::string Path = Request.GetCapture(1); - const auto Sep = Path.find_last_of('.'); - const bool IsObject = Sep != std::string::npos && Path.size() - Sep > 0; + const std::string_view Path = Request.GetCapture(1); + const auto Sep = Path.find_last_of('.'); + const bool IsObject = Sep != std::string::npos && Path.size() - Sep > 0; if (IsObject) { @@ -324,7 +336,7 @@ HttpObjectStoreService::CreateBucket(zen::HttpRouterRequest& Request) const fs::path BucketPath = m_Cfg.RootDirectory / "buckets" / BucketName; { std::lock_guard _(BucketsMutex); - if (!fs::exists(BucketPath)) + if (!IsDir(BucketPath)) { CreateDirectories(BucketPath); ZEN_LOG_INFO(LogObj, "CREATE - new bucket '{}' OK", BucketName); @@ -337,18 +349,18 @@ HttpObjectStoreService::CreateBucket(zen::HttpRouterRequest& Request) } void -HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::string& Path) +HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::string_view Path) { namespace fs = std::filesystem; - const auto Sep = Path.find_first_of('/'); - const std::string BucketName = Sep == std::string::npos ? Path : Path.substr(0, Sep); + const auto Sep = Path.find_first_of('/'); + const std::string BucketName{Sep == std::string::npos ? Path : Path.substr(0, Sep)}; if (BucketName.empty()) { return Request.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); } - std::string BucketPrefix = Sep == std::string::npos || Sep == Path.size() - 1 ? std::string() : Path.substr(BucketName.size() + 1); + std::string BucketPrefix{Sep == std::string::npos || Sep == Path.size() - 1 ? std::string() : Path.substr(BucketName.size() + 1)}; if (BucketPrefix.empty()) { const auto QueryParms = Request.ServerRequest().GetQueryParams(); @@ -376,7 +388,7 @@ HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::s Writer.BeginArray("Contents"sv); } - void VisitFile(const fs::path& Parent, const path_view& File, uint64_t FileSize, uint32_t) override + void VisitFile(const fs::path& Parent, const path_view& File, uint64_t FileSize, uint32_t, uint64_t) override { const fs::path FullPath = Parent / fs::path(File); fs::path RelativePath = fs::relative(FullPath, BucketPath); @@ -406,7 +418,7 @@ HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::s Visitor FileVisitor(BucketName, BucketRoot, RelativeBucketPath); FileSystemTraversal Traversal; - if (std::filesystem::exists(FullPath)) + if (IsDir(FullPath)) { std::lock_guard _(BucketsMutex); Traversal.TraverseFileSystem(FullPath, FileVisitor); @@ -450,14 +462,13 @@ HttpObjectStoreService::DeleteBucket(zen::HttpRouterRequest& Request) } void -HttpObjectStoreService::GetObject(zen::HttpRouterRequest& Request, const std::string& Path) +HttpObjectStoreService::GetObject(zen::HttpRouterRequest& Request, const std::string_view Path) { namespace fs = std::filesystem; - const auto Sep = Path.find_first_of('/'); - const std::string BucketName = Sep == std::string::npos ? Path : Path.substr(0, Sep); - const std::string BucketPrefix = - Sep == std::string::npos || Sep == Path.size() - 1 ? std::string() : Path.substr(BucketName.size() + 1); + const auto Sep = Path.find_first_of('/'); + const std::string BucketName{Sep == std::string::npos ? Path : Path.substr(0, Sep)}; + const std::string BucketPrefix{Sep == std::string::npos || Sep == Path.size() - 1 ? std::string() : Path.substr(BucketName.size() + 1)}; const fs::path BucketDir = GetBucketDirectory(BucketName); @@ -476,7 +487,7 @@ HttpObjectStoreService::GetObject(zen::HttpRouterRequest& Request, const std::st } const fs::path FilePath = BucketDir / RelativeBucketPath; - if (!fs::exists(FilePath)) + if (!IsFile(FilePath)) { ZEN_LOG_DEBUG(LogObj, "GET - '{}/{}' [FAILED], doesn't exist", BucketName, FilePath); return Request.ServerRequest().WriteResponse(HttpResponseCode::NotFound); @@ -554,8 +565,8 @@ HttpObjectStoreService::PutObject(zen::HttpRouterRequest& Request) { namespace fs = std::filesystem; - const std::string& BucketName = Request.GetCapture(1); - const fs::path BucketDir = GetBucketDirectory(BucketName); + const std::string_view BucketName = Request.GetCapture(1); + const fs::path BucketDir = GetBucketDirectory(BucketName); if (BucketDir.empty()) { @@ -577,7 +588,7 @@ HttpObjectStoreService::PutObject(zen::HttpRouterRequest& Request) { std::lock_guard _(BucketsMutex); - if (!fs::exists(FileDirectory)) + if (!IsDir(FileDirectory)) { CreateDirectories(FileDirectory); } diff --git a/src/zenserver/objectstore/objectstore.h b/src/zenserver/objectstore/objectstore.h index c905ceab3..44e50e208 100644 --- a/src/zenserver/objectstore/objectstore.h +++ b/src/zenserver/objectstore/objectstore.h @@ -3,6 +3,7 @@ #pragma once #include <zenhttp/httpserver.h> +#include <zenhttp/httpstatus.h> #include <atomic> #include <filesystem> #include <mutex> @@ -23,24 +24,26 @@ struct ObjectStoreConfig std::vector<BucketConfig> Buckets; }; -class HttpObjectStoreService final : public zen::HttpService +class HttpObjectStoreService final : public zen::HttpService, public IHttpStatusProvider { public: - HttpObjectStoreService(ObjectStoreConfig Cfg); + HttpObjectStoreService(HttpStatusService& StatusService, ObjectStoreConfig Cfg); virtual ~HttpObjectStoreService(); virtual const char* BaseUri() const override; virtual void HandleRequest(zen::HttpServerRequest& Request) override; + virtual void HandleStatusRequest(HttpServerRequest& Request) override; private: void Inititalize(); std::filesystem::path GetBucketDirectory(std::string_view BucketName); void CreateBucket(zen::HttpRouterRequest& Request); - void ListBucket(zen::HttpRouterRequest& Request, const std::string& Path); + void ListBucket(zen::HttpRouterRequest& Request, const std::string_view Path); void DeleteBucket(zen::HttpRouterRequest& Request); - void GetObject(zen::HttpRouterRequest& Request, const std::string& Path); + void GetObject(zen::HttpRouterRequest& Request, const std::string_view Path); void PutObject(zen::HttpRouterRequest& Request); + HttpStatusService& m_StatusService; ObjectStoreConfig m_Cfg; std::mutex BucketsMutex; HttpRequestRouter m_Router; diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.cpp b/src/zenserver/projectstore/buildsremoteprojectstore.cpp index 302b81729..ab96ae92d 100644 --- a/src/zenserver/projectstore/buildsremoteprojectstore.cpp +++ b/src/zenserver/projectstore/buildsremoteprojectstore.cpp @@ -3,6 +3,7 @@ #include "buildsremoteprojectstore.h" #include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> #include <zencore/compress.h> #include <zencore/fmtutils.h> @@ -34,16 +35,10 @@ public: , m_BuildId(BuildId) , m_MetaData(MetaData) , m_TempFilePath(TempFilePath) + , m_EnableBlocks(!ForceDisableBlocks) + , m_UseTempBlocks(!ForceDisableTempBlocks) { m_MetaData.MakeOwned(); - if (ForceDisableBlocks) - { - m_EnableBlocks = false; - } - if (ForceDisableTempBlocks) - { - m_UseTempBlocks = false; - } } virtual RemoteStoreInfo GetInfo() const override @@ -70,7 +65,7 @@ public: { ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); IoBuffer Payload = m_MetaData; Payload.SetContentType(ZenContentType::kCbObject); @@ -94,7 +89,7 @@ public: virtual SaveResult SaveContainer(const IoBuffer& Payload) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); PutBuildPartResult PutResult = Session.PutBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, OplogContainerPartName, Payload); AddStats(PutResult); @@ -114,24 +109,25 @@ public: return Result; } - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&& Block) override + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, + const IoHash& RawHash, + ChunkBlockDescription&& Block) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); JupiterResult PutResult = - Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, ZenContentType::kCompressedBinary, Payload); + Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); AddStats(PutResult); SaveAttachmentResult Result{ConvertResult(PutResult)}; if (Result.ErrorCode) { - Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}/{}/{}. Reason: '{}'", + Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, - m_OplogBuildPartId, RawHash, Result.Reason); return Result; @@ -139,57 +135,21 @@ public: if (Block.BlockHash == RawHash) { - ZEN_ASSERT(Block.ChunkLengths.size() == Block.ChunkHashes.size()); - CbObjectWriter Writer; - Writer.AddHash("rawHash"sv, RawHash); - Writer.BeginArray("rawHashes"sv); - { - for (const IoHash& ChunkHash : Block.ChunkHashes) - { - Writer.AddHash(ChunkHash); - } - } - Writer.EndArray(); - Writer.BeginArray("chunkLengths"); - { - for (uint32_t ChunkSize : Block.ChunkLengths) - { - Writer.AddInteger(ChunkSize); - } - } - Writer.EndArray(); - Writer.BeginArray("chunkOffsets"); - { - ZEN_ASSERT(Block.FirstChunkOffset != (uint32_t)-1); - uint32_t Offset = Block.FirstChunkOffset; - for (uint32_t ChunkSize : Block.ChunkLengths) - { - Writer.AddInteger(Offset); - Offset += ChunkSize; - } - } - Writer.EndArray(); + CbObjectWriter BlockMetaData; + BlockMetaData.AddString("createdBy", GetRunningExecutablePath().stem().string()); - Writer.BeginObject("metadata"sv); - { - Writer.AddString("createdBy", "zenserver"); - } - Writer.EndObject(); - - IoBuffer MetaPayload = Writer.Save().GetBuffer().AsIoBuffer(); + IoBuffer MetaPayload = BuildChunkBlockDescription(Block, BlockMetaData.Save()).GetBuffer().AsIoBuffer(); MetaPayload.SetContentType(ZenContentType::kCbObject); - JupiterResult PutMetaResult = - Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, MetaPayload); + JupiterResult PutMetaResult = Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, RawHash, MetaPayload); AddStats(PutMetaResult); RemoteProjectStore::Result MetaDataResult = ConvertResult(PutMetaResult); if (MetaDataResult.ErrorCode) { - ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}/{}. Reason: '{}'", + ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, - m_OplogBuildPartId, RawHash, MetaDataResult.Reason); } @@ -217,7 +177,7 @@ public: ZEN_UNUSED(RawHash); ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); FinalizeBuildPartResult FinalizeRefResult = Session.FinalizeBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash); AddStats(FinalizeRefResult); @@ -256,7 +216,7 @@ public: { ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); JupiterResult GetBuildResult = Session.GetBuild(m_Namespace, m_Bucket, m_BuildId); AddStats(GetBuildResult); LoadContainerResult Result{ConvertResult(GetBuildResult)}; @@ -341,52 +301,48 @@ public: virtual GetKnownBlocksResult GetKnownBlocks() override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); - JupiterResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); + JupiterResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId, (uint64_t)-1); AddStats(FindResult); GetKnownBlocksResult Result{ConvertResult(FindResult)}; if (Result.ErrorCode) { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}/{}. Reason: '{}'", + Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, - m_OplogBuildPartId, Result.Reason); return Result; } - CbObject BlocksObject = LoadCompactBinaryObject(FindResult.Response); - if (!BlocksObject) + if (ValidateCompactBinary(FindResult.Response.GetView(), CbValidateMode::Default) != CbValidateError::None) { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The block list {}/{}/{}/{} is not formatted as a compact binary object"sv, + Result.Reason = fmt::format("The block list {}/{}/{} is not formatted as a compact binary object"sv, m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, - m_BuildId, - m_OplogBuildPartId); + m_BuildId); return Result; } - - CbArrayView Blocks = BlocksObject["blocks"].AsArrayView(); - Result.Blocks.reserve(Blocks.Num()); - for (CbFieldView BlockView : Blocks) + std::optional<std::vector<ChunkBlockDescription>> Blocks = + ParseChunkBlockDescriptionList(LoadCompactBinaryObject(FindResult.Response)); + if (!Blocks) { - CbObjectView BlockObject = BlockView.AsObjectView(); - IoHash BlockHash = BlockObject["rawHash"sv].AsHash(); - if (BlockHash != IoHash::Zero) - { - CbArrayView ChunksArray = BlockObject["rawHashes"sv].AsArrayView(); - std::vector<IoHash> ChunkHashes; - ChunkHashes.reserve(ChunksArray.Num()); - for (CbFieldView ChunkView : ChunksArray) - { - ChunkHashes.push_back(ChunkView.AsHash()); - } - Result.Blocks.emplace_back(Block{.BlockHash = BlockHash, .ChunkHashes = ChunkHashes}); - } + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("The block list {}/{}/{} is not formatted as a list of blocks"sv, + m_JupiterClient->ServiceUrl(), + m_Namespace, + m_Bucket, + m_BuildId); + return Result; + } + Result.Blocks.reserve(Blocks.value().size()); + for (ChunkBlockDescription& BlockDescription : Blocks.value()) + { + Result.Blocks.push_back(ThinChunkBlockDescription{.BlockHash = BlockDescription.BlockHash, + .ChunkRawHashes = std::move(BlockDescription.ChunkRawHashes)}); } return Result; } @@ -394,19 +350,18 @@ public: virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); - JupiterResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, m_TempFilePath); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); + JupiterResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, m_TempFilePath); AddStats(GetResult); LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)}; if (GetResult.ErrorCode) { - Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}&{}/{}/{}. Reason: '{}'", + Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}/{}. Reason: '{}'", m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, - m_OplogBuildPartId, RawHash, Result.Reason); } @@ -491,8 +446,9 @@ private: IoBuffer m_MetaData; Oid m_OplogBuildPartId = Oid::Zero; std::filesystem::path m_TempFilePath; - bool m_EnableBlocks = true; - bool m_UseTempBlocks = true; + const bool m_EnableBlocks = true; + const bool m_UseTempBlocks = true; + const bool m_AllowRedirect = false; std::atomic_uint64_t m_SentBytes = {}; std::atomic_uint64_t m_ReceivedBytes = {}; @@ -533,7 +489,15 @@ CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::file { TokenProvider = httpclientauth::CreateFromStaticToken(Options.AccessToken); } - else + else if (!Options.OidcExePath.empty()) + { + if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(Options.OidcExePath, Url); TokenProviderMaybe) + { + TokenProvider = TokenProviderMaybe.value(); + } + } + + if (!TokenProvider) { TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager); } diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.h b/src/zenserver/projectstore/buildsremoteprojectstore.h index 8b2c6c8c8..c52b13886 100644 --- a/src/zenserver/projectstore/buildsremoteprojectstore.h +++ b/src/zenserver/projectstore/buildsremoteprojectstore.h @@ -10,17 +10,18 @@ class AuthMgr; struct BuildsRemoteStoreOptions : RemoteStoreOptions { - std::string Url; - std::string Namespace; - std::string Bucket; - Oid BuildId; - std::string OpenIdProvider; - std::string AccessToken; - AuthMgr& AuthManager; - bool ForceDisableBlocks = false; - bool ForceDisableTempBlocks = false; - bool AssumeHttp2 = false; - IoBuffer MetaData; + std::string Url; + std::string Namespace; + std::string Bucket; + Oid BuildId; + std::string OpenIdProvider; + std::string AccessToken; + AuthMgr& AuthManager; + std::filesystem::path OidcExePath; + bool ForceDisableBlocks = false; + bool ForceDisableTempBlocks = false; + bool AssumeHttp2 = false; + IoBuffer MetaData; }; std::shared_ptr<RemoteProjectStore> CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp index 0fe739a12..375e44e59 100644 --- a/src/zenserver/projectstore/fileremoteprojectstore.cpp +++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp @@ -73,7 +73,7 @@ public: ContainerObject.IterateAttachments([&](CbFieldView FieldView) { IoHash AttachmentHash = FieldView.AsBinaryAttachment(); std::filesystem::path AttachmentPath = GetAttachmentPath(AttachmentHash); - if (!std::filesystem::exists(AttachmentPath)) + if (!IsFile(AttachmentPath)) { Result.Needs.insert(AttachmentHash); } @@ -106,12 +106,12 @@ public: return Result; } - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override { Stopwatch Timer; SaveAttachmentResult Result; std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); - if (!std::filesystem::exists(ChunkPath)) + if (!IsFile(ChunkPath)) { try { @@ -182,7 +182,7 @@ public: for (const IoHash& RawHash : BlockHashes) { std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); - if (std::filesystem::is_regular_file(ChunkPath)) + if (IsFile(ChunkPath)) { ExistingBlockHashes.push_back(RawHash); } @@ -192,7 +192,7 @@ public: return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; } - std::vector<RemoteProjectStore::Block> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); + std::vector<ThinChunkBlockDescription> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); GetKnownBlocksResult Result{{.ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; Result.Blocks = std::move(KnownBlocks); return Result; @@ -203,7 +203,7 @@ public: Stopwatch Timer; LoadAttachmentResult Result; std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); - if (!std::filesystem::is_regular_file(ChunkPath)) + if (!IsFile(ChunkPath)) { Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); Result.Reason = fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string()); @@ -246,7 +246,7 @@ private: LoadContainerResult Result; std::filesystem::path SourcePath = m_OutputPath; SourcePath.append(Name); - if (!std::filesystem::is_regular_file(SourcePath)) + if (!IsFile(SourcePath)) { Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); Result.Reason = fmt::format("Failed loading oplog container from '{}'. Reason: 'The file does not exist'", SourcePath.string()); diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp index 0b8e5f13b..317a419eb 100644 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ b/src/zenserver/projectstore/httpprojectstore.cpp @@ -8,6 +8,7 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryutil.h> +#include <zencore/compactbinaryvalidation.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> @@ -234,10 +235,15 @@ namespace { ////////////////////////////////////////////////////////////////////////// -HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, HttpStatsService& StatsService, AuthMgr& AuthMgr) +HttpProjectService::HttpProjectService(CidStore& Store, + ProjectStore* Projects, + HttpStatusService& StatusService, + HttpStatsService& StatsService, + AuthMgr& AuthMgr) : m_Log(logging::Get("project")) , m_CidStore(Store) , m_ProjectStore(Projects) +, m_StatusService(StatusService) , m_StatsService(StatsService) , m_AuthMgr(AuthMgr) { @@ -245,8 +251,6 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, using namespace std::literals; - m_StatsService.RegisterHandler("prj", *this); - m_Router.AddPattern("project", "([[:alnum:]_.]+)"); m_Router.AddPattern("log", "([[:alnum:]_.]+)"); m_Router.AddPattern("op", "([[:digit:]]+?)"); @@ -365,11 +369,15 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, "details\\$/{project}/{log}/{chunk}", [this](HttpRouterRequest& Req) { HandleOplogOpDetailsRequest(Req); }, HttpVerb::kGet); + + m_StatusService.RegisterHandler("prj", *this); + m_StatsService.RegisterHandler("prj", *this); } HttpProjectService::~HttpProjectService() { m_StatsService.UnregisterHandler("prj", *this); + m_StatusService.UnregisterHandler("prj", *this); } const char* @@ -465,6 +473,15 @@ HttpProjectService::HandleStatsRequest(HttpServerRequest& HttpReq) } void +HttpProjectService::HandleStatusRequest(HttpServerRequest& Request) +{ + ZEN_TRACE_CPU("HttpProjectService::Status"); + CbObjectWriter Cbo; + Cbo << "ok" << true; + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +void HttpProjectService::HandleProjectListRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::ProjectList"); @@ -885,10 +902,63 @@ HttpProjectService::HandleChunkByCidRequest(HttpRouterRequest& Req) case HttpVerb::kGet: { IoBuffer Value; - std::pair<HttpResponseCode, std::string> Result = - m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value, nullptr); + std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, Value, nullptr); if (Result.first == HttpResponseCode::OK) { + if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary || + AcceptType == ZenContentType::kJSON || AcceptType == ZenContentType::kYAML || + AcceptType == ZenContentType::kCbObject) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Value)); + IoBuffer DecompressedBuffer = Compressed.Decompress().AsIoBuffer(); + + if (DecompressedBuffer) + { + if (AcceptType == ZenContentType::kJSON || AcceptType == ZenContentType::kYAML || + AcceptType == ZenContentType::kCbObject) + { + CbValidateError CbErr = ValidateCompactBinary(DecompressedBuffer.GetView(), CbValidateMode::Default); + if (!!CbErr) + { + m_ProjectStats.BadRequestCount++; + ZEN_DEBUG( + "chunk - '{}/{}/{}' WRONGTYPE. Reason: `Requested {} format, but could not convert to object`", + ProjectId, + OplogId, + Cid, + ToString(AcceptType)); + return HttpReq.WriteResponse( + HttpResponseCode::NotAcceptable, + HttpContentType::kText, + fmt::format("Content format not supported, requested {} format, but could not convert to object", + ToString(AcceptType))); + } + + m_ProjectStats.ChunkHitCount++; + CbObject ContainerObject = LoadCompactBinaryObject(DecompressedBuffer); + return HttpReq.WriteResponse(HttpResponseCode::OK, ContainerObject); + } + else + { + Value = DecompressedBuffer; + Value.SetContentType(ZenContentType::kBinary); + } + } + else + { + m_ProjectStats.BadRequestCount++; + ZEN_DEBUG("chunk - '{}/{}/{}' WRONGTYPE. Reason: `Requested {} format, but could not decompress stored data`", + ProjectId, + OplogId, + Cid, + ToString(AcceptType)); + return HttpReq.WriteResponse( + HttpResponseCode::NotAcceptable, + HttpContentType::kText, + fmt::format("Content format not supported, requested {} format, but could not decompress stored data", + ToString(AcceptType))); + } + } m_ProjectStats.ChunkHitCount++; return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value); } @@ -983,15 +1053,19 @@ HttpProjectService::HandleOplogOpPrepRequest(HttpRouterRequest& Req) IoBuffer Payload = HttpReq.ReadPayload(); CbObject RequestObject = LoadCompactBinaryObject(Payload); - std::vector<IoHash> ChunkList; - CbArrayView HaveList = RequestObject["have"sv].AsArrayView(); - ChunkList.reserve(HaveList.Num()); - for (auto& Entry : HaveList) + std::vector<IoHash> NeedList; + { - ChunkList.push_back(Entry.AsHash()); - } + eastl::fixed_vector<IoHash, 16> ChunkList; + CbArrayView HaveList = RequestObject["have"sv].AsArrayView(); + ChunkList.reserve(HaveList.Num()); + for (auto& Entry : HaveList) + { + ChunkList.push_back(Entry.AsHash()); + } - std::vector<IoHash> NeedList = FoundLog->CheckPendingChunkReferences(ChunkList, std::chrono::minutes(2)); + NeedList = FoundLog->CheckPendingChunkReferences(std::span(begin(ChunkList), end(ChunkList)), std::chrono::minutes(2)); + } CbObjectWriter Cbo(1 + 1 + 5 + NeedList.size() * (1 + sizeof(IoHash::Hash)) + 1); Cbo.BeginArray("need"); @@ -1151,7 +1225,7 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req) return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "No oplog entry key specified"); } - std::vector<IoHash> ReferencedChunks; + eastl::fixed_vector<IoHash, 16> ReferencedChunks; Core.IterateAttachments([&ReferencedChunks](CbFieldView View) { ReferencedChunks.push_back(View.AsAttachment()); }); // Write core to oplog @@ -1169,7 +1243,7 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req) // Once we stored the op, we no longer need to retain any chunks this op references if (!ReferencedChunks.empty()) { - FoundLog->RemovePendingChunkReferences(ReferencedChunks); + FoundLog->RemovePendingChunkReferences(std::span(begin(ReferencedChunks), end(ReferencedChunks))); } m_ProjectStats.OpWriteCount++; @@ -1301,9 +1375,9 @@ HttpProjectService::HandleOpLogOpRequest(HttpRouterRequest& Req) HttpServerRequest& HttpReq = Req.ServerRequest(); - const std::string& ProjectId = Req.GetCapture(1); - const std::string& OplogId = Req.GetCapture(2); - const std::string& OpIdString = Req.GetCapture(3); + const std::string_view ProjectId = Req.GetCapture(1); + const std::string_view OplogId = Req.GetCapture(2); + const std::string_view OpIdString = Req.GetCapture(3); Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); if (!Project) @@ -1690,8 +1764,8 @@ HttpProjectService::HandleProjectRequest(HttpRouterRequest& Req) using namespace std::literals; - HttpServerRequest& HttpReq = Req.ServerRequest(); - const std::string ProjectId = Req.GetCapture(1); + HttpServerRequest& HttpReq = Req.ServerRequest(); + const std::string_view ProjectId = Req.GetCapture(1); switch (HttpReq.RequestVerb()) { diff --git a/src/zenserver/projectstore/httpprojectstore.h b/src/zenserver/projectstore/httpprojectstore.h index 8e74c57a5..295defa5c 100644 --- a/src/zenserver/projectstore/httpprojectstore.h +++ b/src/zenserver/projectstore/httpprojectstore.h @@ -5,6 +5,7 @@ #include <zencore/stats.h> #include <zenhttp/httpserver.h> #include <zenhttp/httpstats.h> +#include <zenhttp/httpstatus.h> #include <zenstore/cidstore.h> namespace zen { @@ -31,16 +32,21 @@ class ProjectStore; // refs: // -class HttpProjectService : public HttpService, public IHttpStatsProvider +class HttpProjectService : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider { public: - HttpProjectService(CidStore& Store, ProjectStore* InProjectStore, HttpStatsService& StatsService, AuthMgr& AuthMgr); + HttpProjectService(CidStore& Store, + ProjectStore* InProjectStore, + HttpStatusService& StatusService, + HttpStatsService& StatsService, + AuthMgr& AuthMgr); ~HttpProjectService(); virtual const char* BaseUri() const override; virtual void HandleRequest(HttpServerRequest& Request) override; virtual void HandleStatsRequest(HttpServerRequest& Request) override; + virtual void HandleStatusRequest(HttpServerRequest& Request) override; private: struct ProjectStats @@ -89,6 +95,7 @@ private: CidStore& m_CidStore; HttpRequestRouter m_Router; Ref<ProjectStore> m_ProjectStore; + HttpStatusService& m_StatusService; HttpStatsService& m_StatsService; AuthMgr& m_AuthMgr; ProjectStats m_ProjectStats; diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp index e906127ff..3728babb5 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp @@ -31,15 +31,9 @@ public: , m_Key(Key) , m_OptionalBaseKey(OptionalBaseKey) , m_TempFilePath(TempFilePath) + , m_EnableBlocks(!ForceDisableBlocks) + , m_UseTempBlocks(!ForceDisableTempBlocks) { - if (ForceDisableBlocks) - { - m_EnableBlocks = false; - } - if (ForceDisableTempBlocks) - { - m_UseTempBlocks = false; - } } virtual RemoteStoreInfo GetInfo() const override @@ -75,7 +69,7 @@ public: virtual SaveResult SaveContainer(const IoBuffer& Payload) override { - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject); AddStats(PutResult); @@ -92,9 +86,9 @@ public: return Result; } - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override { - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); JupiterResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); AddStats(PutResult); @@ -127,7 +121,7 @@ public: virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override { - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); FinalizeRefResult FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash); AddStats(FinalizeRefResult); @@ -164,7 +158,7 @@ public: {.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds}}; } - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); JupiterExistsResult ExistsResult = Session.CompressedBlobExists(m_Namespace, std::set<IoHash>(BlockHashes.begin(), BlockHashes.end())); AddStats(ExistsResult); @@ -193,7 +187,7 @@ public: return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds}}; } - std::vector<RemoteProjectStore::Block> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); + std::vector<ThinChunkBlockDescription> KnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); GetKnownBlocksResult Result{ {.ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000.0}}; @@ -203,7 +197,7 @@ public: virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); AddStats(GetResult); @@ -239,7 +233,7 @@ public: private: LoadContainerResult LoadContainer(const IoHash& Key) { - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); JupiterResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject); AddStats(GetResult); if (GetResult.ErrorCode || !GetResult.Success) @@ -329,8 +323,9 @@ private: const IoHash m_Key; const IoHash m_OptionalBaseKey; std::filesystem::path m_TempFilePath; - bool m_EnableBlocks = true; - bool m_UseTempBlocks = true; + const bool m_EnableBlocks = true; + const bool m_UseTempBlocks = true; + const bool m_AllowRedirect = false; std::atomic_uint64_t m_SentBytes = {}; std::atomic_uint64_t m_ReceivedBytes = {}; @@ -371,7 +366,15 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi { TokenProvider = httpclientauth::CreateFromStaticToken(Options.AccessToken); } - else + else if (!Options.OidcExePath.empty()) + { + if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(Options.OidcExePath, Url); TokenProviderMaybe) + { + TokenProvider = TokenProviderMaybe.value(); + } + } + + if (!TokenProvider) { TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager); } diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.h b/src/zenserver/projectstore/jupiterremoteprojectstore.h index 27f3d9b73..8bf79d563 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.h +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.h @@ -10,17 +10,18 @@ class AuthMgr; struct JupiterRemoteStoreOptions : RemoteStoreOptions { - std::string Url; - std::string Namespace; - std::string Bucket; - IoHash Key; - IoHash OptionalBaseKey; - std::string OpenIdProvider; - std::string AccessToken; - AuthMgr& AuthManager; - bool ForceDisableBlocks = false; - bool ForceDisableTempBlocks = false; - bool AssumeHttp2 = false; + std::string Url; + std::string Namespace; + std::string Bucket; + IoHash Key; + IoHash OptionalBaseKey; + std::string OpenIdProvider; + std::string AccessToken; + AuthMgr& AuthManager; + std::filesystem::path OidcExePath; + bool ForceDisableBlocks = false; + bool ForceDisableTempBlocks = false; + bool AssumeHttp2 = false; }; std::shared_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 46a236af9..53e687983 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -22,6 +22,7 @@ #include <zenstore/scrubcontext.h> #include <zenutil/cache/rpcrecording.h> #include <zenutil/openprocesscache.h> +#include <zenutil/parallelwork.h> #include <zenutil/referencemetadata.h> #include <zenutil/workerpools.h> @@ -58,7 +59,7 @@ namespace { std::filesystem::path DroppedBucketPath; do { - if (!std::filesystem::exists(Dir)) + if (!IsDir(Dir)) { return true; } @@ -68,7 +69,7 @@ namespace { std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), MovedId); DroppedBucketPath = Dir.parent_path() / DroppedName; - if (std::filesystem::exists(DroppedBucketPath)) + if (IsDir(DroppedBucketPath)) { if (!DeleteDirectories(DroppedBucketPath)) { @@ -77,7 +78,7 @@ namespace { Dir); continue; } - if (std::filesystem::exists(DroppedBucketPath)) + if (IsDir(DroppedBucketPath)) { ZEN_INFO("Drop directory '{}' for '{}' still exists after remove, attempting different name.", DroppedBucketPath, Dir); continue; @@ -88,13 +89,13 @@ namespace { do { std::error_code Ec; - std::filesystem::rename(Dir, DroppedBucketPath, Ec); + RenameDirectory(Dir, DroppedBucketPath, Ec); if (!Ec) { OutDeleteDir = DroppedBucketPath; return true; } - if (std::filesystem::exists(DroppedBucketPath)) + if (IsDir(DroppedBucketPath)) { ZEN_INFO("Can't rename '{}' to still existing drop directory '{}'. Reason: '{}'. Attempting different name.", Dir, @@ -210,6 +211,16 @@ namespace { AccessToken = GetEnvVariable(AccessTokenEnvVariable); } } + std::filesystem::path OidcExePath; + if (std::string_view OidcExePathString = Cloud["oidc-exe-path"].AsString(); !OidcExePathString.empty()) + { + std::filesystem::path OidcExePathMaybe(OidcExePathString); + if (!IsFile(OidcExePathMaybe)) + { + ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString); + OidcExePath = std::move(OidcExePathMaybe); + } + } std::string_view KeyParam = Cloud["key"sv].AsString(); if (KeyParam.empty()) { @@ -252,6 +263,7 @@ namespace { std::string(OpenIdProvider), AccessToken, AuthManager, + OidcExePath, ForceDisableBlocks, ForceDisableTempBlocks, AssumeHttp2}; @@ -307,6 +319,16 @@ namespace { AccessToken = GetEnvVariable(AccessTokenEnvVariable); } } + std::filesystem::path OidcExePath; + if (std::string_view OidcExePathString = Builds["oidc-exe-path"].AsString(); !OidcExePathString.empty()) + { + std::filesystem::path OidcExePathMaybe(OidcExePathString); + if (!IsFile(OidcExePathMaybe)) + { + ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString); + OidcExePath = std::move(OidcExePathMaybe); + } + } std::string_view BuildIdParam = Builds["buildsid"sv].AsString(); if (BuildIdParam.empty()) { @@ -337,6 +359,7 @@ namespace { std::string(OpenIdProvider), AccessToken, AuthManager, + OidcExePath, ForceDisableBlocks, ForceDisableTempBlocks, AssumeHttp2, @@ -423,9 +446,13 @@ ComputeOpKey(const CbObjectView& Op) { using namespace std::literals; - BinaryWriter KeyStream; + eastl::fixed_vector<uint8_t, 256> KeyData; - Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyStream.Write(Data, Size); }); + Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { + auto Begin = reinterpret_cast<const uint8_t*>(Data); + auto End = Begin + Size; + KeyData.insert(KeyData.end(), Begin, End); + }); XXH3_128 KeyHash128; @@ -434,15 +461,15 @@ ComputeOpKey(const CbObjectView& Op) // path but longer paths are evaluated properly. In the future all key lengths // should be evaluated using the proper path, this is a temporary workaround to // maintain compatibility with existing disk state. - if (KeyStream.GetSize() < 240) + if (KeyData.size() < 240) { XXH3_128Stream_deprecated KeyHasher; - KeyHasher.Append(KeyStream.Data(), KeyStream.Size()); + KeyHasher.Append(KeyData.data(), KeyData.size()); KeyHash128 = KeyHasher.GetHash(); } else { - KeyHash128 = XXH3_128::HashMemory(KeyStream.GetView()); + KeyHash128 = XXH3_128::HashMemory(KeyData.data(), KeyData.size()); } Oid KeyHash; @@ -482,7 +509,7 @@ struct ProjectStore::OplogStorage : public RefCounted [[nodiscard]] bool Exists() const { return Exists(m_OplogStoragePath); } [[nodiscard]] static bool Exists(const std::filesystem::path& BasePath) { - return std::filesystem::exists(GetLogPath(BasePath)) && std::filesystem::exists(GetBlobsPath(BasePath)); + return IsFile(GetLogPath(BasePath)) && IsFile(GetBlobsPath(BasePath)); } [[nodiscard]] bool IsValid() const { return IsValid(m_OplogStoragePath); } [[nodiscard]] static bool IsValid(const std::filesystem::path& BasePath) @@ -492,13 +519,13 @@ struct ProjectStore::OplogStorage : public RefCounted void WipeState() const { std::error_code Ec; - std::filesystem::remove(GetLogPath(), Ec); - std::filesystem::remove(GetBlobsPath(), Ec); + RemoveFile(GetLogPath(), Ec); + RemoveFile(GetBlobsPath(), Ec); } static bool Delete(const std::filesystem::path& BasePath) { return DeleteDirectories(BasePath); } - uint64_t OpBlobsSize() const { return std::filesystem::file_size(GetBlobsPath()); } + uint64_t OpBlobsSize() const { return FileSizeFromPath(GetBlobsPath()); } uint64_t OpsSize() const { return OpsSize(m_OplogStoragePath); } static uint64_t OpsSize(const std::filesystem::path& BasePath) @@ -506,7 +533,7 @@ struct ProjectStore::OplogStorage : public RefCounted if (Exists(BasePath)) { std::error_code DummyEc; - return std::filesystem::file_size(GetLogPath(BasePath)) + std::filesystem::file_size(GetBlobsPath(BasePath)); + return FileSizeFromPath(GetLogPath(BasePath)) + FileSizeFromPath(GetBlobsPath(BasePath)); } return 0; } @@ -685,7 +712,7 @@ struct ProjectStore::OplogStorage : public RefCounted m_OpBlobs.Close(); Oplog.Close(); - std::filesystem::rename(OplogPath, GetLogPath(), Ec); + RenameFile(OplogPath, GetLogPath(), Ec); if (Ec) { throw std::system_error( @@ -698,9 +725,9 @@ struct ProjectStore::OplogStorage : public RefCounted if (Ec) { // We failed late - clean everything up as best we can - std::filesystem::remove(OpBlobs.GetPath(), Ec); - std::filesystem::remove(GetLogPath(), Ec); - std::filesystem::remove(GetBlobsPath(), Ec); + RemoveFile(OpBlobs.GetPath(), Ec); + RemoveFile(GetLogPath(), Ec); + RemoveFile(GetBlobsPath(), Ec); throw std::system_error(Ec, fmt::format("Oplog::Compact failed to rename temporary oplog file from '{}' to '{}'", OpBlobs.GetPath(), @@ -735,7 +762,7 @@ struct ProjectStore::OplogStorage : public RefCounted } catch (const std::exception& /*Ex*/) { - std::filesystem::remove(OpBlobs.GetPath(), Ec); + RemoveFile(OpBlobs.GetPath(), Ec); throw; } } @@ -983,8 +1010,8 @@ struct ProjectStore::OplogStorage : public RefCounted .OpCoreHash = OpData.OpCoreHash, .OpKeyHash = OpData.KeyHash}; - m_Oplog.Append(Entry); m_OpBlobs.Write(OpData.Buffer.GetData(), WriteSize, WriteOffset); + m_Oplog.Append(Entry); return Entry; } @@ -1104,7 +1131,7 @@ ProjectStore::Oplog::Oplog(std::string_view Id, ZEN_WARN("Invalid oplog found at '{}'. Wiping state for oplog.", m_BasePath); m_Storage->WipeState(); std::error_code DummyEc; - std::filesystem::remove(m_MetaPath, DummyEc); + RemoveFile(m_MetaPath, DummyEc); } } m_Storage->Open(/* IsCreate */ !StoreExists); @@ -1112,7 +1139,7 @@ ProjectStore::Oplog::Oplog(std::string_view Id, m_MetaPath = m_BasePath / "ops.meta"sv; m_MetaValid = !IsFileOlderThan(m_MetaPath, m_Storage->GetBlobsPath()); - CleanDirectory(m_TempPath); + CleanDirectory(m_TempPath, /*ForceRemoveReadOnlyFiles*/ false); } ProjectStore::Oplog::~Oplog() @@ -1138,7 +1165,7 @@ ProjectStore::Oplog::Flush() if (!m_MetaValid) { std::error_code DummyEc; - std::filesystem::remove(m_MetaPath, DummyEc); + RemoveFile(m_MetaPath, DummyEc); } uint64_t LogCount = m_Storage->LogCount(); @@ -1234,19 +1261,19 @@ ProjectStore::Oplog::TotalSize(const std::filesystem::path& BasePath) uint64_t Size = OplogStorage::OpsSize(BasePath); std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; - if (std::filesystem::exists(StateFilePath)) + if (IsFile(StateFilePath)) { - Size += std::filesystem::file_size(StateFilePath); + Size += FileSizeFromPath(StateFilePath); } std::filesystem::path MetaFilePath = BasePath / "ops.meta"sv; - if (std::filesystem::exists(MetaFilePath)) + if (IsFile(MetaFilePath)) { - Size += std::filesystem::file_size(MetaFilePath); + Size += FileSizeFromPath(MetaFilePath); } std::filesystem::path IndexFilePath = BasePath / "ops.zidx"sv; - if (std::filesystem::exists(IndexFilePath)) + if (IsFile(IndexFilePath)) { - Size += std::filesystem::file_size(IndexFilePath); + Size += FileSizeFromPath(IndexFilePath); } return Size; @@ -1299,7 +1326,7 @@ ProjectStore::Oplog::ExistsAt(const std::filesystem::path& BasePath) using namespace std::literals; std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; - return std::filesystem::is_regular_file(StateFilePath); + return IsFile(StateFilePath); } bool @@ -1333,7 +1360,7 @@ ProjectStore::Oplog::Read() if (!m_MetaValid) { std::error_code DummyEc; - std::filesystem::remove(m_MetaPath, DummyEc); + RemoveFile(m_MetaPath, DummyEc); } ReadIndexSnapshot(); @@ -1434,7 +1461,7 @@ ProjectStore::Oplog::Reset() m_Storage = new OplogStorage(this, m_BasePath); m_Storage->Open(true); m_MetaValid = false; - CleanDirectory(m_TempPath); + CleanDirectory(m_TempPath, /*ForceRemoveReadOnlyFiles*/ false); Write(); } // Erase content on disk @@ -1453,7 +1480,7 @@ ProjectStore::Oplog::ReadStateFile(const std::filesystem::path& BasePath, std::f using namespace std::literals; std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; - if (std::filesystem::is_regular_file(StateFilePath)) + if (IsFile(StateFilePath)) { // ZEN_INFO("oplog '{}/{}': config read from '{}'", m_OuterProject->Identifier, m_OplogId, StateFilePath); @@ -1484,11 +1511,17 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo ValidationResult Result; + const size_t OpCount = OplogCount(); + std::vector<Oid> KeyHashes; std::vector<std::string> Keys; std::vector<std::vector<IoHash>> Attachments; std::vector<OplogEntryMapping> Mappings; + KeyHashes.reserve(OpCount); + Keys.reserve(OpCount); + Mappings.reserve(OpCount); + IterateOplogWithKey([&](uint32_t LSN, const Oid& Key, CbObjectView OpView) { Result.LSNLow = Min(Result.LSNLow, LSN); Result.LSNHigh = Max(Result.LSNHigh, LSN); @@ -1513,77 +1546,90 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo bool HasMissingEntries = false; for (const ChunkMapping& Chunk : Mapping.Chunks) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(Chunk.Hash); !Payload) + if (!m_CidStore.ContainsChunk(Chunk.Hash)) { ResultLock.WithExclusiveLock([&]() { Result.MissingChunks.push_back({KeyHash, Chunk}); }); HasMissingEntries = true; } } + for (const ChunkMapping& Meta : Mapping.Meta) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(Meta.Hash); !Payload) + if (!m_CidStore.ContainsChunk(Meta.Hash)) { ResultLock.WithExclusiveLock([&]() { Result.MissingMetas.push_back({KeyHash, Meta}); }); HasMissingEntries = true; } } + for (const FileMapping& File : Mapping.Files) { if (File.Hash == IoHash::Zero) { std::filesystem::path FilePath = m_OuterProject->RootDir / File.ServerPath; - if (!std::filesystem::is_regular_file(FilePath)) + if (!IsFile(FilePath)) { ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); HasMissingEntries = true; } } - else + else if (!m_CidStore.ContainsChunk(File.Hash)) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(File.Hash); !Payload) - { - ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); - HasMissingEntries = true; - } + ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); + HasMissingEntries = true; } } + const std::vector<IoHash>& OpAttachments = Attachments[OpIndex]; for (const IoHash& Attachment : OpAttachments) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(Attachment); !Payload) + if (!m_CidStore.ContainsChunk(Attachment)) { ResultLock.WithExclusiveLock([&]() { Result.MissingAttachments.push_back({KeyHash, Attachment}); }); HasMissingEntries = true; } } + if (HasMissingEntries) { ResultLock.WithExclusiveLock([&]() { Result.OpKeys.push_back({KeyHash, Key}); }); } }; - Latch WorkLatch(1); - - for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); + try { - if (OptionalWorkerPool) - { - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([&, Index = OpIndex]() { - ZEN_MEMSCOPE(GetProjectstoreTag()); - - auto _ = MakeGuard([&WorkLatch] { WorkLatch.CountDown(); }); - ValidateOne(Index); - }); - } - else + for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) { - ValidateOne(OpIndex); + if (AbortFlag) + { + break; + } + if (OptionalWorkerPool) + { + Work.ScheduleWork(*OptionalWorkerPool, [&ValidateOne, Index = OpIndex](std::atomic<bool>& AbortFlag) { + ZEN_MEMSCOPE(GetProjectstoreTag()); + if (AbortFlag) + { + return; + } + ValidateOne(Index); + }); + } + else + { + ValidateOne(OpIndex); + } } } - - WorkLatch.CountDown(); - WorkLatch.Wait(); + catch (const std::exception& Ex) + { + AbortFlag.store(true); + ZEN_WARN("Failed validating oplogs in {}. Reason: '{}'", m_BasePath, Ex.what()); + } + Work.Wait(); { // Check if we were deleted while we were checking the references without a lock... @@ -1617,31 +1663,9 @@ ProjectStore::Oplog::WriteIndexSnapshot() namespace fs = std::filesystem; - fs::path IndexPath = m_BasePath / "ops.zidx"; - fs::path TempIndexPath = m_BasePath / "ops.zidx.tmp"; - - // Move index away, we keep it if something goes wrong - if (fs::is_regular_file(TempIndexPath)) - { - std::error_code Ec; - if (!fs::remove(TempIndexPath, Ec) || Ec) - { - ZEN_WARN("oplog '{}/{}': snapshot failed to clean up temp snapshot at {}, reason: '{}'", - GetOuterProject()->Identifier, - m_OplogId, - TempIndexPath, - Ec.message()); - return; - } - } - + const fs::path IndexPath = m_BasePath / "ops.zidx"; try { - if (fs::is_regular_file(IndexPath)) - { - fs::rename(IndexPath, TempIndexPath); - } - // Write the current state of the location map to a new index state std::vector<uint32_t> LSNEntries; std::vector<Oid> Keys; @@ -1722,36 +1746,28 @@ ProjectStore::Oplog::WriteIndexSnapshot() uint64_t Offset = 0; IndexFile.Write(&Header, sizeof(OplogIndexHeader), Offset); - Offset += sizeof(OplogIndexHeader); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(LSNEntries.data(), LSNEntries.size() * sizeof(uint32_t), Offset); - Offset += LSNEntries.size() * sizeof(uint32_t); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(Keys.data(), Keys.size() * sizeof(Oid), Offset); - Offset += Keys.size() * sizeof(Oid); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(AddressMapEntries.data(), AddressMapEntries.size() * sizeof(OplogEntryAddress), Offset); - Offset += AddressMapEntries.size() * sizeof(OplogEntryAddress); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(LatestOpMapEntries.data(), LatestOpMapEntries.size() * sizeof(uint32_t), Offset); - Offset += LatestOpMapEntries.size() * sizeof(uint32_t); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset); - Offset += ChunkMapEntries.size() * sizeof(IoHash); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset); - Offset += MetaMapEntries.size() * sizeof(IoHash); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset); - Offset += FilePathLengths.size() * sizeof(uint32_t); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); for (const auto& FilePath : FilePaths) { @@ -1763,7 +1779,11 @@ ProjectStore::Oplog::WriteIndexSnapshot() ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); if (Ec) { - throw std::system_error(Ec, fmt::format("Failed to move temp file '{}' to '{}'", ObjectIndexFile.GetPath(), IndexPath)); + throw std::system_error(Ec, + fmt::format("Snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", + ObjectIndexFile.GetPath(), + IndexPath, + Ec.message())); } EntryCount = LSNEntries.size(); m_LogFlushPosition = IndexLogPosition; @@ -1771,35 +1791,6 @@ ProjectStore::Oplog::WriteIndexSnapshot() catch (const std::exception& Err) { ZEN_WARN("oplog '{}/{}': snapshot FAILED, reason: '{}'", m_OuterProject->Identifier, m_OplogId, Err.what()); - - // Restore any previous snapshot - - if (fs::is_regular_file(TempIndexPath)) - { - std::error_code Ec; - fs::remove(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless - fs::rename(TempIndexPath, IndexPath, Ec); - if (Ec) - { - ZEN_WARN("oplog '{}/{}': snapshot failed to restore old snapshot from {}, reason: '{}'", - m_OuterProject->Identifier, - m_OplogId, - TempIndexPath, - Ec.message()); - } - } - } - if (fs::is_regular_file(TempIndexPath)) - { - std::error_code Ec; - if (!fs::remove(TempIndexPath, Ec) || Ec) - { - ZEN_WARN("oplog '{}/{}': snapshot failed to remove temporary file {}, reason: '{}'", - m_OuterProject->Identifier, - m_OplogId, - TempIndexPath, - Ec.message()); - } } } @@ -1809,8 +1800,8 @@ ProjectStore::Oplog::ReadIndexSnapshot() ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Oplog::ReadIndexSnapshot"); - std::filesystem::path IndexPath = m_BasePath / "ops.zidx"; - if (std::filesystem::is_regular_file(IndexPath)) + const std::filesystem::path IndexPath = m_BasePath / "ops.zidx"; + if (IsFile(IndexPath)) { uint64_t EntryCount = 0; Stopwatch Timer; @@ -2135,66 +2126,81 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, } if (OptionalWorkerPool) { - std::atomic_bool Result = true; - Latch WorkLatch(1); - - for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); + try { - if (Result.load() == false) + for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) { - break; - } - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork( - [this, &WorkLatch, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - if (Result.load() == false) - { - return; - } - size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; - const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex]; - try - { - IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); - if (!Payload) + if (AbortFlag) + { + break; + } + Work.ScheduleWork( + *OptionalWorkerPool, + [this, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback]( + std::atomic<bool>& AbortFlag) { + if (AbortFlag) { - ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[ChunkIndex], FilePath); + return; } + size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; + const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex]; + try + { + IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); + if (!Payload) + { + ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[FileChunkIndex], FilePath); + } - if (!AsyncCallback(FileChunkIndex, Payload, IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0)) + if (!AsyncCallback(FileChunkIndex, + Payload, + IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0)) + { + AbortFlag.store(true); + } + } + catch (const std::exception& Ex) { - Result.store(false); + ZEN_WARN("oplog '{}/{}': exception caught when iterating file chunk {}, path '{}'. Reason: '{}'", + m_OuterProject->Identifier, + m_OplogId, + FileChunkIndex, + FilePath, + Ex.what()); } - } - catch (const std::exception& Ex) - { - ZEN_WARN("oplog '{}/{}': exception caught when iterating file chunk {}, path '{}'. Reason: '{}'", - m_OuterProject->Identifier, - m_OplogId, - FileChunkIndex, - FilePath, - Ex.what()); - } - }); - } + }); + } - if (!CidChunkHashes.empty()) + if (!CidChunkHashes.empty() && !AbortFlag) + { + m_CidStore.IterateChunks( + CidChunkHashes, + [&](size_t Index, const IoBuffer& Payload) { + size_t CidChunkIndex = CidChunkIndexes[Index]; + if (AbortFlag) + { + return false; + } + return AsyncCallback(CidChunkIndex, + Payload, + IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0); + }, + OptionalWorkerPool, + LargeSizeLimit); + } + } + catch (const std::exception& Ex) { - m_CidStore.IterateChunks( - CidChunkHashes, - [&](size_t Index, const IoBuffer& Payload) { - size_t CidChunkIndex = CidChunkIndexes[Index]; - return AsyncCallback(CidChunkIndex, Payload, IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0); - }, - OptionalWorkerPool, - LargeSizeLimit); + AbortFlag.store(true); + ZEN_WARN("Failed iterating oplog chunks in {}. Reason: '{}'", m_BasePath, Ex.what()); } - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); - return Result.load(); + return !AbortFlag; } else { @@ -2735,7 +2741,7 @@ ProjectStore::Oplog::CheckPendingChunkReferences(std::span<const IoHash> ChunkHa MissingChunks.reserve(ChunkHashes.size()); for (const IoHash& FileHash : ChunkHashes) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(FileHash); !Payload) + if (!m_CidStore.ContainsChunk(FileHash)) { MissingChunks.push_back(FileHash); } @@ -3129,7 +3135,7 @@ ProjectStore::Project::~Project() bool ProjectStore::Project::Exists(const std::filesystem::path& BasePath) { - return std::filesystem::exists(BasePath / "Project.zcb"); + return IsFile(BasePath / "Project.zcb"); } void @@ -3203,7 +3209,7 @@ ProjectStore::Project::ReadAccessTimes() using namespace std::literals; std::filesystem::path ProjectAccessTimesFilePath = m_OplogStoragePath / "AccessTimes.zcb"sv; - if (!std::filesystem::exists(ProjectAccessTimesFilePath)) + if (!IsFile(ProjectAccessTimesFilePath)) { return; } @@ -3359,7 +3365,6 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact, bo ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Store::OpenOplog"); - std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); { RwLock::SharedLockScope ProjectLock(m_ProjectLock); @@ -3367,21 +3372,35 @@ ProjectStore::Project::OpenOplog(std::string_view OplogId, bool AllowCompact, bo if (OplogIt != m_Oplogs.end()) { - if (!VerifyPathOnDisk || Oplog::ExistsAt(OplogBasePath)) + bool ReOpen = false; + + if (VerifyPathOnDisk) { - return OplogIt->second.get(); + std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); + + if (!Oplog::ExistsAt(OplogBasePath)) + { + // Somebody deleted the oplog on disk behind our back + ProjectLock.ReleaseNow(); + std::filesystem::path DeletePath; + if (!RemoveOplog(OplogId, DeletePath)) + { + ZEN_WARN("Failed to clean up deleted oplog {}/{}", Identifier, OplogId, OplogBasePath); + } + + ReOpen = true; + } } - // Somebody deleted the oplog on disk behind our back - ProjectLock.ReleaseNow(); - std::filesystem::path DeletePath; - if (!RemoveOplog(OplogId, DeletePath)) + if (!ReOpen) { - ZEN_WARN("Failed to clean up deleted oplog {}/{}", Identifier, OplogId, OplogBasePath); + return OplogIt->second.get(); } } } + std::filesystem::path OplogBasePath = BasePathForOplog(OplogId); + RwLock::ExclusiveLockScope Lock(m_ProjectLock); if (auto It = m_Oplogs.find(std::string{OplogId}); It != m_Oplogs.end()) { @@ -3581,14 +3600,14 @@ ProjectStore::Project::TotalSize(const std::filesystem::path& BasePath) uint64_t Size = 0; std::filesystem::path AccessTimesFilePath = BasePath / "AccessTimes.zcb"sv; - if (std::filesystem::exists(AccessTimesFilePath)) + if (IsFile(AccessTimesFilePath)) { - Size += std::filesystem::file_size(AccessTimesFilePath); + Size += FileSizeFromPath(AccessTimesFilePath); } std::filesystem::path ProjectFilePath = BasePath / "Project.zcb"sv; - if (std::filesystem::exists(ProjectFilePath)) + if (IsFile(ProjectFilePath)) { - Size += std::filesystem::file_size(ProjectFilePath); + Size += FileSizeFromPath(ProjectFilePath); } return Size; @@ -3700,7 +3719,7 @@ ProjectStore::Project::IsExpired(const std::string& EntryName, if (!MarkerPath.empty()) { std::error_code Ec; - if (std::filesystem::exists(MarkerPath, Ec)) + if (IsFile(MarkerPath, Ec)) { if (Ec) { @@ -3853,7 +3872,7 @@ void ProjectStore::DiscoverProjects() { ZEN_MEMSCOPE(GetProjectstoreTag()); - if (!std::filesystem::exists(m_ProjectBasePath)) + if (!IsDir(m_ProjectBasePath)) { return; } @@ -3902,26 +3921,32 @@ ProjectStore::Flush() } WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); - Latch WorkLatch(1); - - for (const Ref<Project>& Project : Projects) + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); + try { - WorkLatch.AddCount(1); - WorkerPool.ScheduleWork([this, &WorkLatch, Project]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - try - { - Project->Flush(); - } - catch (const std::exception& Ex) - { - ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what()); - } - }); + for (const Ref<Project>& Project : Projects) + { + Work.ScheduleWork(WorkerPool, [this, Project](std::atomic<bool>&) { + try + { + Project->Flush(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what()); + } + }); + } + } + catch (const std::exception& Ex) + { + AbortFlag.store(true); + ZEN_WARN("Failed projects in {}. Reason: '{}'", m_ProjectBasePath, Ex.what()); } - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); } void @@ -3962,7 +3987,7 @@ ProjectStore::StorageSize() const GcStorageSize Result; { - if (std::filesystem::exists(m_ProjectBasePath)) + if (IsDir(m_ProjectBasePath)) { DirectoryContent ProjectsFolderContent; GetDirectoryContent(m_ProjectBasePath, DirectoryContentFlags::IncludeDirs, ProjectsFolderContent); @@ -3970,7 +3995,7 @@ ProjectStore::StorageSize() const for (const std::filesystem::path& ProjectBasePath : ProjectsFolderContent.Directories) { std::filesystem::path ProjectStateFilePath = ProjectBasePath / "Project.zcb"sv; - if (std::filesystem::exists(ProjectStateFilePath)) + if (IsFile(ProjectStateFilePath)) { Result.DiskSize += Project::TotalSize(ProjectBasePath); DirectoryContent DirContent; @@ -4753,7 +4778,6 @@ std::pair<HttpResponseCode, std::string> ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const std::string_view Cid, - ZenContentType AcceptType, IoBuffer& OutChunk, uint64_t* OptionalInOutModificationTag) { @@ -4795,16 +4819,7 @@ ProjectStore::GetChunk(const std::string_view ProjectId, } } - if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(OutChunk)); - OutChunk = Compressed.Decompress().AsIoBuffer(); - OutChunk.SetContentType(ZenContentType::kBinary); - } - else - { - OutChunk.SetContentType(ZenContentType::kCompressedBinary); - } + OutChunk.SetContentType(ZenContentType::kCompressedBinary); return {HttpResponseCode::OK, {}}; } @@ -5347,7 +5362,7 @@ ProjectStore::ReadOplog(const std::string_view ProjectId, /* BuildBlocks */ false, /* IgnoreMissingAttachments */ false, /* AllowChunking*/ false, - [](CompressedBuffer&&, RemoteProjectStore::Block&&) {}, + [](CompressedBuffer&&, ChunkBlockDescription&&) {}, [](const IoHash&, TGetAttachmentBufferFunc&&) {}, [](std::vector<std::pair<IoHash, FetchChunkFunc>>&&) {}, /* EmbedLooseFiles*/ false); @@ -7236,7 +7251,7 @@ TEST_CASE("project.store.gc") CHECK(ProjectStore.OpenProject("proj2"sv)); } - std::filesystem::remove(Project1FilePath); + RemoveFile(Project1FilePath); { GcSettings Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24), @@ -7265,7 +7280,7 @@ TEST_CASE("project.store.gc") CHECK(ProjectStore.OpenProject("proj2"sv)); } - std::filesystem::remove(Project2Oplog1Path); + RemoveFile(Project2Oplog1Path); { GcSettings Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24), .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(24), @@ -7294,7 +7309,7 @@ TEST_CASE("project.store.gc") CHECK(ProjectStore.OpenProject("proj2"sv)); } - std::filesystem::remove(Project2FilePath); + RemoveFile(Project2FilePath); { GcSettings Settings = {.CacheExpireTime = GcClock::Now() + std::chrono::hours(24), .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(24), @@ -8018,7 +8033,7 @@ TEST_CASE("project.store.rpc.getchunks") CompositeBuffer Buffer = Attachment->AsCompositeBinary(); CHECK_EQ(IoHash::HashBuffer(IoBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten(), 81823, 5434)), IoHash::HashBuffer(Buffer)); - CHECK_EQ(Chunk["Size"sv].AsUInt64(), std::filesystem::file_size(FilesOpIdAttachments[0].second)); + CHECK_EQ(Chunk["Size"sv].AsUInt64(), FileSizeFromPath(FilesOpIdAttachments[0].second)); CHECK(!Chunk.FindView("RawSize")); } { @@ -8499,12 +8514,7 @@ TEST_CASE("project.store.partial.read") uint64_t ModificationTag = 0; IoBuffer Chunk; CHECK(ProjectStore - .GetChunk("proj1"sv, - "oplog1"sv, - Attachments[OpIds[1]][0].second.DecodeRawHash().ToHexString(), - HttpContentType::kCompressedBinary, - Chunk, - &ModificationTag) + .GetChunk("proj1"sv, "oplog1"sv, Attachments[OpIds[1]][0].second.DecodeRawHash().ToHexString(), Chunk, &ModificationTag) .first == HttpResponseCode::OK); IoHash RawHash; uint64_t RawSize; @@ -8513,12 +8523,7 @@ TEST_CASE("project.store.partial.read") CHECK(ModificationTag != 0); CHECK(ProjectStore - .GetChunk("proj1"sv, - "oplog1"sv, - Attachments[OpIds[1]][0].second.DecodeRawHash().ToHexString(), - HttpContentType::kCompressedBinary, - Chunk, - &ModificationTag) + .GetChunk("proj1"sv, "oplog1"sv, Attachments[OpIds[1]][0].second.DecodeRawHash().ToHexString(), Chunk, &ModificationTag) .first == HttpResponseCode::NotModified); } @@ -8621,14 +8626,18 @@ TEST_CASE("project.store.block") Chunks.reserve(AttachmentSizes.size()); for (const auto& It : AttachmentsWithId) { - Chunks.push_back(std::make_pair(It.second.DecodeRawHash(), - [Buffer = It.second.GetCompressed().Flatten().AsIoBuffer()](const IoHash&) -> CompositeBuffer { - return CompositeBuffer(SharedBuffer(Buffer)); - })); - } - RemoteProjectStore::Block Block; - CompressedBuffer BlockBuffer = GenerateBlock(std::move(Chunks), Block); - CHECK(IterateBlock(BlockBuffer.Decompress(), [](CompressedBuffer&&, const IoHash&) {})); + Chunks.push_back( + std::make_pair(It.second.DecodeRawHash(), [Buffer = It.second](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { + return {Buffer.DecodeRawSize(), Buffer}; + })); + } + ChunkBlockDescription Block; + CompressedBuffer BlockBuffer = GenerateChunkBlock(std::move(Chunks), Block); + uint64_t HeaderSize; + CHECK(IterateChunkBlock( + BlockBuffer.Decompress(), + [](CompressedBuffer&&, const IoHash&) {}, + HeaderSize)); } TEST_CASE("project.store.iterateoplog") diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 8f2d3ce0d..368da5ea4 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -449,7 +449,6 @@ public: std::pair<HttpResponseCode, std::string> GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const std::string_view Cid, - ZenContentType AcceptType, IoBuffer& OutChunk, uint64_t* OptionalInOutModificationTag); diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 0589fdc5f..f96b3e185 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -12,8 +12,8 @@ #include <zencore/stream.h> #include <zencore/timer.h> #include <zencore/workthreadpool.h> -#include <zenstore/chunkedfile.h> #include <zenstore/cidstore.h> +#include <zenutil/chunkedfile.h> #include <zenutil/workerpools.h> #include <unordered_map> @@ -143,7 +143,7 @@ namespace remotestore_impl { NiceBytes(Stats.m_PeakReceivedBytes)); } - size_t AddBlock(RwLock& BlocksLock, std::vector<RemoteProjectStore::Block>& Blocks) + size_t AddBlock(RwLock& BlocksLock, std::vector<ChunkBlockDescription>& Blocks) { size_t BlockIndex; { @@ -154,63 +154,6 @@ namespace remotestore_impl { return BlockIndex; } - IoBuffer WriteToTempFile(CompressedBuffer&& CompressedBuffer, std::filesystem::path Path) - { - if (std::filesystem::is_regular_file(Path)) - { - IoBuffer ExistingTempFile = IoBuffer(IoBufferBuilder::MakeFromFile(Path)); - if (ExistingTempFile && ExistingTempFile.GetSize() == CompressedBuffer.GetCompressedSize()) - { - ExistingTempFile.SetDeleteOnClose(true); - return ExistingTempFile; - } - } - IoBuffer BlockBuffer; - BasicFile BlockFile; - uint32_t RetriesLeft = 3; - BlockFile.Open(Path, BasicFile::Mode::kTruncateDelete, [&](std::error_code& Ec) { - if (RetriesLeft == 0) - { - return false; - } - ZEN_WARN("Failed to create temporary oplog block '{}': '{}', retries left: {}.", Path, Ec.message(), RetriesLeft); - Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms - RetriesLeft--; - return true; - }); - uint64_t Offset = 0; - { - CompositeBuffer Compressed = std::move(CompressedBuffer).GetCompressed(); - for (const SharedBuffer& Segment : Compressed.GetSegments()) - { - size_t SegmentSize = Segment.GetSize(); - static const uint64_t BufferingSize = 256u * 1024u; - - IoBufferFileReference FileRef; - if (SegmentSize >= (BufferingSize + BufferingSize / 2) && Segment.GetFileReference(FileRef)) - { - ScanFile(FileRef.FileHandle, - FileRef.FileChunkOffset, - FileRef.FileChunkSize, - BufferingSize, - [&BlockFile, &Offset](const void* Data, size_t Size) { - BlockFile.Write(Data, Size, Offset); - Offset += Size; - }); - } - else - { - BlockFile.Write(Segment.GetData(), SegmentSize, Offset); - Offset += SegmentSize; - } - } - } - void* FileHandle = BlockFile.Detach(); - BlockBuffer = IoBuffer(IoBuffer::File, FileHandle, 0, Offset, /*IsWholeFile*/ true); - BlockBuffer.SetDeleteOnClose(true); - return BlockBuffer; - } - RemoteProjectStore::Result WriteOplogSection(ProjectStore::Oplog& Oplog, const CbObjectView& SectionObject, JobContext* OptionalContext) { using namespace std::literals; @@ -573,21 +516,23 @@ namespace remotestore_impl { return; } - bool StoreChunksOK = IterateBlock( - BlockPayload, - [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk, - const IoHash& AttachmentRawHash) { - if (WantedChunks.contains(AttachmentRawHash)) - { - WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); - IoHash RawHash; - uint64_t RawSize; - ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize)); - ZEN_ASSERT(RawHash == AttachmentRawHash); - WriteRawHashes.emplace_back(AttachmentRawHash); - WantedChunks.erase(AttachmentRawHash); - } - }); + uint64_t BlockHeaderSize = 0; + bool StoreChunksOK = IterateChunkBlock( + BlockPayload, + [&WantedChunks, &WriteAttachmentBuffers, &WriteRawHashes, &Info](CompressedBuffer&& Chunk, + const IoHash& AttachmentRawHash) { + if (WantedChunks.contains(AttachmentRawHash)) + { + WriteAttachmentBuffers.emplace_back(Chunk.GetCompressed().Flatten().AsIoBuffer()); + IoHash RawHash; + uint64_t RawSize; + ZEN_ASSERT(CompressedBuffer::ValidateCompressedHeader(WriteAttachmentBuffers.back(), RawHash, RawSize)); + ZEN_ASSERT(RawHash == AttachmentRawHash); + WriteRawHashes.emplace_back(AttachmentRawHash); + WantedChunks.erase(AttachmentRawHash); + } + }, + BlockHeaderSize); if (!StoreChunksOK) { @@ -738,14 +683,14 @@ namespace remotestore_impl { }); }; - void CreateBlock(WorkerThreadPool& WorkerPool, - Latch& OpSectionsLatch, - std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock, - RwLock& SectionsLock, - std::vector<RemoteProjectStore::Block>& Blocks, - size_t BlockIndex, - const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, - AsyncRemoteResult& RemoteResult) + void CreateBlock(WorkerThreadPool& WorkerPool, + Latch& OpSectionsLatch, + std::vector<std::pair<IoHash, FetchChunkFunc>>&& ChunksInBlock, + RwLock& SectionsLock, + std::vector<ChunkBlockDescription>& Blocks, + size_t BlockIndex, + const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, + AsyncRemoteResult& RemoteResult) { OpSectionsLatch.AddCount(1); WorkerPool.ScheduleWork([&Blocks, @@ -764,10 +709,10 @@ namespace remotestore_impl { try { ZEN_ASSERT(ChunkCount > 0); - Stopwatch Timer; - RemoteProjectStore::Block Block; - CompressedBuffer CompressedBlock = GenerateBlock(std::move(Chunks), Block); - IoHash BlockHash = CompressedBlock.DecodeRawHash(); + Stopwatch Timer; + ChunkBlockDescription Block; + CompressedBuffer CompressedBlock = GenerateChunkBlock(std::move(Chunks), Block); + IoHash BlockHash = CompressedBlock.DecodeRawHash(); { // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index RwLock::SharedLockScope __(SectionsLock); @@ -800,8 +745,8 @@ namespace remotestore_impl { struct CreatedBlock { - IoBuffer Payload; - RemoteProjectStore::Block Block; + IoBuffer Payload; + ChunkBlockDescription Block; }; void UploadAttachments(WorkerThreadPool& WorkerPool, @@ -931,8 +876,8 @@ namespace remotestore_impl { } try { - IoBuffer Payload; - RemoteProjectStore::Block Block; + IoBuffer Payload; + ChunkBlockDescription Block; if (auto BlockIt = CreatedBlocks.find(RawHash); BlockIt != CreatedBlocks.end()) { Payload = BlockIt->second.Payload; @@ -1058,7 +1003,7 @@ namespace remotestore_impl { { auto It = BulkBlockAttachmentsToUpload.find(Chunk); ZEN_ASSERT(It != BulkBlockAttachmentsToUpload.end()); - CompositeBuffer ChunkPayload = It->second(It->first); + CompressedBuffer ChunkPayload = It->second(It->first).second; if (!ChunkPayload) { RemoteResult.SetError(static_cast<int32_t>(HttpResponseCode::NotFound), @@ -1067,8 +1012,8 @@ namespace remotestore_impl { ChunkBuffers.clear(); break; } - ChunksSize += ChunkPayload.GetSize(); - ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).Flatten().AsIoBuffer())); + ChunksSize += ChunkPayload.GetCompressedSize(); + ChunkBuffers.emplace_back(SharedBuffer(std::move(ChunkPayload).GetCompressed().Flatten().AsIoBuffer())); } RemoteProjectStore::SaveAttachmentsResult Result = RemoteStore.SaveAttachments(ChunkBuffers); if (Result.ErrorCode) @@ -1139,54 +1084,13 @@ namespace remotestore_impl { } } // namespace remotestore_impl -bool -IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor) -{ - ZEN_ASSERT(BlockPayload); - if (BlockPayload.GetSize() < 1) - { - return false; - } - - MemoryView BlockView = BlockPayload.GetView(); - const uint8_t* ReadPtr = reinterpret_cast<const uint8_t*>(BlockView.GetData()); - uint32_t NumberSize; - uint64_t ChunkCount = ReadVarUInt(ReadPtr, NumberSize); - ReadPtr += NumberSize; - std::vector<uint64_t> ChunkSizes; - ChunkSizes.reserve(ChunkCount); - while (ChunkCount--) - { - ChunkSizes.push_back(ReadVarUInt(ReadPtr, NumberSize)); - ReadPtr += NumberSize; - } - ptrdiff_t TempBufferLength = std::distance(reinterpret_cast<const uint8_t*>(BlockView.GetData()), ReadPtr); - ZEN_ASSERT(TempBufferLength > 0); - for (uint64_t ChunkSize : ChunkSizes) - { - IoBuffer Chunk(IoBuffer::Wrap, ReadPtr, ChunkSize); - IoHash AttachmentRawHash; - uint64_t AttachmentRawSize; - CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), AttachmentRawHash, AttachmentRawSize); - - if (!CompressedChunk) - { - ZEN_ERROR("Invalid chunk in block"); - return false; - } - Visitor(std::move(CompressedChunk), AttachmentRawHash); - ReadPtr += ChunkSize; - ZEN_ASSERT(ReadPtr <= BlockView.GetDataEnd()); - } - return true; -}; std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject) { using namespace std::literals; - std::vector<RemoteProjectStore::Block> Result; - CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); + std::vector<ChunkBlockDescription> Result; + CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); std::vector<IoHash> BlockHashes; BlockHashes.reserve(BlocksArray.Num()); @@ -1199,11 +1103,11 @@ GetBlockHashesFromOplog(CbObjectView ContainerObject) return BlockHashes; } -std::vector<RemoteProjectStore::Block> +std::vector<ThinChunkBlockDescription> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes) { using namespace std::literals; - std::vector<RemoteProjectStore::Block> Result; + std::vector<ThinChunkBlockDescription> Result; CbArrayView BlocksArray = ContainerObject["blocks"sv].AsArrayView(); tsl::robin_set<IoHash, IoHash::Hasher> IncludeSet; IncludeSet.insert(IncludeBlockHashes.begin(), IncludeBlockHashes.end()); @@ -1226,53 +1130,12 @@ GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> Include { ChunkHashes.push_back(ChunkField.AsHash()); } - Result.push_back({.BlockHash = BlockHash, .ChunkHashes = std::move(ChunkHashes)}); + Result.push_back(ThinChunkBlockDescription{.BlockHash = BlockHash, .ChunkRawHashes = std::move(ChunkHashes)}); } } return Result; } -CompressedBuffer -GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, RemoteProjectStore::Block& OutBlock) -{ - const size_t ChunkCount = FetchChunks.size(); - - std::vector<SharedBuffer> ChunkSegments; - ChunkSegments.resize(1); - ChunkSegments.reserve(1 + ChunkCount); - OutBlock.ChunkHashes.reserve(ChunkCount); - OutBlock.ChunkLengths.reserve(ChunkCount); - { - IoBuffer TempBuffer(ChunkCount * 9); - MutableMemoryView View = TempBuffer.GetMutableView(); - uint8_t* BufferStartPtr = reinterpret_cast<uint8_t*>(View.GetData()); - uint8_t* BufferEndPtr = BufferStartPtr; - BufferEndPtr += WriteVarUInt(gsl::narrow<uint64_t>(ChunkCount), BufferEndPtr); - for (const auto& It : FetchChunks) - { - CompositeBuffer Chunk = It.second(It.first); - uint64_t ChunkSize = 0; - std::span<const SharedBuffer> Segments = Chunk.GetSegments(); - for (const SharedBuffer& Segment : Segments) - { - ChunkSize += Segment.GetSize(); - ChunkSegments.push_back(Segment); - } - BufferEndPtr += WriteVarUInt(ChunkSize, BufferEndPtr); - OutBlock.ChunkHashes.push_back(It.first); - OutBlock.ChunkLengths.push_back(gsl::narrow<uint32_t>(ChunkSize)); - } - ZEN_ASSERT(BufferEndPtr <= View.GetDataEnd()); - ptrdiff_t TempBufferLength = std::distance(BufferStartPtr, BufferEndPtr); - ChunkSegments[0] = SharedBuffer(IoBuffer(TempBuffer, 0, gsl::narrow<size_t>(TempBufferLength))); - } - CompressedBuffer CompressedBlock = - CompressedBuffer::Compress(CompositeBuffer(std::move(ChunkSegments)), OodleCompressor::Mermaid, OodleCompressionLevel::None); - OutBlock.BlockHash = CompressedBlock.DecodeRawHash(); - OutBlock.FirstChunkOffset = gsl::narrow<uint32_t>(CompressedBuffer::GetHeaderSizeForNoneEncoder() + ChunkSegments[0].GetSize()); - return CompressedBlock; -} - CbObject BuildContainer(CidStore& ChunkStore, ProjectStore::Project& Project, @@ -1283,9 +1146,9 @@ BuildContainer(CidStore& ChunkStore, bool BuildBlocks, bool IgnoreMissingAttachments, bool AllowChunking, - const std::vector<RemoteProjectStore::Block>& KnownBlocks, + const std::vector<ThinChunkBlockDescription>& KnownBlocks, WorkerThreadPool& WorkerPool, - const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, + const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles, @@ -1307,9 +1170,9 @@ BuildContainer(CidStore& ChunkStore, std::unordered_map<IoHash, FoundAttachment, IoHash::Hasher> UploadAttachments; - RwLock BlocksLock; - std::vector<RemoteProjectStore::Block> Blocks; - CompressedBuffer OpsBuffer; + RwLock BlocksLock; + std::vector<ChunkBlockDescription> Blocks; + CompressedBuffer OpsBuffer; std::filesystem::path AttachmentTempPath = Oplog.TempPath(); AttachmentTempPath.append(".pending"); @@ -1349,7 +1212,7 @@ BuildContainer(CidStore& ChunkStore, { std::string_view ServerPath = View["serverpath"sv].AsString(); std::filesystem::path FilePath = Project.RootDir / ServerPath; - if (!std::filesystem::is_regular_file(FilePath)) + if (!IsFile(FilePath)) { remotestore_impl::ReportMessage( OptionalContext, @@ -1525,7 +1388,7 @@ BuildContainer(CidStore& ChunkStore, return {}; } - auto FindReuseBlocks = [](const std::vector<RemoteProjectStore::Block>& KnownBlocks, + auto FindReuseBlocks = [](const std::vector<ThinChunkBlockDescription>& KnownBlocks, const std::unordered_set<IoHash, IoHash::Hasher>& Attachments, JobContext* OptionalContext) -> std::vector<size_t> { std::vector<size_t> ReuseBlockIndexes; @@ -1538,14 +1401,14 @@ BuildContainer(CidStore& ChunkStore, for (size_t KnownBlockIndex = 0; KnownBlockIndex < KnownBlocks.size(); KnownBlockIndex++) { - const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; - size_t BlockAttachmentCount = KnownBlock.ChunkHashes.size(); + const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; + size_t BlockAttachmentCount = KnownBlock.ChunkRawHashes.size(); if (BlockAttachmentCount == 0) { continue; } size_t FoundAttachmentCount = 0; - for (const IoHash& KnownHash : KnownBlock.ChunkHashes) + for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes) { if (Attachments.contains(KnownHash)) { @@ -1586,8 +1449,8 @@ BuildContainer(CidStore& ChunkStore, std::vector<size_t> ReusedBlockIndexes = FindReuseBlocks(KnownBlocks, FoundHashes, OptionalContext); for (size_t KnownBlockIndex : ReusedBlockIndexes) { - const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; - for (const IoHash& KnownHash : KnownBlock.ChunkHashes) + const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; + for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes) { if (UploadAttachments.erase(KnownHash) == 1) { @@ -1605,10 +1468,7 @@ BuildContainer(CidStore& ChunkStore, }; std::vector<ChunkedFile> ChunkedFiles; - auto ChunkFile = [AttachmentTempPath](const IoHash& RawHash, - IoBuffer& RawData, - const IoBufferFileReference& FileRef, - JobContext*) -> ChunkedFile { + auto ChunkFile = [](const IoHash& RawHash, IoBuffer& RawData, const IoBufferFileReference& FileRef, JobContext*) -> ChunkedFile { ChunkedFile Chunked; Stopwatch Timer; @@ -1632,12 +1492,12 @@ BuildContainer(CidStore& ChunkStore, return Chunked; }; - RwLock ResolveLock; - std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes; - std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; - std::unordered_map<IoHash, size_t, IoHash::Hasher> ChunkedUploadAttachments; - std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> LooseUploadAttachments; - std::unordered_set<IoHash, IoHash::Hasher> MissingHashes; + RwLock ResolveLock; + std::unordered_set<IoHash, IoHash::Hasher> ChunkedHashes; + std::unordered_set<IoHash, IoHash::Hasher> LargeChunkHashes; + std::unordered_map<IoHash, size_t, IoHash::Hasher> ChunkedUploadAttachments; + std::unordered_map<IoHash, std::pair<uint64_t, IoBuffer>, IoHash::Hasher> LooseUploadAttachments; + std::unordered_set<IoHash, IoHash::Hasher> MissingHashes; remotestore_impl::ReportMessage(OptionalContext, fmt::format("Resolving {} attachments from {} ops", UploadAttachments.size(), TotalOpCount)); @@ -1717,9 +1577,7 @@ BuildContainer(CidStore& ChunkStore, std::filesystem::path AttachmentPath = AttachmentTempPath; AttachmentPath.append(RawHash.ToHexString()); - - IoBuffer TempAttachmentBuffer = - remotestore_impl::WriteToTempFile(std::move(Compressed), AttachmentPath); + IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath); ZEN_INFO("Saved temp attachment to '{}', {} ({})", AttachmentPath, NiceBytes(RawSize), @@ -1730,7 +1588,7 @@ BuildContainer(CidStore& ChunkStore, } else { - size_t RawSize = RawData.GetSize(); + uint64_t RawSize = RawData.GetSize(); CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer(RawData), OodleCompressor::Mermaid, OodleCompressionLevel::VeryFast); @@ -1738,23 +1596,24 @@ BuildContainer(CidStore& ChunkStore, std::filesystem::path AttachmentPath = AttachmentTempPath; AttachmentPath.append(RawHash.ToHexString()); - IoBuffer TempAttachmentBuffer = remotestore_impl::WriteToTempFile(std::move(Compressed), AttachmentPath); + uint64_t CompressedSize = Compressed.GetCompressedSize(); + IoBuffer TempAttachmentBuffer = WriteToTempFile(std::move(Compressed).GetCompressed(), AttachmentPath); ZEN_INFO("Saved temp attachment to '{}', {} ({})", AttachmentPath, NiceBytes(RawSize), NiceBytes(TempAttachmentBuffer.GetSize())); - if (Compressed.GetCompressedSize() > MaxChunkEmbedSize) + if (CompressedSize > MaxChunkEmbedSize) { OnLargeAttachment(RawHash, [Data = std::move(TempAttachmentBuffer)](const IoHash&) { return Data; }); ResolveLock.WithExclusiveLock([RawHash, &LargeChunkHashes]() { LargeChunkHashes.insert(RawHash); }); } else { - UploadAttachment->Size = Compressed.GetCompressedSize(); + UploadAttachment->Size = CompressedSize; ResolveLock.WithExclusiveLock( - [RawHash, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() { - LooseUploadAttachments.insert_or_assign(RawHash, std::move(Data)); + [RawHash, RawSize, &LooseUploadAttachments, Data = std::move(TempAttachmentBuffer)]() { + LooseUploadAttachments.insert_or_assign(RawHash, std::make_pair(RawSize, std::move(Data))); }); } } @@ -1927,8 +1786,8 @@ BuildContainer(CidStore& ChunkStore, std::vector<size_t> ReusedBlockFromChunking = FindReuseBlocks(KnownBlocks, ChunkedHashes, OptionalContext); for (size_t KnownBlockIndex : ReusedBlockIndexes) { - const RemoteProjectStore::Block& KnownBlock = KnownBlocks[KnownBlockIndex]; - for (const IoHash& KnownHash : KnownBlock.ChunkHashes) + const ThinChunkBlockDescription& KnownBlock = KnownBlocks[KnownBlockIndex]; + for (const IoHash& KnownHash : KnownBlock.ChunkRawHashes) { if (ChunkedHashes.erase(KnownHash) == 1) { @@ -1946,7 +1805,7 @@ BuildContainer(CidStore& ChunkStore, Blocks.reserve(ReuseBlockCount); for (auto It = ReusedBlockIndexes.begin(); It != UniqueKnownBlocksEnd; It++) { - Blocks.push_back(KnownBlocks[*It]); + Blocks.push_back({KnownBlocks[*It]}); } remotestore_impl::ReportMessage(OptionalContext, fmt::format("Reused {} attachments from {} blocks", ReusedAttachmentCount, ReuseBlockCount)); @@ -2062,9 +1921,9 @@ BuildContainer(CidStore& ChunkStore, { // We can share the lock as we are not resizing the vector and only touch BlockHash at our own index RwLock::SharedLockScope _(BlocksLock); - Blocks[BlockIndex].ChunkHashes.insert(Blocks[BlockIndex].ChunkHashes.end(), - BlockAttachmentHashes.begin(), - BlockAttachmentHashes.end()); + Blocks[BlockIndex].ChunkRawHashes.insert(Blocks[BlockIndex].ChunkRawHashes.end(), + BlockAttachmentHashes.begin(), + BlockAttachmentHashes.end()); } uint64_t NowMS = Timer.GetElapsedTimeMs(); ZEN_INFO("Assembled block {} with {} chunks in {} ({})", @@ -2109,16 +1968,25 @@ BuildContainer(CidStore& ChunkStore, { if (auto It = LooseUploadAttachments.find(RawHash); It != LooseUploadAttachments.end()) { - ChunksInBlock.emplace_back(std::make_pair(RawHash, [IoBuffer = SharedBuffer(It->second)](const IoHash&) { - return CompositeBuffer(IoBuffer); - })); + ChunksInBlock.emplace_back(std::make_pair( + RawHash, + [RawSize = It->second.first, + IoBuffer = SharedBuffer(It->second.second)](const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { + return std::make_pair(RawSize, CompressedBuffer::FromCompressedNoValidate(IoBuffer.AsIoBuffer())); + })); LooseUploadAttachments.erase(It); } else { - ChunksInBlock.emplace_back(std::make_pair(RawHash, [&ChunkStore](const IoHash& RawHash) { - return CompositeBuffer(SharedBuffer(ChunkStore.FindChunkByCid(RawHash))); - })); + ChunksInBlock.emplace_back( + std::make_pair(RawHash, [&ChunkStore](const IoHash& RawHash) -> std::pair<uint64_t, CompressedBuffer> { + IoBuffer Chunk = ChunkStore.FindChunkByCid(RawHash); + IoHash _; + uint64_t RawSize = 0; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), _, RawSize); + ZEN_ASSERT(Compressed); + return {RawSize, Compressed}; + })); } BlockSize += PayloadSize; @@ -2169,14 +2037,15 @@ BuildContainer(CidStore& ChunkStore, if (BlockAttachmentHashes.insert(ChunkHash).second) { const ChunkSource& Source = Chunked.ChunkSources[ChunkIndex]; - ChunksInBlock.emplace_back(std::make_pair( - ChunkHash, - [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size](const IoHash&) { - return CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)), - OodleCompressor::Mermaid, - OodleCompressionLevel::None) - .GetCompressed(); - })); + ChunksInBlock.emplace_back( + std::make_pair(ChunkHash, + [Source = ChunkedFile.Source, Offset = Source.Offset, Size = Source.Size]( + const IoHash&) -> std::pair<uint64_t, CompressedBuffer> { + return {Size, + CompressedBuffer::Compress(SharedBuffer(IoBuffer(Source, Offset, Size)), + OodleCompressor::Mermaid, + OodleCompressionLevel::None)}; + })); BlockSize += CompressedBuffer::GetHeaderSizeForNoneEncoder() + Source.Size; if (BuildBlocks) { @@ -2298,9 +2167,9 @@ BuildContainer(CidStore& ChunkStore, OplogContinerWriter.AddBinary("ops"sv, CompressedOpsSection.GetCompressed().Flatten().AsIoBuffer()); OplogContinerWriter.BeginArray("blocks"sv); { - for (const RemoteProjectStore::Block& B : Blocks) + for (const ChunkBlockDescription& B : Blocks) { - ZEN_ASSERT(!B.ChunkHashes.empty()); + ZEN_ASSERT(!B.ChunkRawHashes.empty()); if (BuildBlocks) { ZEN_ASSERT(B.BlockHash != IoHash::Zero); @@ -2310,7 +2179,7 @@ BuildContainer(CidStore& ChunkStore, OplogContinerWriter.AddBinaryAttachment("rawhash"sv, B.BlockHash); OplogContinerWriter.BeginArray("chunks"sv); { - for (const IoHash& RawHash : B.ChunkHashes) + for (const IoHash& RawHash : B.ChunkRawHashes) { OplogContinerWriter.AddHash(RawHash); } @@ -2326,7 +2195,7 @@ BuildContainer(CidStore& ChunkStore, { OplogContinerWriter.BeginArray("chunks"sv); { - for (const IoHash& RawHash : B.ChunkHashes) + for (const IoHash& RawHash : B.ChunkRawHashes) { OplogContinerWriter.AddBinaryAttachment(RawHash); } @@ -2392,7 +2261,7 @@ BuildContainer(CidStore& ChunkStore, bool BuildBlocks, bool IgnoreMissingAttachments, bool AllowChunking, - const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, + const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles) @@ -2458,13 +2327,13 @@ SaveOplog(CidStore& ChunkStore, std::unordered_map<IoHash, remotestore_impl::CreatedBlock, IoHash::Hasher> CreatedBlocks; tsl::robin_map<IoHash, TGetAttachmentBufferFunc, IoHash::Hasher> LooseLargeFiles; - auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, - RemoteProjectStore::Block&& Block) { + auto MakeTempBlock = [AttachmentTempPath, &RemoteResult, &AttachmentsLock, &CreatedBlocks](CompressedBuffer&& CompressedBlock, + ChunkBlockDescription&& Block) { std::filesystem::path BlockPath = AttachmentTempPath; BlockPath.append(Block.BlockHash.ToHexString()); try { - IoBuffer BlockBuffer = remotestore_impl::WriteToTempFile(std::move(CompressedBlock), BlockPath); + IoBuffer BlockBuffer = WriteToTempFile(std::move(CompressedBlock).GetCompressed(), BlockPath); RwLock::ExclusiveLockScope __(AttachmentsLock); CreatedBlocks.insert({Block.BlockHash, {.Payload = std::move(BlockBuffer), .Block = std::move(Block)}}); ZEN_DEBUG("Saved temp block to '{}', {}", AttachmentTempPath, NiceBytes(BlockBuffer.GetSize())); @@ -2478,8 +2347,8 @@ SaveOplog(CidStore& ChunkStore, } }; - auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, - RemoteProjectStore::Block&& Block) { + auto UploadBlock = [&RemoteStore, &RemoteResult, &Info, OptionalContext](CompressedBuffer&& CompressedBlock, + ChunkBlockDescription&& Block) { IoHash BlockHash = Block.BlockHash; RemoteProjectStore::SaveAttachmentResult Result = RemoteStore.SaveAttachment(CompressedBlock.GetCompressed(), BlockHash, std::move(Block)); @@ -2512,7 +2381,7 @@ SaveOplog(CidStore& ChunkStore, ZEN_DEBUG("Found attachment {}", AttachmentHash); }; - std::function<void(CompressedBuffer&&, RemoteProjectStore::Block &&)> OnBlock; + std::function<void(CompressedBuffer&&, ChunkBlockDescription &&)> OnBlock; if (RemoteStoreInfo.UseTempBlockFiles) { OnBlock = MakeTempBlock; @@ -2522,7 +2391,7 @@ SaveOplog(CidStore& ChunkStore, OnBlock = UploadBlock; } - std::vector<RemoteProjectStore::Block> KnownBlocks; + std::vector<ThinChunkBlockDescription> KnownBlocks; uint64_t TransferWallTimeMS = 0; @@ -3214,9 +3083,9 @@ LoadOplog(CidStore& ChunkStore, OptionalContext]() { auto _ = MakeGuard([&DechunkLatch, &TempFileName] { std::error_code Ec; - if (std::filesystem::exists(TempFileName, Ec)) + if (IsFile(TempFileName, Ec)) { - std::filesystem::remove(TempFileName, Ec); + RemoveFile(TempFileName, Ec); if (Ec) { ZEN_INFO("Failed to remove temporary file '{}'. Reason: {}", TempFileName, Ec.message()); diff --git a/src/zenserver/projectstore/remoteprojectstore.h b/src/zenserver/projectstore/remoteprojectstore.h index e05cb9923..1210afc7c 100644 --- a/src/zenserver/projectstore/remoteprojectstore.h +++ b/src/zenserver/projectstore/remoteprojectstore.h @@ -5,6 +5,8 @@ #include <zencore/jobqueue.h> #include "projectstore.h" +#include <zenutil/chunkblock.h> + #include <unordered_set> namespace zen { @@ -16,14 +18,6 @@ struct ChunkedInfo; class RemoteProjectStore { public: - struct Block - { - IoHash BlockHash; - std::vector<IoHash> ChunkHashes; - std::vector<uint32_t> ChunkLengths; - uint32_t FirstChunkOffset = (uint32_t)-1; - }; - struct Result { int32_t ErrorCode{}; @@ -72,7 +66,7 @@ public: struct GetKnownBlocksResult : public Result { - std::vector<Block> Blocks; + std::vector<ThinChunkBlockDescription> Blocks; }; struct RemoteStoreInfo @@ -101,11 +95,11 @@ public: virtual RemoteStoreInfo GetInfo() const = 0; virtual Stats GetStats() const = 0; - virtual CreateContainerResult CreateContainer() = 0; - virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0; - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&& Block) = 0; - virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0; - virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0; + virtual CreateContainerResult CreateContainer() = 0; + virtual SaveResult SaveContainer(const IoBuffer& Payload) = 0; + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&& Block) = 0; + virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0; + virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0; virtual LoadContainerResult LoadContainer() = 0; virtual GetKnownBlocksResult GetKnownBlocks() = 0; @@ -125,7 +119,6 @@ struct RemoteStoreOptions }; typedef std::function<IoBuffer(const IoHash& AttachmentHash)> TGetAttachmentBufferFunc; -typedef std::function<CompositeBuffer(const IoHash& RawHash)> FetchChunkFunc; RemoteProjectStore::LoadContainerResult BuildContainer( CidStore& ChunkStore, @@ -137,7 +130,7 @@ RemoteProjectStore::LoadContainerResult BuildContainer( bool BuildBlocks, bool IgnoreMissingAttachments, bool AllowChunking, - const std::function<void(CompressedBuffer&&, RemoteProjectStore::Block&&)>& AsyncOnBlock, + const std::function<void(CompressedBuffer&&, ChunkBlockDescription&&)>& AsyncOnBlock, const std::function<void(const IoHash&, TGetAttachmentBufferFunc&&)>& OnLargeAttachment, const std::function<void(std::vector<std::pair<IoHash, FetchChunkFunc>>&&)>& OnBlockChunks, bool EmbedLooseFiles); @@ -173,9 +166,7 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, bool CleanOplog, JobContext* OptionalContext); -CompressedBuffer GenerateBlock(std::vector<std::pair<IoHash, FetchChunkFunc>>&& FetchChunks, RemoteProjectStore::Block& OutBlock); -bool IterateBlock(const SharedBuffer& BlockPayload, std::function<void(CompressedBuffer&& Chunk, const IoHash& AttachmentHash)> Visitor); std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject); -std::vector<RemoteProjectStore::Block> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes); +std::vector<ThinChunkBlockDescription> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes); } // namespace zen diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp index 42519b108..2ebf58a5d 100644 --- a/src/zenserver/projectstore/zenremoteprojectstore.cpp +++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp @@ -93,7 +93,7 @@ public: return Result; } - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override { std::string SaveRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); HttpClient::Response Response = m_Client.Post(SaveRequest, Payload, ZenContentType::kCompressedBinary); diff --git a/src/zenserver/sentryintegration.cpp b/src/zenserver/sentryintegration.cpp deleted file mode 100644 index 7996f25bb..000000000 --- a/src/zenserver/sentryintegration.cpp +++ /dev/null @@ -1,324 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "sentryintegration.h" - -#include <zencore/config.h> -#include <zencore/logging.h> -#include <zencore/session.h> -#include <zencore/uid.h> - -#include <stdarg.h> -#include <stdio.h> - -#if ZEN_PLATFORM_LINUX -# include <pwd.h> -#endif - -#if ZEN_PLATFORM_MAC -# include <pwd.h> -#endif - -ZEN_THIRD_PARTY_INCLUDES_START -#include <spdlog/spdlog.h> -ZEN_THIRD_PARTY_INCLUDES_END - -#if ZEN_USE_SENTRY -# define SENTRY_BUILD_STATIC 1 -ZEN_THIRD_PARTY_INCLUDES_START -# include <sentry.h> -# include <spdlog/sinks/base_sink.h> -ZEN_THIRD_PARTY_INCLUDES_END - -namespace sentry { - -struct SentryAssertImpl : zen::AssertImpl -{ - virtual void ZEN_FORCENOINLINE ZEN_DEBUG_SECTION - OnAssert(const char* Filename, int LineNumber, const char* FunctionName, const char* Msg, zen::CallstackFrames* Callstack) override; -}; - -class sentry_sink final : public spdlog::sinks::base_sink<spdlog::details::null_mutex> -{ -public: - sentry_sink(); - ~sentry_sink(); - -protected: - void sink_it_(const spdlog::details::log_msg& msg) override; - void flush_() override; -}; - -////////////////////////////////////////////////////////////////////////// - -static constexpr sentry_level_t MapToSentryLevel[spdlog::level::level_enum::n_levels] = {SENTRY_LEVEL_DEBUG, - SENTRY_LEVEL_DEBUG, - SENTRY_LEVEL_INFO, - SENTRY_LEVEL_WARNING, - SENTRY_LEVEL_ERROR, - SENTRY_LEVEL_FATAL, - SENTRY_LEVEL_DEBUG}; - -sentry_sink::sentry_sink() -{ -} -sentry_sink::~sentry_sink() -{ -} - -void -sentry_sink::sink_it_(const spdlog::details::log_msg& msg) -{ - if (msg.level != spdlog::level::err && msg.level != spdlog::level::critical) - { - return; - } - try - { - std::string Message = fmt::format("{}\n{}({}) [{}]", msg.payload, msg.source.filename, msg.source.line, msg.source.funcname); - sentry_value_t event = sentry_value_new_message_event( - /* level */ MapToSentryLevel[msg.level], - /* logger */ nullptr, - /* message */ Message.c_str()); - sentry_event_value_add_stacktrace(event, NULL, 0); - sentry_capture_event(event); - } - catch (const std::exception&) - { - // If our logging with Message formatting fails we do a non-allocating version and just post the msg.payload raw - char TmpBuffer[256]; - size_t MaxCopy = zen::Min<size_t>(msg.payload.size(), size_t(255)); - memcpy(TmpBuffer, msg.payload.data(), MaxCopy); - TmpBuffer[MaxCopy] = '\0'; - sentry_value_t event = sentry_value_new_message_event( - /* level */ SENTRY_LEVEL_ERROR, - /* logger */ nullptr, - /* message */ TmpBuffer); - sentry_event_value_add_stacktrace(event, NULL, 0); - sentry_capture_event(event); - } -} -void -sentry_sink::flush_() -{ -} - -void -SentryAssertImpl::OnAssert(const char* Filename, int LineNumber, const char* FunctionName, const char* Msg, zen::CallstackFrames* Callstack) -{ - // Sentry will provide its own callstack - ZEN_UNUSED(Callstack); - try - { - std::string Message = fmt::format("ASSERT {}:({}) [{}]\n\"{}\"", Filename, LineNumber, FunctionName, Msg); - sentry_value_t event = sentry_value_new_message_event( - /* level */ SENTRY_LEVEL_ERROR, - /* logger */ nullptr, - /* message */ Message.c_str()); - sentry_event_value_add_stacktrace(event, NULL, 0); - sentry_capture_event(event); - } - catch (const std::exception&) - { - // If our logging with Message formatting fails we do a non-allocating version and just post the Msg raw - sentry_value_t event = sentry_value_new_message_event( - /* level */ SENTRY_LEVEL_ERROR, - /* logger */ nullptr, - /* message */ Msg); - sentry_event_value_add_stacktrace(event, NULL, 0); - sentry_capture_event(event); - } -} - -} // namespace sentry - -namespace zen { - -# if ZEN_USE_SENTRY -static void -SentryLogFunction(sentry_level_t Level, const char* Message, va_list Args, [[maybe_unused]] void* Userdata) -{ - char LogMessageBuffer[160]; - std::string LogMessage; - const char* MessagePtr = LogMessageBuffer; - - int n = vsnprintf(LogMessageBuffer, sizeof LogMessageBuffer, Message, Args); - - if (n >= int(sizeof LogMessageBuffer)) - { - LogMessage.resize(n + 1); - - n = vsnprintf(LogMessage.data(), LogMessage.size(), Message, Args); - - MessagePtr = LogMessage.c_str(); - } - - switch (Level) - { - case SENTRY_LEVEL_DEBUG: - ZEN_CONSOLE_DEBUG("sentry: {}", MessagePtr); - break; - - case SENTRY_LEVEL_INFO: - ZEN_CONSOLE_INFO("sentry: {}", MessagePtr); - break; - - case SENTRY_LEVEL_WARNING: - ZEN_CONSOLE_WARN("sentry: {}", MessagePtr); - break; - - case SENTRY_LEVEL_ERROR: - ZEN_CONSOLE_ERROR("sentry: {}", MessagePtr); - break; - - case SENTRY_LEVEL_FATAL: - ZEN_CONSOLE_CRITICAL("sentry: {}", MessagePtr); - break; - } -} -# endif - -SentryIntegration::SentryIntegration() -{ -} - -SentryIntegration::~SentryIntegration() -{ - if (m_IsInitialized && m_SentryErrorCode == 0) - { - logging::SetErrorLog(""); - m_SentryAssert.reset(); - sentry_close(); - } -} - -void -SentryIntegration::Initialize(std::string SentryDatabasePath, - std::string SentryAttachmentsPath, - bool AllowPII, - const std::string& CommandLine) -{ - m_AllowPII = AllowPII; - - if (SentryDatabasePath.starts_with("\\\\?\\")) - { - SentryDatabasePath = SentryDatabasePath.substr(4); - } - sentry_options_t* SentryOptions = sentry_options_new(); - sentry_options_set_dsn(SentryOptions, "https://[email protected]/5919284"); - sentry_options_set_database_path(SentryOptions, SentryDatabasePath.c_str()); - sentry_options_set_logger(SentryOptions, SentryLogFunction, this); - if (SentryAttachmentsPath.starts_with("\\\\?\\")) - { - SentryAttachmentsPath = SentryAttachmentsPath.substr(4); - } - sentry_options_add_attachment(SentryOptions, SentryAttachmentsPath.c_str()); - sentry_options_set_release(SentryOptions, ZEN_CFG_VERSION); - - // sentry_options_set_debug(SentryOptions, 1); - - m_SentryErrorCode = sentry_init(SentryOptions); - - if (m_SentryErrorCode == 0) - { - sentry_value_t SentryUserObject = sentry_value_new_object(); - - if (m_AllowPII) - { -# if ZEN_PLATFORM_WINDOWS - CHAR Buffer[511 + 1]; - DWORD BufferLength = sizeof(Buffer) / sizeof(CHAR); - BOOL OK = GetUserNameA(Buffer, &BufferLength); - if (OK && BufferLength) - { - m_SentryUserName = std::string(Buffer, BufferLength - 1); - } - BufferLength = sizeof(Buffer) / sizeof(CHAR); - OK = GetComputerNameA(Buffer, &BufferLength); - if (OK && BufferLength) - { - m_SentryHostName = std::string(Buffer, BufferLength); - } - else - { - m_SentryHostName = "unknown"; - } -# endif // ZEN_PLATFORM_WINDOWS - -# if (ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC) - uid_t uid = geteuid(); - struct passwd* pw = getpwuid(uid); - if (pw) - { - m_SentryUserName = std::string(pw->pw_name); - } - else - { - m_SentryUserName = "unknown"; - } - char HostNameBuffer[1023 + 1]; - int err = gethostname(HostNameBuffer, sizeof(HostNameBuffer)); - if (err == 0) - { - m_SentryHostName = std::string(HostNameBuffer); - } - else - { - m_SentryHostName = "unknown"; - } -# endif - m_SentryId = fmt::format("{}@{}", m_SentryUserName, m_SentryHostName); - sentry_value_set_by_key(SentryUserObject, "id", sentry_value_new_string(m_SentryId.c_str())); - sentry_value_set_by_key(SentryUserObject, "username", sentry_value_new_string(m_SentryUserName.c_str())); - sentry_value_set_by_key(SentryUserObject, "ip_address", sentry_value_new_string("{{auto}}")); - } - - sentry_value_set_by_key(SentryUserObject, "cmd", sentry_value_new_string(CommandLine.c_str())); - - const std::string SessionId(GetSessionIdString()); - sentry_value_set_by_key(SentryUserObject, "session", sentry_value_new_string(SessionId.c_str())); - - sentry_set_user(SentryUserObject); - - m_SentryLogger = spdlog::create<sentry::sentry_sink>("sentry"); - logging::SetErrorLog("sentry"); - - m_SentryAssert = std::make_unique<sentry::SentryAssertImpl>(); - } - - m_IsInitialized = true; -} - -void -SentryIntegration::LogStartupInformation() -{ - if (m_IsInitialized) - { - if (m_SentryErrorCode == 0) - { - if (m_AllowPII) - { - ZEN_INFO("sentry initialized, username: '{}', hostname: '{}', id: '{}'", m_SentryUserName, m_SentryHostName, m_SentryId); - } - else - { - ZEN_INFO("sentry initialized with anonymous reports"); - } - } - else - { - ZEN_WARN( - "sentry_init returned failure! (error code: {}) note that sentry expects crashpad_handler to exist alongside the running " - "executable", - m_SentryErrorCode); - } - } -} - -void -SentryIntegration::ClearCaches() -{ - sentry_clear_modulecache(); -} - -} // namespace zen -#endif diff --git a/src/zenserver/sentryintegration.h b/src/zenserver/sentryintegration.h deleted file mode 100644 index 40e22af4e..000000000 --- a/src/zenserver/sentryintegration.h +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/intmath.h> -#include <zencore/zencore.h> - -#if !defined(ZEN_USE_SENTRY) -# define ZEN_USE_SENTRY 1 -#endif - -#if ZEN_USE_SENTRY - -# include <memory> - -ZEN_THIRD_PARTY_INCLUDES_START -# include <spdlog/logger.h> -ZEN_THIRD_PARTY_INCLUDES_END - -namespace sentry { - -struct SentryAssertImpl; - -} // namespace sentry - -namespace zen { - -class SentryIntegration -{ -public: - SentryIntegration(); - ~SentryIntegration(); - - void Initialize(std::string SentryDatabasePath, std::string SentryAttachmentsPath, bool AllowPII, const std::string& CommandLine); - void LogStartupInformation(); - static void ClearCaches(); - -private: - int m_SentryErrorCode = 0; - bool m_IsInitialized = false; - bool m_AllowPII = false; - std::unique_ptr<sentry::SentryAssertImpl> m_SentryAssert; - std::string m_SentryUserName; - std::string m_SentryHostName; - std::string m_SentryId; - std::shared_ptr<spdlog::logger> m_SentryLogger; -}; - -} // namespace zen -#endif diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp index e438a840a..744b861dd 100644 --- a/src/zenserver/upstream/upstreamcache.cpp +++ b/src/zenserver/upstream/upstreamcache.cpp @@ -134,7 +134,7 @@ namespace detail { return {.State = UpstreamEndpointState::kOk}; } - JupiterSession Session(m_Client->Logger(), m_Client->Client()); + JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); const JupiterResult Result = Session.Authenticate(); if (Result.Success) @@ -181,7 +181,7 @@ namespace detail { try { - JupiterSession Session(m_Client->Logger(), m_Client->Client()); + JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); JupiterResult Result; std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); @@ -301,7 +301,7 @@ namespace detail { { ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheRecords"); - JupiterSession Session(m_Client->Logger(), m_Client->Client()); + JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); GetUpstreamCacheResult Result; for (CacheKeyRequest* Request : Requests) @@ -365,7 +365,7 @@ namespace detail { try { - JupiterSession Session(m_Client->Logger(), m_Client->Client()); + JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); const JupiterResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId); @@ -398,7 +398,7 @@ namespace detail { { ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheChunks"); - JupiterSession Session(m_Client->Logger(), m_Client->Client()); + JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); GetUpstreamCacheResult Result; for (CacheChunkRequest* RequestPtr : CacheChunkRequests) @@ -453,7 +453,7 @@ namespace detail { { ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheValues"); - JupiterSession Session(m_Client->Logger(), m_Client->Client()); + JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); GetUpstreamCacheResult Result; for (CacheValueRequest* RequestPtr : CacheValueRequests) @@ -533,7 +533,7 @@ namespace detail { try { - JupiterSession Session(m_Client->Logger(), m_Client->Client()); + JupiterSession Session(m_Client->Logger(), m_Client->Client(), m_AllowRedirect); if (CacheRecord.Type == ZenContentType::kBinary) { @@ -756,6 +756,7 @@ namespace detail { UpstreamStatus m_Status; UpstreamEndpointStats m_Stats; RefPtr<JupiterClient> m_Client; + const bool m_AllowRedirect = false; }; class ZenUpstreamEndpoint final : public UpstreamEndpoint diff --git a/src/zenserver/vfs/vfsservice.cpp b/src/zenserver/vfs/vfsservice.cpp index d302a10ec..bf761f8d1 100644 --- a/src/zenserver/vfs/vfsservice.cpp +++ b/src/zenserver/vfs/vfsservice.cpp @@ -61,7 +61,7 @@ GetContentAsCbObject(HttpServerRequest& HttpReq, CbObject& Cb) // echo {"method": "mount", "params": {"path": "d:\\VFS_ROOT"}} | curl.exe http://localhost:8558/vfs --data-binary @- // echo {"method": "unmount"} | curl.exe http://localhost:8558/vfs --data-binary @- -VfsService::VfsService() +VfsService::VfsService(HttpStatusService& StatusService) : m_StatusService(StatusService) { m_Impl = new Impl; @@ -136,10 +136,12 @@ VfsService::VfsService() } }, HttpVerb::kPost); + m_StatusService.RegisterHandler("vfs", *this); } VfsService::~VfsService() { + m_StatusService.UnregisterHandler("vfs", *this); delete m_Impl; } @@ -169,8 +171,9 @@ VfsService::AddService(Ref<ZenCacheStore>&& Z$) #else -VfsService::VfsService() +VfsService::VfsService(HttpStatusService& StatusService) : m_StatusService(StatusService) { + ZEN_UNUSED(StatusService); } VfsService::~VfsService() @@ -209,6 +212,14 @@ VfsService::BaseUri() const } void +VfsService::HandleStatusRequest(HttpServerRequest& Request) +{ + CbObjectWriter Cbo; + Cbo << "ok" << true; + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +void VfsService::HandleRequest(HttpServerRequest& HttpServiceRequest) { m_Router.HandleRequest(HttpServiceRequest); diff --git a/src/zenserver/vfs/vfsservice.h b/src/zenserver/vfs/vfsservice.h index dcdc71e81..0d0168e23 100644 --- a/src/zenserver/vfs/vfsservice.h +++ b/src/zenserver/vfs/vfsservice.h @@ -4,6 +4,7 @@ #include <zenbase/refcount.h> #include <zenhttp/httpserver.h> +#include <zenhttp/httpstatus.h> #include <zenvfs/vfs.h> #include <memory> @@ -24,10 +25,10 @@ class ZenCacheStore; */ -class VfsService : public HttpService +class VfsService : public HttpService, public IHttpStatusProvider { public: - VfsService(); + explicit VfsService(HttpStatusService& StatusService); ~VfsService(); void Mount(std::string_view MountPoint); @@ -39,12 +40,14 @@ public: protected: virtual const char* BaseUri() const override; virtual void HandleRequest(HttpServerRequest& HttpServiceRequest) override; + virtual void HandleStatusRequest(HttpServerRequest& Request) override; private: struct Impl; Impl* m_Impl = nullptr; - HttpRequestRouter m_Router; + HttpStatusService& m_StatusService; + HttpRequestRouter m_Router; friend struct VfsServiceDataSource; }; diff --git a/src/zenserver/workspaces/httpworkspaces.cpp b/src/zenserver/workspaces/httpworkspaces.cpp index 2d59c9357..7ef84743e 100644 --- a/src/zenserver/workspaces/httpworkspaces.cpp +++ b/src/zenserver/workspaces/httpworkspaces.cpp @@ -51,9 +51,9 @@ namespace { WriteWorkspaceConfig(Writer, WorkspaceConfig); if (std::optional<std::vector<Oid>> ShareIds = Workspaces.GetWorkspaceShares(WorkspaceConfig.Id); ShareIds) { - for (const Oid& ShareId : *ShareIds) + Writer.BeginArray("shares"); { - Writer.BeginArray("shares"); + for (const Oid& ShareId : *ShareIds) { if (std::optional<Workspaces::WorkspaceShareConfiguration> WorkspaceShareConfig = Workspaces.GetWorkspaceShareConfiguration(WorkspaceConfig.Id, ShareId); @@ -66,15 +66,19 @@ namespace { Writer.EndObject(); } } - Writer.EndArray(); } + Writer.EndArray(); } } } // namespace -HttpWorkspacesService::HttpWorkspacesService(HttpStatsService& StatsService, const WorkspacesServeConfig& Cfg, Workspaces& Workspaces) +HttpWorkspacesService::HttpWorkspacesService(HttpStatusService& StatusService, + HttpStatsService& StatsService, + const WorkspacesServeConfig& Cfg, + Workspaces& Workspaces) : m_Log(logging::Get("workspaces")) +, m_StatusService(StatusService) , m_StatsService(StatsService) , m_Config(Cfg) , m_Workspaces(Workspaces) @@ -84,7 +88,8 @@ HttpWorkspacesService::HttpWorkspacesService(HttpStatsService& StatsService, con HttpWorkspacesService::~HttpWorkspacesService() { - m_StatsService.UnregisterHandler("prj", *this); + m_StatsService.UnregisterHandler("ws", *this); + m_StatusService.UnregisterHandler("ws", *this); } const char* @@ -149,14 +154,21 @@ HttpWorkspacesService::HandleStatsRequest(HttpServerRequest& HttpReq) } void +HttpWorkspacesService::HandleStatusRequest(HttpServerRequest& Request) +{ + ZEN_TRACE_CPU("HttpWorkspacesService::Status"); + CbObjectWriter Cbo; + Cbo << "ok" << true; + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +void HttpWorkspacesService::Initialize() { using namespace std::literals; ZEN_LOG_INFO(LogFs, "Initializing Workspaces Service"); - m_StatsService.RegisterHandler("ws", *this); - m_Router.AddPattern("workspace_id", "([[:xdigit:]]{24})"); m_Router.AddPattern("share_id", "([[:xdigit:]]{24})"); m_Router.AddPattern("chunk", "([[:xdigit:]]{24})"); @@ -238,6 +250,9 @@ HttpWorkspacesService::Initialize() HttpVerb::kGet); RefreshState(); + + m_StatsService.RegisterHandler("ws", *this); + m_StatusService.RegisterHandler("ws", *this); } std::filesystem::path @@ -589,7 +604,7 @@ void HttpWorkspacesService::ShareAliasFilesRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - std::string Alias = Req.GetCapture(1); + std::string_view Alias = Req.GetCapture(1); if (Alias.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, @@ -608,7 +623,7 @@ void HttpWorkspacesService::ShareAliasChunkInfoRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - std::string Alias = Req.GetCapture(1); + std::string_view Alias = Req.GetCapture(1); if (Alias.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, @@ -635,7 +650,7 @@ void HttpWorkspacesService::ShareAliasBatchRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - std::string Alias = Req.GetCapture(1); + std::string_view Alias = Req.GetCapture(1); if (Alias.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, @@ -654,7 +669,7 @@ void HttpWorkspacesService::ShareAliasEntriesRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - std::string Alias = Req.GetCapture(1); + std::string_view Alias = Req.GetCapture(1); if (Alias.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, @@ -673,7 +688,7 @@ void HttpWorkspacesService::ShareAliasChunkRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - std::string Alias = Req.GetCapture(1); + std::string_view Alias = Req.GetCapture(1); if (Alias.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, @@ -700,7 +715,7 @@ void HttpWorkspacesService::ShareAliasRequest(HttpRouterRequest& Req) { HttpServerRequest& ServerRequest = Req.ServerRequest(); - std::string Alias = Req.GetCapture(1); + std::string_view Alias = Req.GetCapture(1); if (Alias.empty()) { return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, @@ -1100,7 +1115,7 @@ HttpWorkspacesService::ShareRequest(HttpRouterRequest& Req, const Oid& Workspace } } - if (!std::filesystem::is_directory(Workspace.RootPath / NewConfig.SharePath)) + if (!IsDir(Workspace.RootPath / NewConfig.SharePath)) { return ServerRequest.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, diff --git a/src/zenserver/workspaces/httpworkspaces.h b/src/zenserver/workspaces/httpworkspaces.h index f01f58b86..89a8e8bdc 100644 --- a/src/zenserver/workspaces/httpworkspaces.h +++ b/src/zenserver/workspaces/httpworkspaces.h @@ -5,6 +5,7 @@ #include <zencore/stats.h> #include <zenhttp/httpserver.h> #include <zenhttp/httpstats.h> +#include <zenhttp/httpstatus.h> namespace zen { @@ -16,16 +17,20 @@ struct WorkspacesServeConfig bool AllowConfigurationChanges = false; }; -class HttpWorkspacesService final : public HttpService, public IHttpStatsProvider +class HttpWorkspacesService final : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider { public: - HttpWorkspacesService(HttpStatsService& StatsService, const WorkspacesServeConfig& Cfg, Workspaces& Workspaces); + HttpWorkspacesService(HttpStatusService& StatusService, + HttpStatsService& StatsService, + const WorkspacesServeConfig& Cfg, + Workspaces& Workspaces); virtual ~HttpWorkspacesService(); virtual const char* BaseUri() const override; virtual void HandleRequest(HttpServerRequest& Request) override; virtual void HandleStatsRequest(HttpServerRequest& Request) override; + virtual void HandleStatusRequest(HttpServerRequest& Request) override; private: struct WorkspacesStats @@ -80,6 +85,7 @@ private: void ChunkRequest(HttpRouterRequest& Req, const Oid& WorkspaceId, const Oid& ShareId, const Oid& ChunkId); void ShareRequest(HttpRouterRequest& Req, const Oid& WorkspaceId, const Oid& InShareId); + HttpStatusService& m_StatusService; HttpStatsService& m_StatsService; const WorkspacesServeConfig m_Config; HttpRequestRouter m_Router; diff --git a/src/zenserver/xmake.lua b/src/zenserver/xmake.lua index a3d7aa124..470fbd24e 100644 --- a/src/zenserver/xmake.lua +++ b/src/zenserver/xmake.lua @@ -28,8 +28,6 @@ target("zenserver") add_cxxflags("/bigobj") add_links("delayimp", "projectedfslib") add_ldflags("/delayload:ProjectedFSLib.dll") - - add_links("dbghelp", "winhttp", "version") -- for Sentry else remove_files("windows/**") end @@ -41,7 +39,6 @@ target("zenserver") add_ldflags("-framework Foundation") add_ldflags("-framework Security") add_ldflags("-framework SystemConfiguration") - add_syslinks("bsm") end add_options("compute") @@ -57,18 +54,6 @@ target("zenserver") "vcpkg::sol2" ) - if has_config("zensentry") then - add_packages("vcpkg::sentry-native") - end - - if is_plat("linux") then - -- As sentry_native uses symbols from breakpad_client, the latter must - -- be specified after the former with GCC-like toolchains. xmake however - -- is unaware of this and simply globs files from vcpkg's output. The - -- line below forces breakpad_client to be to the right of sentry_native - add_syslinks("breakpad_client") - end - -- to work around some unfortunate Ctrl-C behaviour on Linux/Mac due to -- our use of setsid() at startup we pass in `--no-detach` to zenserver -- ensure that it recieves signals when the user requests termination diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index cc1be13e2..7e3baa997 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -2,8 +2,6 @@ #include "zenserver.h" -#include "sentryintegration.h" - #include <zenbase/refcount.h> #include <zencore/basicfile.h> #include <zencore/compactbinarybuilder.h> @@ -16,6 +14,7 @@ #include <zencore/jobqueue.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> +#include <zencore/sentryintegration.h> #include <zencore/session.h> #include <zencore/string.h> #include <zencore/thread.h> @@ -23,6 +22,7 @@ #include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zenhttp/httpserver.h> +#include <zenstore/buildstore/buildstore.h> #include <zenstore/cidstore.h> #include <zenstore/scrubcontext.h> #include <zenstore/workspaces.h> @@ -137,7 +137,7 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen return -1; } - m_UseSentry = ServerOptions.NoSentry == false; + m_UseSentry = ServerOptions.SentryConfig.Disable == false; m_ServerEntry = ServerEntry; m_DebugOptionForcedCrash = ServerOptions.ShouldCrash; m_IsPowerCycle = ServerOptions.IsPowerCycle; @@ -250,18 +250,27 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen *m_JobQueue, *m_OpenProcessCache, ProjectStore::Configuration{}); - m_HttpProjectService.reset(new HttpProjectService{*m_CidStore, m_ProjectStore, m_StatsService, *m_AuthMgr}); + m_HttpProjectService.reset(new HttpProjectService{*m_CidStore, m_ProjectStore, m_StatusService, m_StatsService, *m_AuthMgr}); if (ServerOptions.WorksSpacesConfig.Enabled) { m_Workspaces.reset(new Workspaces()); m_HttpWorkspacesService.reset( - new HttpWorkspacesService(m_StatsService, + new HttpWorkspacesService(m_StatusService, + m_StatsService, {.SystemRootDir = ServerOptions.SystemRootDir, .AllowConfigurationChanges = ServerOptions.WorksSpacesConfig.AllowConfigurationChanges}, *m_Workspaces)); } + if (ServerOptions.BuildStoreConfig.Enabled) + { + BuildStoreConfig BuildsCfg; + BuildsCfg.RootDirectory = m_DataRoot / "builds"; + BuildsCfg.MaxDiskSpaceLimit = ServerOptions.BuildStoreConfig.MaxDiskSpaceLimit; + m_BuildStore = std::make_unique<BuildStore>(std::move(BuildsCfg), m_GcManager); + } + if (ServerOptions.StructuredCacheConfig.Enabled) { InitializeStructuredCache(ServerOptions); @@ -287,7 +296,7 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen m_Http->RegisterService(*m_HttpWorkspacesService); } - m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot); + m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService); if (m_FrontendService) { @@ -306,12 +315,18 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen ObjCfg.Buckets.push_back(std::move(NewBucket)); } - m_ObjStoreService = std::make_unique<HttpObjectStoreService>(std::move(ObjCfg)); + m_ObjStoreService = std::make_unique<HttpObjectStoreService>(m_StatusService, std::move(ObjCfg)); m_Http->RegisterService(*m_ObjStoreService); } + if (ServerOptions.BuildStoreConfig.Enabled) + { + m_BuildStoreService = std::make_unique<HttpBuildStoreService>(m_StatusService, m_StatsService, *m_BuildStore); + m_Http->RegisterService(*m_BuildStoreService); + } + #if ZEN_WITH_VFS - m_VfsService = std::make_unique<VfsService>(); + m_VfsService = std::make_unique<VfsService>(m_StatusService); m_VfsService->AddService(Ref<ProjectStore>(m_ProjectStore)); m_VfsService->AddService(Ref<ZenCacheStore>(m_CacheStore)); m_Http->RegisterService(*m_VfsService); @@ -327,6 +342,7 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen .Interval = std::chrono::seconds(ServerOptions.GcConfig.IntervalSeconds), .MaxCacheDuration = std::chrono::seconds(ServerOptions.GcConfig.Cache.MaxDurationSeconds), .MaxProjectStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.ProjectStore.MaxDurationSeconds), + .MaxBuildStoreDuration = std::chrono::seconds(ServerOptions.GcConfig.BuildStore.MaxDurationSeconds), .CollectSmallObjects = ServerOptions.GcConfig.CollectSmallObjects, .Enabled = ServerOptions.GcConfig.Enabled, .DiskReserveSize = ServerOptions.GcConfig.DiskReserveSize, @@ -347,6 +363,7 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen m_CacheStore.Get(), m_CidStore.get(), m_ProjectStore, + m_BuildStore.get(), HttpAdminService::LogPaths{.AbsLogPath = ServerOptions.AbsLogFile, .HttpLogPath = ServerOptions.DataDir / "logs" / "http.log", .CacheLogPath = ServerOptions.DataDir / "logs" / "z$.log"}, @@ -418,7 +435,7 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions) if (ManifestVersion != ZEN_CFG_SCHEMA_VERSION) { std::filesystem::path ManifestSkipSchemaChangePath = m_DataRoot / "root_manifest.ignore_schema_mismatch"; - if (ManifestVersion != 0 && std::filesystem::is_regular_file(ManifestSkipSchemaChangePath)) + if (ManifestVersion != 0 && IsFile(ManifestSkipSchemaChangePath)) { ZEN_INFO( "Schema version {} found in '{}' does not match {}, ignoring mismatch due to existance of '{}' and updating " @@ -467,7 +484,7 @@ ZenServer::InitializeState(const ZenServerOptions& ServerOptions) { ZEN_INFO("Deleting '{}'", DirEntry.path()); - std::filesystem::remove_all(DirEntry.path(), Ec); + DeleteDirectories(DirEntry.path(), Ec); if (Ec) { @@ -530,11 +547,28 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) Config.AllowAutomaticCreationOfNamespaces = true; Config.Logging = {.EnableWriteLog = ServerOptions.StructuredCacheConfig.WriteLogEnabled, .EnableAccessLog = ServerOptions.StructuredCacheConfig.AccessLogEnabled}; - Config.NamespaceConfig.DiskLayerConfig.BucketConfig.MemCacheSizeThreshold = ServerOptions.StructuredCacheConfig.MemCacheSizeThreshold; - Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LimitOverwrites = ServerOptions.StructuredCacheConfig.LimitOverwrites; - Config.NamespaceConfig.DiskLayerConfig.MemCacheTargetFootprintBytes = ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes; - Config.NamespaceConfig.DiskLayerConfig.MemCacheTrimIntervalSeconds = ServerOptions.StructuredCacheConfig.MemTrimIntervalSeconds; - Config.NamespaceConfig.DiskLayerConfig.MemCacheMaxAgeSeconds = ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds; + + for (const auto& It : ServerOptions.StructuredCacheConfig.PerBucketConfigs) + { + const std::string& BucketName = It.first; + const ZenStructuredCacheBucketConfig& ZenBucketConfig = It.second; + ZenCacheDiskLayer::BucketConfiguration BucketConfig = {.MaxBlockSize = ZenBucketConfig.MaxBlockSize, + .PayloadAlignment = ZenBucketConfig.PayloadAlignment, + .MemCacheSizeThreshold = ZenBucketConfig.MemCacheSizeThreshold, + .LargeObjectThreshold = ZenBucketConfig.LargeObjectThreshold}; + Config.NamespaceConfig.DiskLayerConfig.BucketConfigMap.insert_or_assign(BucketName, BucketConfig); + } + Config.NamespaceConfig.DiskLayerConfig.BucketConfig.MaxBlockSize = ServerOptions.StructuredCacheConfig.BucketConfig.MaxBlockSize, + Config.NamespaceConfig.DiskLayerConfig.BucketConfig.PayloadAlignment = + ServerOptions.StructuredCacheConfig.BucketConfig.PayloadAlignment, + Config.NamespaceConfig.DiskLayerConfig.BucketConfig.MemCacheSizeThreshold = + ServerOptions.StructuredCacheConfig.BucketConfig.MemCacheSizeThreshold, + Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LargeObjectThreshold = + ServerOptions.StructuredCacheConfig.BucketConfig.LargeObjectThreshold, + Config.NamespaceConfig.DiskLayerConfig.BucketConfig.LimitOverwrites = ServerOptions.StructuredCacheConfig.LimitOverwrites; + Config.NamespaceConfig.DiskLayerConfig.MemCacheTargetFootprintBytes = ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes; + Config.NamespaceConfig.DiskLayerConfig.MemCacheTrimIntervalSeconds = ServerOptions.StructuredCacheConfig.MemTrimIntervalSeconds; + Config.NamespaceConfig.DiskLayerConfig.MemCacheMaxAgeSeconds = ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds; if (ServerOptions.IsDedicated) { @@ -802,6 +836,9 @@ ZenServer::Cleanup() m_ObjStoreService.reset(); m_FrontendService.reset(); + m_BuildStoreService.reset(); + m_BuildStore = {}; + m_StructuredCacheService.reset(); m_UpstreamService.reset(); m_UpstreamCache.reset(); @@ -896,7 +933,7 @@ ZenServer::CheckStateMarker() std::filesystem::path StateMarkerPath = m_DataRoot / "state_marker"; try { - if (!std::filesystem::exists(StateMarkerPath)) + if (!IsFile(StateMarkerPath)) { ZEN_WARN("state marker at {} has been deleted, exiting", StateMarkerPath); RequestExit(1); diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h index 80054dc35..5cfa04ba1 100644 --- a/src/zenserver/zenserver.h +++ b/src/zenserver/zenserver.h @@ -25,6 +25,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #include <zenstore/cache/structuredcachestore.h> #include <zenstore/gc.h> #include "admin/admin.h" +#include "buildstore/httpbuildstore.h" #include "cache/httpstructuredcache.h" #include "diag/diagsvcs.h" #include "frontend/frontend.h" @@ -127,6 +128,8 @@ private: Ref<ZenCacheStore> m_CacheStore; std::unique_ptr<OpenProcessCache> m_OpenProcessCache; HttpTestService m_TestService; + std::unique_ptr<BuildStore> m_BuildStore; + #if ZEN_WITH_TESTS HttpTestingService m_TestingService; #endif @@ -140,6 +143,7 @@ private: HttpHealthService m_HealthService; std::unique_ptr<HttpFrontendService> m_FrontendService; std::unique_ptr<HttpObjectStoreService> m_ObjStoreService; + std::unique_ptr<HttpBuildStoreService> m_BuildStoreService; std::unique_ptr<VfsService> m_VfsService; std::unique_ptr<JobQueue> m_JobQueue; std::unique_ptr<HttpAdminService> m_AdminService; |