diff options
| author | Liam Mitchell <[email protected]> | 2025-07-29 23:04:15 +0000 |
|---|---|---|
| committer | Liam Mitchell <[email protected]> | 2025-07-29 23:04:15 +0000 |
| commit | bf0039cbab6dc21ce09c15be60878ee4208d8723 (patch) | |
| tree | 553353471925c72459b91563ccceb17accd51ec3 /src/zenserver/projectstore | |
| parent | Always upload vcpkg logs on failure (diff) | |
| parent | 5.6.14 (diff) | |
| download | zen-bf0039cbab6dc21ce09c15be60878ee4208d8723.tar.xz zen-bf0039cbab6dc21ce09c15be60878ee4208d8723.zip | |
Merge branch 'main' into de/zen-service-command
Diffstat (limited to 'src/zenserver/projectstore')
| -rw-r--r-- | src/zenserver/projectstore/buildsremoteprojectstore.cpp | 41 | ||||
| -rw-r--r-- | src/zenserver/projectstore/buildsremoteprojectstore.h | 23 | ||||
| -rw-r--r-- | src/zenserver/projectstore/fileremoteprojectstore.cpp | 10 | ||||
| -rw-r--r-- | src/zenserver/projectstore/httpprojectstore.cpp | 80 | ||||
| -rw-r--r-- | src/zenserver/projectstore/httpprojectstore.h | 11 | ||||
| -rw-r--r-- | src/zenserver/projectstore/jupiterremoteprojectstore.cpp | 37 | ||||
| -rw-r--r-- | src/zenserver/projectstore/jupiterremoteprojectstore.h | 23 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 448 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 1 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 6 |
10 files changed, 376 insertions, 304 deletions
diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.cpp b/src/zenserver/projectstore/buildsremoteprojectstore.cpp index fbb9bc344..ab96ae92d 100644 --- a/src/zenserver/projectstore/buildsremoteprojectstore.cpp +++ b/src/zenserver/projectstore/buildsremoteprojectstore.cpp @@ -35,16 +35,10 @@ public: , m_BuildId(BuildId) , m_MetaData(MetaData) , m_TempFilePath(TempFilePath) + , m_EnableBlocks(!ForceDisableBlocks) + , m_UseTempBlocks(!ForceDisableTempBlocks) { m_MetaData.MakeOwned(); - if (ForceDisableBlocks) - { - m_EnableBlocks = false; - } - if (ForceDisableTempBlocks) - { - m_UseTempBlocks = false; - } } virtual RemoteStoreInfo GetInfo() const override @@ -71,7 +65,7 @@ public: { ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); IoBuffer Payload = m_MetaData; Payload.SetContentType(ZenContentType::kCbObject); @@ -95,7 +89,7 @@ public: virtual SaveResult SaveContainer(const IoBuffer& Payload) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); PutBuildPartResult PutResult = Session.PutBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, OplogContainerPartName, Payload); AddStats(PutResult); @@ -120,7 +114,7 @@ public: ChunkBlockDescription&& Block) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); JupiterResult PutResult = Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); @@ -183,7 +177,7 @@ public: ZEN_UNUSED(RawHash); ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); FinalizeBuildPartResult FinalizeRefResult = Session.FinalizeBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash); AddStats(FinalizeRefResult); @@ -222,7 +216,7 @@ public: { ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); JupiterResult GetBuildResult = Session.GetBuild(m_Namespace, m_Bucket, m_BuildId); AddStats(GetBuildResult); LoadContainerResult Result{ConvertResult(GetBuildResult)}; @@ -307,8 +301,8 @@ public: virtual GetKnownBlocksResult GetKnownBlocks() override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); - JupiterResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); + JupiterResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId, (uint64_t)-1); AddStats(FindResult); GetKnownBlocksResult Result{ConvertResult(FindResult)}; if (Result.ErrorCode) @@ -356,7 +350,7 @@ public: virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); JupiterResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, RawHash, m_TempFilePath); AddStats(GetResult); @@ -452,8 +446,9 @@ private: IoBuffer m_MetaData; Oid m_OplogBuildPartId = Oid::Zero; std::filesystem::path m_TempFilePath; - bool m_EnableBlocks = true; - bool m_UseTempBlocks = true; + const bool m_EnableBlocks = true; + const bool m_UseTempBlocks = true; + const bool m_AllowRedirect = false; std::atomic_uint64_t m_SentBytes = {}; std::atomic_uint64_t m_ReceivedBytes = {}; @@ -494,7 +489,15 @@ CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::file { TokenProvider = httpclientauth::CreateFromStaticToken(Options.AccessToken); } - else + else if (!Options.OidcExePath.empty()) + { + if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(Options.OidcExePath, Url); TokenProviderMaybe) + { + TokenProvider = TokenProviderMaybe.value(); + } + } + + if (!TokenProvider) { TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager); } diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.h b/src/zenserver/projectstore/buildsremoteprojectstore.h index 8b2c6c8c8..c52b13886 100644 --- a/src/zenserver/projectstore/buildsremoteprojectstore.h +++ b/src/zenserver/projectstore/buildsremoteprojectstore.h @@ -10,17 +10,18 @@ class AuthMgr; struct BuildsRemoteStoreOptions : RemoteStoreOptions { - std::string Url; - std::string Namespace; - std::string Bucket; - Oid BuildId; - std::string OpenIdProvider; - std::string AccessToken; - AuthMgr& AuthManager; - bool ForceDisableBlocks = false; - bool ForceDisableTempBlocks = false; - bool AssumeHttp2 = false; - IoBuffer MetaData; + std::string Url; + std::string Namespace; + std::string Bucket; + Oid BuildId; + std::string OpenIdProvider; + std::string AccessToken; + AuthMgr& AuthManager; + std::filesystem::path OidcExePath; + bool ForceDisableBlocks = false; + bool ForceDisableTempBlocks = false; + bool AssumeHttp2 = false; + IoBuffer MetaData; }; std::shared_ptr<RemoteProjectStore> CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, diff --git a/src/zenserver/projectstore/fileremoteprojectstore.cpp b/src/zenserver/projectstore/fileremoteprojectstore.cpp index 98e292d91..375e44e59 100644 --- a/src/zenserver/projectstore/fileremoteprojectstore.cpp +++ b/src/zenserver/projectstore/fileremoteprojectstore.cpp @@ -73,7 +73,7 @@ public: ContainerObject.IterateAttachments([&](CbFieldView FieldView) { IoHash AttachmentHash = FieldView.AsBinaryAttachment(); std::filesystem::path AttachmentPath = GetAttachmentPath(AttachmentHash); - if (!std::filesystem::exists(AttachmentPath)) + if (!IsFile(AttachmentPath)) { Result.Needs.insert(AttachmentHash); } @@ -111,7 +111,7 @@ public: Stopwatch Timer; SaveAttachmentResult Result; std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); - if (!std::filesystem::exists(ChunkPath)) + if (!IsFile(ChunkPath)) { try { @@ -182,7 +182,7 @@ public: for (const IoHash& RawHash : BlockHashes) { std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); - if (std::filesystem::is_regular_file(ChunkPath)) + if (IsFile(ChunkPath)) { ExistingBlockHashes.push_back(RawHash); } @@ -203,7 +203,7 @@ public: Stopwatch Timer; LoadAttachmentResult Result; std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); - if (!std::filesystem::is_regular_file(ChunkPath)) + if (!IsFile(ChunkPath)) { Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); Result.Reason = fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string()); @@ -246,7 +246,7 @@ private: LoadContainerResult Result; std::filesystem::path SourcePath = m_OutputPath; SourcePath.append(Name); - if (!std::filesystem::is_regular_file(SourcePath)) + if (!IsFile(SourcePath)) { Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); Result.Reason = fmt::format("Failed loading oplog container from '{}'. Reason: 'The file does not exist'", SourcePath.string()); diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp index 47748dd90..317a419eb 100644 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ b/src/zenserver/projectstore/httpprojectstore.cpp @@ -8,6 +8,7 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> #include <zencore/compactbinaryutil.h> +#include <zencore/compactbinaryvalidation.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> @@ -234,10 +235,15 @@ namespace { ////////////////////////////////////////////////////////////////////////// -HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, HttpStatsService& StatsService, AuthMgr& AuthMgr) +HttpProjectService::HttpProjectService(CidStore& Store, + ProjectStore* Projects, + HttpStatusService& StatusService, + HttpStatsService& StatsService, + AuthMgr& AuthMgr) : m_Log(logging::Get("project")) , m_CidStore(Store) , m_ProjectStore(Projects) +, m_StatusService(StatusService) , m_StatsService(StatsService) , m_AuthMgr(AuthMgr) { @@ -245,8 +251,6 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, using namespace std::literals; - m_StatsService.RegisterHandler("prj", *this); - m_Router.AddPattern("project", "([[:alnum:]_.]+)"); m_Router.AddPattern("log", "([[:alnum:]_.]+)"); m_Router.AddPattern("op", "([[:digit:]]+?)"); @@ -365,11 +369,15 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, "details\\$/{project}/{log}/{chunk}", [this](HttpRouterRequest& Req) { HandleOplogOpDetailsRequest(Req); }, HttpVerb::kGet); + + m_StatusService.RegisterHandler("prj", *this); + m_StatsService.RegisterHandler("prj", *this); } HttpProjectService::~HttpProjectService() { m_StatsService.UnregisterHandler("prj", *this); + m_StatusService.UnregisterHandler("prj", *this); } const char* @@ -465,6 +473,15 @@ HttpProjectService::HandleStatsRequest(HttpServerRequest& HttpReq) } void +HttpProjectService::HandleStatusRequest(HttpServerRequest& Request) +{ + ZEN_TRACE_CPU("HttpProjectService::Status"); + CbObjectWriter Cbo; + Cbo << "ok" << true; + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +void HttpProjectService::HandleProjectListRequest(HttpRouterRequest& Req) { ZEN_TRACE_CPU("ProjectService::ProjectList"); @@ -885,10 +902,63 @@ HttpProjectService::HandleChunkByCidRequest(HttpRouterRequest& Req) case HttpVerb::kGet: { IoBuffer Value; - std::pair<HttpResponseCode, std::string> Result = - m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value, nullptr); + std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, Value, nullptr); if (Result.first == HttpResponseCode::OK) { + if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary || + AcceptType == ZenContentType::kJSON || AcceptType == ZenContentType::kYAML || + AcceptType == ZenContentType::kCbObject) + { + CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Value)); + IoBuffer DecompressedBuffer = Compressed.Decompress().AsIoBuffer(); + + if (DecompressedBuffer) + { + if (AcceptType == ZenContentType::kJSON || AcceptType == ZenContentType::kYAML || + AcceptType == ZenContentType::kCbObject) + { + CbValidateError CbErr = ValidateCompactBinary(DecompressedBuffer.GetView(), CbValidateMode::Default); + if (!!CbErr) + { + m_ProjectStats.BadRequestCount++; + ZEN_DEBUG( + "chunk - '{}/{}/{}' WRONGTYPE. Reason: `Requested {} format, but could not convert to object`", + ProjectId, + OplogId, + Cid, + ToString(AcceptType)); + return HttpReq.WriteResponse( + HttpResponseCode::NotAcceptable, + HttpContentType::kText, + fmt::format("Content format not supported, requested {} format, but could not convert to object", + ToString(AcceptType))); + } + + m_ProjectStats.ChunkHitCount++; + CbObject ContainerObject = LoadCompactBinaryObject(DecompressedBuffer); + return HttpReq.WriteResponse(HttpResponseCode::OK, ContainerObject); + } + else + { + Value = DecompressedBuffer; + Value.SetContentType(ZenContentType::kBinary); + } + } + else + { + m_ProjectStats.BadRequestCount++; + ZEN_DEBUG("chunk - '{}/{}/{}' WRONGTYPE. Reason: `Requested {} format, but could not decompress stored data`", + ProjectId, + OplogId, + Cid, + ToString(AcceptType)); + return HttpReq.WriteResponse( + HttpResponseCode::NotAcceptable, + HttpContentType::kText, + fmt::format("Content format not supported, requested {} format, but could not decompress stored data", + ToString(AcceptType))); + } + } m_ProjectStats.ChunkHitCount++; return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value); } diff --git a/src/zenserver/projectstore/httpprojectstore.h b/src/zenserver/projectstore/httpprojectstore.h index 8e74c57a5..295defa5c 100644 --- a/src/zenserver/projectstore/httpprojectstore.h +++ b/src/zenserver/projectstore/httpprojectstore.h @@ -5,6 +5,7 @@ #include <zencore/stats.h> #include <zenhttp/httpserver.h> #include <zenhttp/httpstats.h> +#include <zenhttp/httpstatus.h> #include <zenstore/cidstore.h> namespace zen { @@ -31,16 +32,21 @@ class ProjectStore; // refs: // -class HttpProjectService : public HttpService, public IHttpStatsProvider +class HttpProjectService : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider { public: - HttpProjectService(CidStore& Store, ProjectStore* InProjectStore, HttpStatsService& StatsService, AuthMgr& AuthMgr); + HttpProjectService(CidStore& Store, + ProjectStore* InProjectStore, + HttpStatusService& StatusService, + HttpStatsService& StatsService, + AuthMgr& AuthMgr); ~HttpProjectService(); virtual const char* BaseUri() const override; virtual void HandleRequest(HttpServerRequest& Request) override; virtual void HandleStatsRequest(HttpServerRequest& Request) override; + virtual void HandleStatusRequest(HttpServerRequest& Request) override; private: struct ProjectStats @@ -89,6 +95,7 @@ private: CidStore& m_CidStore; HttpRequestRouter m_Router; Ref<ProjectStore> m_ProjectStore; + HttpStatusService& m_StatusService; HttpStatsService& m_StatsService; AuthMgr& m_AuthMgr; ProjectStats m_ProjectStats; diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp index e5839ad3b..3728babb5 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp @@ -31,15 +31,9 @@ public: , m_Key(Key) , m_OptionalBaseKey(OptionalBaseKey) , m_TempFilePath(TempFilePath) + , m_EnableBlocks(!ForceDisableBlocks) + , m_UseTempBlocks(!ForceDisableTempBlocks) { - if (ForceDisableBlocks) - { - m_EnableBlocks = false; - } - if (ForceDisableTempBlocks) - { - m_UseTempBlocks = false; - } } virtual RemoteStoreInfo GetInfo() const override @@ -75,7 +69,7 @@ public: virtual SaveResult SaveContainer(const IoBuffer& Payload) override { - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject); AddStats(PutResult); @@ -94,7 +88,7 @@ public: virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override { - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); JupiterResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); AddStats(PutResult); @@ -127,7 +121,7 @@ public: virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override { - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); FinalizeRefResult FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash); AddStats(FinalizeRefResult); @@ -164,7 +158,7 @@ public: {.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds}}; } - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); JupiterExistsResult ExistsResult = Session.CompressedBlobExists(m_Namespace, std::set<IoHash>(BlockHashes.begin(), BlockHashes.end())); AddStats(ExistsResult); @@ -203,7 +197,7 @@ public: virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); AddStats(GetResult); @@ -239,7 +233,7 @@ public: private: LoadContainerResult LoadContainer(const IoHash& Key) { - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); JupiterResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject); AddStats(GetResult); if (GetResult.ErrorCode || !GetResult.Success) @@ -329,8 +323,9 @@ private: const IoHash m_Key; const IoHash m_OptionalBaseKey; std::filesystem::path m_TempFilePath; - bool m_EnableBlocks = true; - bool m_UseTempBlocks = true; + const bool m_EnableBlocks = true; + const bool m_UseTempBlocks = true; + const bool m_AllowRedirect = false; std::atomic_uint64_t m_SentBytes = {}; std::atomic_uint64_t m_ReceivedBytes = {}; @@ -371,7 +366,15 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi { TokenProvider = httpclientauth::CreateFromStaticToken(Options.AccessToken); } - else + else if (!Options.OidcExePath.empty()) + { + if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(Options.OidcExePath, Url); TokenProviderMaybe) + { + TokenProvider = TokenProviderMaybe.value(); + } + } + + if (!TokenProvider) { TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager); } diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.h b/src/zenserver/projectstore/jupiterremoteprojectstore.h index 27f3d9b73..8bf79d563 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.h +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.h @@ -10,17 +10,18 @@ class AuthMgr; struct JupiterRemoteStoreOptions : RemoteStoreOptions { - std::string Url; - std::string Namespace; - std::string Bucket; - IoHash Key; - IoHash OptionalBaseKey; - std::string OpenIdProvider; - std::string AccessToken; - AuthMgr& AuthManager; - bool ForceDisableBlocks = false; - bool ForceDisableTempBlocks = false; - bool AssumeHttp2 = false; + std::string Url; + std::string Namespace; + std::string Bucket; + IoHash Key; + IoHash OptionalBaseKey; + std::string OpenIdProvider; + std::string AccessToken; + AuthMgr& AuthManager; + std::filesystem::path OidcExePath; + bool ForceDisableBlocks = false; + bool ForceDisableTempBlocks = false; + bool AssumeHttp2 = false; }; std::shared_ptr<RemoteProjectStore> CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 86791e29a..53e687983 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -22,6 +22,7 @@ #include <zenstore/scrubcontext.h> #include <zenutil/cache/rpcrecording.h> #include <zenutil/openprocesscache.h> +#include <zenutil/parallelwork.h> #include <zenutil/referencemetadata.h> #include <zenutil/workerpools.h> @@ -58,7 +59,7 @@ namespace { std::filesystem::path DroppedBucketPath; do { - if (!std::filesystem::exists(Dir)) + if (!IsDir(Dir)) { return true; } @@ -68,7 +69,7 @@ namespace { std::string DroppedName = fmt::format("[dropped]{}({})", Dir.filename().string(), MovedId); DroppedBucketPath = Dir.parent_path() / DroppedName; - if (std::filesystem::exists(DroppedBucketPath)) + if (IsDir(DroppedBucketPath)) { if (!DeleteDirectories(DroppedBucketPath)) { @@ -77,7 +78,7 @@ namespace { Dir); continue; } - if (std::filesystem::exists(DroppedBucketPath)) + if (IsDir(DroppedBucketPath)) { ZEN_INFO("Drop directory '{}' for '{}' still exists after remove, attempting different name.", DroppedBucketPath, Dir); continue; @@ -88,13 +89,13 @@ namespace { do { std::error_code Ec; - std::filesystem::rename(Dir, DroppedBucketPath, Ec); + RenameDirectory(Dir, DroppedBucketPath, Ec); if (!Ec) { OutDeleteDir = DroppedBucketPath; return true; } - if (std::filesystem::exists(DroppedBucketPath)) + if (IsDir(DroppedBucketPath)) { ZEN_INFO("Can't rename '{}' to still existing drop directory '{}'. Reason: '{}'. Attempting different name.", Dir, @@ -210,6 +211,16 @@ namespace { AccessToken = GetEnvVariable(AccessTokenEnvVariable); } } + std::filesystem::path OidcExePath; + if (std::string_view OidcExePathString = Cloud["oidc-exe-path"].AsString(); !OidcExePathString.empty()) + { + std::filesystem::path OidcExePathMaybe(OidcExePathString); + if (!IsFile(OidcExePathMaybe)) + { + ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString); + OidcExePath = std::move(OidcExePathMaybe); + } + } std::string_view KeyParam = Cloud["key"sv].AsString(); if (KeyParam.empty()) { @@ -252,6 +263,7 @@ namespace { std::string(OpenIdProvider), AccessToken, AuthManager, + OidcExePath, ForceDisableBlocks, ForceDisableTempBlocks, AssumeHttp2}; @@ -307,6 +319,16 @@ namespace { AccessToken = GetEnvVariable(AccessTokenEnvVariable); } } + std::filesystem::path OidcExePath; + if (std::string_view OidcExePathString = Builds["oidc-exe-path"].AsString(); !OidcExePathString.empty()) + { + std::filesystem::path OidcExePathMaybe(OidcExePathString); + if (!IsFile(OidcExePathMaybe)) + { + ZEN_WARN("Path to OidcToken executable '{}' can not be reached by server", OidcExePathString); + OidcExePath = std::move(OidcExePathMaybe); + } + } std::string_view BuildIdParam = Builds["buildsid"sv].AsString(); if (BuildIdParam.empty()) { @@ -337,6 +359,7 @@ namespace { std::string(OpenIdProvider), AccessToken, AuthManager, + OidcExePath, ForceDisableBlocks, ForceDisableTempBlocks, AssumeHttp2, @@ -486,7 +509,7 @@ struct ProjectStore::OplogStorage : public RefCounted [[nodiscard]] bool Exists() const { return Exists(m_OplogStoragePath); } [[nodiscard]] static bool Exists(const std::filesystem::path& BasePath) { - return std::filesystem::exists(GetLogPath(BasePath)) && std::filesystem::exists(GetBlobsPath(BasePath)); + return IsFile(GetLogPath(BasePath)) && IsFile(GetBlobsPath(BasePath)); } [[nodiscard]] bool IsValid() const { return IsValid(m_OplogStoragePath); } [[nodiscard]] static bool IsValid(const std::filesystem::path& BasePath) @@ -496,13 +519,13 @@ struct ProjectStore::OplogStorage : public RefCounted void WipeState() const { std::error_code Ec; - std::filesystem::remove(GetLogPath(), Ec); - std::filesystem::remove(GetBlobsPath(), Ec); + RemoveFile(GetLogPath(), Ec); + RemoveFile(GetBlobsPath(), Ec); } static bool Delete(const std::filesystem::path& BasePath) { return DeleteDirectories(BasePath); } - uint64_t OpBlobsSize() const { return std::filesystem::file_size(GetBlobsPath()); } + uint64_t OpBlobsSize() const { return FileSizeFromPath(GetBlobsPath()); } uint64_t OpsSize() const { return OpsSize(m_OplogStoragePath); } static uint64_t OpsSize(const std::filesystem::path& BasePath) @@ -510,7 +533,7 @@ struct ProjectStore::OplogStorage : public RefCounted if (Exists(BasePath)) { std::error_code DummyEc; - return std::filesystem::file_size(GetLogPath(BasePath)) + std::filesystem::file_size(GetBlobsPath(BasePath)); + return FileSizeFromPath(GetLogPath(BasePath)) + FileSizeFromPath(GetBlobsPath(BasePath)); } return 0; } @@ -689,7 +712,7 @@ struct ProjectStore::OplogStorage : public RefCounted m_OpBlobs.Close(); Oplog.Close(); - std::filesystem::rename(OplogPath, GetLogPath(), Ec); + RenameFile(OplogPath, GetLogPath(), Ec); if (Ec) { throw std::system_error( @@ -702,9 +725,9 @@ struct ProjectStore::OplogStorage : public RefCounted if (Ec) { // We failed late - clean everything up as best we can - std::filesystem::remove(OpBlobs.GetPath(), Ec); - std::filesystem::remove(GetLogPath(), Ec); - std::filesystem::remove(GetBlobsPath(), Ec); + RemoveFile(OpBlobs.GetPath(), Ec); + RemoveFile(GetLogPath(), Ec); + RemoveFile(GetBlobsPath(), Ec); throw std::system_error(Ec, fmt::format("Oplog::Compact failed to rename temporary oplog file from '{}' to '{}'", OpBlobs.GetPath(), @@ -739,7 +762,7 @@ struct ProjectStore::OplogStorage : public RefCounted } catch (const std::exception& /*Ex*/) { - std::filesystem::remove(OpBlobs.GetPath(), Ec); + RemoveFile(OpBlobs.GetPath(), Ec); throw; } } @@ -987,8 +1010,8 @@ struct ProjectStore::OplogStorage : public RefCounted .OpCoreHash = OpData.OpCoreHash, .OpKeyHash = OpData.KeyHash}; - m_Oplog.Append(Entry); m_OpBlobs.Write(OpData.Buffer.GetData(), WriteSize, WriteOffset); + m_Oplog.Append(Entry); return Entry; } @@ -1108,7 +1131,7 @@ ProjectStore::Oplog::Oplog(std::string_view Id, ZEN_WARN("Invalid oplog found at '{}'. Wiping state for oplog.", m_BasePath); m_Storage->WipeState(); std::error_code DummyEc; - std::filesystem::remove(m_MetaPath, DummyEc); + RemoveFile(m_MetaPath, DummyEc); } } m_Storage->Open(/* IsCreate */ !StoreExists); @@ -1116,7 +1139,7 @@ ProjectStore::Oplog::Oplog(std::string_view Id, m_MetaPath = m_BasePath / "ops.meta"sv; m_MetaValid = !IsFileOlderThan(m_MetaPath, m_Storage->GetBlobsPath()); - CleanDirectory(m_TempPath); + CleanDirectory(m_TempPath, /*ForceRemoveReadOnlyFiles*/ false); } ProjectStore::Oplog::~Oplog() @@ -1142,7 +1165,7 @@ ProjectStore::Oplog::Flush() if (!m_MetaValid) { std::error_code DummyEc; - std::filesystem::remove(m_MetaPath, DummyEc); + RemoveFile(m_MetaPath, DummyEc); } uint64_t LogCount = m_Storage->LogCount(); @@ -1238,19 +1261,19 @@ ProjectStore::Oplog::TotalSize(const std::filesystem::path& BasePath) uint64_t Size = OplogStorage::OpsSize(BasePath); std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; - if (std::filesystem::exists(StateFilePath)) + if (IsFile(StateFilePath)) { - Size += std::filesystem::file_size(StateFilePath); + Size += FileSizeFromPath(StateFilePath); } std::filesystem::path MetaFilePath = BasePath / "ops.meta"sv; - if (std::filesystem::exists(MetaFilePath)) + if (IsFile(MetaFilePath)) { - Size += std::filesystem::file_size(MetaFilePath); + Size += FileSizeFromPath(MetaFilePath); } std::filesystem::path IndexFilePath = BasePath / "ops.zidx"sv; - if (std::filesystem::exists(IndexFilePath)) + if (IsFile(IndexFilePath)) { - Size += std::filesystem::file_size(IndexFilePath); + Size += FileSizeFromPath(IndexFilePath); } return Size; @@ -1303,7 +1326,7 @@ ProjectStore::Oplog::ExistsAt(const std::filesystem::path& BasePath) using namespace std::literals; std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; - return std::filesystem::is_regular_file(StateFilePath); + return IsFile(StateFilePath); } bool @@ -1337,7 +1360,7 @@ ProjectStore::Oplog::Read() if (!m_MetaValid) { std::error_code DummyEc; - std::filesystem::remove(m_MetaPath, DummyEc); + RemoveFile(m_MetaPath, DummyEc); } ReadIndexSnapshot(); @@ -1438,7 +1461,7 @@ ProjectStore::Oplog::Reset() m_Storage = new OplogStorage(this, m_BasePath); m_Storage->Open(true); m_MetaValid = false; - CleanDirectory(m_TempPath); + CleanDirectory(m_TempPath, /*ForceRemoveReadOnlyFiles*/ false); Write(); } // Erase content on disk @@ -1457,7 +1480,7 @@ ProjectStore::Oplog::ReadStateFile(const std::filesystem::path& BasePath, std::f using namespace std::literals; std::filesystem::path StateFilePath = BasePath / "oplog.zcb"sv; - if (std::filesystem::is_regular_file(StateFilePath)) + if (IsFile(StateFilePath)) { // ZEN_INFO("oplog '{}/{}': config read from '{}'", m_OuterProject->Identifier, m_OplogId, StateFilePath); @@ -1488,11 +1511,17 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo ValidationResult Result; + const size_t OpCount = OplogCount(); + std::vector<Oid> KeyHashes; std::vector<std::string> Keys; std::vector<std::vector<IoHash>> Attachments; std::vector<OplogEntryMapping> Mappings; + KeyHashes.reserve(OpCount); + Keys.reserve(OpCount); + Mappings.reserve(OpCount); + IterateOplogWithKey([&](uint32_t LSN, const Oid& Key, CbObjectView OpView) { Result.LSNLow = Min(Result.LSNLow, LSN); Result.LSNHigh = Max(Result.LSNHigh, LSN); @@ -1517,77 +1546,90 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo bool HasMissingEntries = false; for (const ChunkMapping& Chunk : Mapping.Chunks) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(Chunk.Hash); !Payload) + if (!m_CidStore.ContainsChunk(Chunk.Hash)) { ResultLock.WithExclusiveLock([&]() { Result.MissingChunks.push_back({KeyHash, Chunk}); }); HasMissingEntries = true; } } + for (const ChunkMapping& Meta : Mapping.Meta) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(Meta.Hash); !Payload) + if (!m_CidStore.ContainsChunk(Meta.Hash)) { ResultLock.WithExclusiveLock([&]() { Result.MissingMetas.push_back({KeyHash, Meta}); }); HasMissingEntries = true; } } + for (const FileMapping& File : Mapping.Files) { if (File.Hash == IoHash::Zero) { std::filesystem::path FilePath = m_OuterProject->RootDir / File.ServerPath; - if (!std::filesystem::is_regular_file(FilePath)) + if (!IsFile(FilePath)) { ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); HasMissingEntries = true; } } - else + else if (!m_CidStore.ContainsChunk(File.Hash)) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(File.Hash); !Payload) - { - ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); - HasMissingEntries = true; - } + ResultLock.WithExclusiveLock([&]() { Result.MissingFiles.push_back({KeyHash, File}); }); + HasMissingEntries = true; } } + const std::vector<IoHash>& OpAttachments = Attachments[OpIndex]; for (const IoHash& Attachment : OpAttachments) { - if (IoBuffer Payload = m_CidStore.FindChunkByCid(Attachment); !Payload) + if (!m_CidStore.ContainsChunk(Attachment)) { ResultLock.WithExclusiveLock([&]() { Result.MissingAttachments.push_back({KeyHash, Attachment}); }); HasMissingEntries = true; } } + if (HasMissingEntries) { ResultLock.WithExclusiveLock([&]() { Result.OpKeys.push_back({KeyHash, Key}); }); } }; - Latch WorkLatch(1); - - for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); + try { - if (OptionalWorkerPool) - { - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork([&, Index = OpIndex]() { - ZEN_MEMSCOPE(GetProjectstoreTag()); - - auto _ = MakeGuard([&WorkLatch] { WorkLatch.CountDown(); }); - ValidateOne(Index); - }); - } - else + for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) { - ValidateOne(OpIndex); + if (AbortFlag) + { + break; + } + if (OptionalWorkerPool) + { + Work.ScheduleWork(*OptionalWorkerPool, [&ValidateOne, Index = OpIndex](std::atomic<bool>& AbortFlag) { + ZEN_MEMSCOPE(GetProjectstoreTag()); + if (AbortFlag) + { + return; + } + ValidateOne(Index); + }); + } + else + { + ValidateOne(OpIndex); + } } } - - WorkLatch.CountDown(); - WorkLatch.Wait(); + catch (const std::exception& Ex) + { + AbortFlag.store(true); + ZEN_WARN("Failed validating oplogs in {}. Reason: '{}'", m_BasePath, Ex.what()); + } + Work.Wait(); { // Check if we were deleted while we were checking the references without a lock... @@ -1621,31 +1663,9 @@ ProjectStore::Oplog::WriteIndexSnapshot() namespace fs = std::filesystem; - fs::path IndexPath = m_BasePath / "ops.zidx"; - fs::path TempIndexPath = m_BasePath / "ops.zidx.tmp"; - - // Move index away, we keep it if something goes wrong - if (fs::is_regular_file(TempIndexPath)) - { - std::error_code Ec; - if (!fs::remove(TempIndexPath, Ec) || Ec) - { - ZEN_WARN("oplog '{}/{}': snapshot failed to clean up temp snapshot at {}, reason: '{}'", - GetOuterProject()->Identifier, - m_OplogId, - TempIndexPath, - Ec.message()); - return; - } - } - + const fs::path IndexPath = m_BasePath / "ops.zidx"; try { - if (fs::is_regular_file(IndexPath)) - { - fs::rename(IndexPath, TempIndexPath); - } - // Write the current state of the location map to a new index state std::vector<uint32_t> LSNEntries; std::vector<Oid> Keys; @@ -1726,36 +1746,28 @@ ProjectStore::Oplog::WriteIndexSnapshot() uint64_t Offset = 0; IndexFile.Write(&Header, sizeof(OplogIndexHeader), Offset); - Offset += sizeof(OplogIndexHeader); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(LSNEntries.data(), LSNEntries.size() * sizeof(uint32_t), Offset); - Offset += LSNEntries.size() * sizeof(uint32_t); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(Keys.data(), Keys.size() * sizeof(Oid), Offset); - Offset += Keys.size() * sizeof(Oid); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(AddressMapEntries.data(), AddressMapEntries.size() * sizeof(OplogEntryAddress), Offset); - Offset += AddressMapEntries.size() * sizeof(OplogEntryAddress); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(LatestOpMapEntries.data(), LatestOpMapEntries.size() * sizeof(uint32_t), Offset); - Offset += LatestOpMapEntries.size() * sizeof(uint32_t); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(ChunkMapEntries.data(), ChunkMapEntries.size() * sizeof(IoHash), Offset); - Offset += ChunkMapEntries.size() * sizeof(IoHash); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(MetaMapEntries.data(), MetaMapEntries.size() * sizeof(IoHash), Offset); - Offset += MetaMapEntries.size() * sizeof(IoHash); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); IndexFile.Write(FilePathLengths.data(), FilePathLengths.size() * sizeof(uint32_t), Offset); - Offset += FilePathLengths.size() * sizeof(uint32_t); - Offset = RoundUp(Offset, OplogIndexHeader::DataAlignment); + Offset = IndexFile.AlignTo(OplogIndexHeader::DataAlignment); for (const auto& FilePath : FilePaths) { @@ -1767,7 +1779,11 @@ ProjectStore::Oplog::WriteIndexSnapshot() ObjectIndexFile.MoveTemporaryIntoPlace(IndexPath, Ec); if (Ec) { - throw std::system_error(Ec, fmt::format("Failed to move temp file '{}' to '{}'", ObjectIndexFile.GetPath(), IndexPath)); + throw std::system_error(Ec, + fmt::format("Snapshot failed to rename new snapshot '{}' to '{}', reason: '{}'", + ObjectIndexFile.GetPath(), + IndexPath, + Ec.message())); } EntryCount = LSNEntries.size(); m_LogFlushPosition = IndexLogPosition; @@ -1775,35 +1791,6 @@ ProjectStore::Oplog::WriteIndexSnapshot() catch (const std::exception& Err) { ZEN_WARN("oplog '{}/{}': snapshot FAILED, reason: '{}'", m_OuterProject->Identifier, m_OplogId, Err.what()); - - // Restore any previous snapshot - - if (fs::is_regular_file(TempIndexPath)) - { - std::error_code Ec; - fs::remove(IndexPath, Ec); // We don't care if this fails, we try to move the old temp file regardless - fs::rename(TempIndexPath, IndexPath, Ec); - if (Ec) - { - ZEN_WARN("oplog '{}/{}': snapshot failed to restore old snapshot from {}, reason: '{}'", - m_OuterProject->Identifier, - m_OplogId, - TempIndexPath, - Ec.message()); - } - } - } - if (fs::is_regular_file(TempIndexPath)) - { - std::error_code Ec; - if (!fs::remove(TempIndexPath, Ec) || Ec) - { - ZEN_WARN("oplog '{}/{}': snapshot failed to remove temporary file {}, reason: '{}'", - m_OuterProject->Identifier, - m_OplogId, - TempIndexPath, - Ec.message()); - } } } @@ -1813,8 +1800,8 @@ ProjectStore::Oplog::ReadIndexSnapshot() ZEN_MEMSCOPE(GetProjectstoreTag()); ZEN_TRACE_CPU("Oplog::ReadIndexSnapshot"); - std::filesystem::path IndexPath = m_BasePath / "ops.zidx"; - if (std::filesystem::is_regular_file(IndexPath)) + const std::filesystem::path IndexPath = m_BasePath / "ops.zidx"; + if (IsFile(IndexPath)) { uint64_t EntryCount = 0; Stopwatch Timer; @@ -2139,66 +2126,81 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, } if (OptionalWorkerPool) { - std::atomic_bool Result = true; - Latch WorkLatch(1); - - for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); + try { - if (Result.load() == false) + for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) { - break; - } - WorkLatch.AddCount(1); - OptionalWorkerPool->ScheduleWork( - [this, &WorkLatch, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback, &Result]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - if (Result.load() == false) - { - return; - } - size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; - const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex]; - try - { - IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); - if (!Payload) + if (AbortFlag) + { + break; + } + Work.ScheduleWork( + *OptionalWorkerPool, + [this, &ChunkIds, IncludeModTag, ChunkIndex, &FileChunkIndexes, &FileChunkPaths, &AsyncCallback]( + std::atomic<bool>& AbortFlag) { + if (AbortFlag) { - ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[ChunkIndex], FilePath); + return; } + size_t FileChunkIndex = FileChunkIndexes[ChunkIndex]; + const std::filesystem::path& FilePath = FileChunkPaths[ChunkIndex]; + try + { + IoBuffer Payload = IoBufferBuilder::MakeFromFile(FilePath); + if (!Payload) + { + ZEN_WARN("Trying to fetch chunk {} using file path {} failed", ChunkIds[FileChunkIndex], FilePath); + } - if (!AsyncCallback(FileChunkIndex, Payload, IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0)) + if (!AsyncCallback(FileChunkIndex, + Payload, + IncludeModTag ? GetModificationTagFromModificationTime(Payload) : 0)) + { + AbortFlag.store(true); + } + } + catch (const std::exception& Ex) { - Result.store(false); + ZEN_WARN("oplog '{}/{}': exception caught when iterating file chunk {}, path '{}'. Reason: '{}'", + m_OuterProject->Identifier, + m_OplogId, + FileChunkIndex, + FilePath, + Ex.what()); } - } - catch (const std::exception& Ex) - { - ZEN_WARN("oplog '{}/{}': exception caught when iterating file chunk {}, path '{}'. Reason: '{}'", - m_OuterProject->Identifier, - m_OplogId, - FileChunkIndex, - FilePath, - Ex.what()); - } - }); - } + }); + } - if (!CidChunkHashes.empty()) + if (!CidChunkHashes.empty() && !AbortFlag) + { + m_CidStore.IterateChunks( + CidChunkHashes, + [&](size_t Index, const IoBuffer& Payload) { + size_t CidChunkIndex = CidChunkIndexes[Index]; + if (AbortFlag) + { + return false; + } + return AsyncCallback(CidChunkIndex, + Payload, + IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0); + }, + OptionalWorkerPool, + LargeSizeLimit); + } + } + catch (const std::exception& Ex) { - m_CidStore.IterateChunks( - CidChunkHashes, - [&](size_t Index, const IoBuffer& Payload) { - size_t CidChunkIndex = CidChunkIndexes[Index]; - return AsyncCallback(CidChunkIndex, Payload, IncludeModTag ? GetModificationTagFromRawHash(CidChunkHashes[Index]) : 0); - }, - OptionalWorkerPool, - LargeSizeLimit); + AbortFlag.store(true); + ZEN_WARN("Failed iterating oplog chunks in {}. Reason: '{}'", m_BasePath, Ex.what()); } - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); - return Result.load(); + return !AbortFlag; } else { @@ -3133,7 +3135,7 @@ ProjectStore::Project::~Project() bool ProjectStore::Project::Exists(const std::filesystem::path& BasePath) { - return std::filesystem::exists(BasePath / "Project.zcb"); + return IsFile(BasePath / "Project.zcb"); } void @@ -3207,7 +3209,7 @@ ProjectStore::Project::ReadAccessTimes() using namespace std::literals; std::filesystem::path ProjectAccessTimesFilePath = m_OplogStoragePath / "AccessTimes.zcb"sv; - if (!std::filesystem::exists(ProjectAccessTimesFilePath)) + if (!IsFile(ProjectAccessTimesFilePath)) { return; } @@ -3598,14 +3600,14 @@ ProjectStore::Project::TotalSize(const std::filesystem::path& BasePath) uint64_t Size = 0; std::filesystem::path AccessTimesFilePath = BasePath / "AccessTimes.zcb"sv; - if (std::filesystem::exists(AccessTimesFilePath)) + if (IsFile(AccessTimesFilePath)) { - Size += std::filesystem::file_size(AccessTimesFilePath); + Size += FileSizeFromPath(AccessTimesFilePath); } std::filesystem::path ProjectFilePath = BasePath / "Project.zcb"sv; - if (std::filesystem::exists(ProjectFilePath)) + if (IsFile(ProjectFilePath)) { - Size += std::filesystem::file_size(ProjectFilePath); + Size += FileSizeFromPath(ProjectFilePath); } return Size; @@ -3717,7 +3719,7 @@ ProjectStore::Project::IsExpired(const std::string& EntryName, if (!MarkerPath.empty()) { std::error_code Ec; - if (std::filesystem::exists(MarkerPath, Ec)) + if (IsFile(MarkerPath, Ec)) { if (Ec) { @@ -3870,7 +3872,7 @@ void ProjectStore::DiscoverProjects() { ZEN_MEMSCOPE(GetProjectstoreTag()); - if (!std::filesystem::exists(m_ProjectBasePath)) + if (!IsDir(m_ProjectBasePath)) { return; } @@ -3919,26 +3921,32 @@ ProjectStore::Flush() } WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); - Latch WorkLatch(1); - - for (const Ref<Project>& Project : Projects) + std::atomic<bool> AbortFlag; + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); + try { - WorkLatch.AddCount(1); - WorkerPool.ScheduleWork([this, &WorkLatch, Project]() { - auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); - try - { - Project->Flush(); - } - catch (const std::exception& Ex) - { - ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what()); - } - }); + for (const Ref<Project>& Project : Projects) + { + Work.ScheduleWork(WorkerPool, [this, Project](std::atomic<bool>&) { + try + { + Project->Flush(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("Exception while flushing project {}: {}", Project->Identifier, Ex.what()); + } + }); + } + } + catch (const std::exception& Ex) + { + AbortFlag.store(true); + ZEN_WARN("Failed projects in {}. Reason: '{}'", m_ProjectBasePath, Ex.what()); } - WorkLatch.CountDown(); - WorkLatch.Wait(); + Work.Wait(); } void @@ -3979,7 +3987,7 @@ ProjectStore::StorageSize() const GcStorageSize Result; { - if (std::filesystem::exists(m_ProjectBasePath)) + if (IsDir(m_ProjectBasePath)) { DirectoryContent ProjectsFolderContent; GetDirectoryContent(m_ProjectBasePath, DirectoryContentFlags::IncludeDirs, ProjectsFolderContent); @@ -3987,7 +3995,7 @@ ProjectStore::StorageSize() const for (const std::filesystem::path& ProjectBasePath : ProjectsFolderContent.Directories) { std::filesystem::path ProjectStateFilePath = ProjectBasePath / "Project.zcb"sv; - if (std::filesystem::exists(ProjectStateFilePath)) + if (IsFile(ProjectStateFilePath)) { Result.DiskSize += Project::TotalSize(ProjectBasePath); DirectoryContent DirContent; @@ -4770,7 +4778,6 @@ std::pair<HttpResponseCode, std::string> ProjectStore::GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const std::string_view Cid, - ZenContentType AcceptType, IoBuffer& OutChunk, uint64_t* OptionalInOutModificationTag) { @@ -4812,16 +4819,7 @@ ProjectStore::GetChunk(const std::string_view ProjectId, } } - if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(OutChunk)); - OutChunk = Compressed.Decompress().AsIoBuffer(); - OutChunk.SetContentType(ZenContentType::kBinary); - } - else - { - OutChunk.SetContentType(ZenContentType::kCompressedBinary); - } + OutChunk.SetContentType(ZenContentType::kCompressedBinary); return {HttpResponseCode::OK, {}}; } @@ -7253,7 +7251,7 @@ TEST_CASE("project.store.gc") CHECK(ProjectStore.OpenProject("proj2"sv)); } - std::filesystem::remove(Project1FilePath); + RemoveFile(Project1FilePath); { GcSettings Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24), @@ -7282,7 +7280,7 @@ TEST_CASE("project.store.gc") CHECK(ProjectStore.OpenProject("proj2"sv)); } - std::filesystem::remove(Project2Oplog1Path); + RemoveFile(Project2Oplog1Path); { GcSettings Settings = {.CacheExpireTime = GcClock::Now() - std::chrono::hours(24), .ProjectStoreExpireTime = GcClock::Now() - std::chrono::hours(24), @@ -7311,7 +7309,7 @@ TEST_CASE("project.store.gc") CHECK(ProjectStore.OpenProject("proj2"sv)); } - std::filesystem::remove(Project2FilePath); + RemoveFile(Project2FilePath); { GcSettings Settings = {.CacheExpireTime = GcClock::Now() + std::chrono::hours(24), .ProjectStoreExpireTime = GcClock::Now() + std::chrono::hours(24), @@ -8035,7 +8033,7 @@ TEST_CASE("project.store.rpc.getchunks") CompositeBuffer Buffer = Attachment->AsCompositeBinary(); CHECK_EQ(IoHash::HashBuffer(IoBuffer(ReadFile(FilesOpIdAttachments[0].second).Flatten(), 81823, 5434)), IoHash::HashBuffer(Buffer)); - CHECK_EQ(Chunk["Size"sv].AsUInt64(), std::filesystem::file_size(FilesOpIdAttachments[0].second)); + CHECK_EQ(Chunk["Size"sv].AsUInt64(), FileSizeFromPath(FilesOpIdAttachments[0].second)); CHECK(!Chunk.FindView("RawSize")); } { @@ -8516,12 +8514,7 @@ TEST_CASE("project.store.partial.read") uint64_t ModificationTag = 0; IoBuffer Chunk; CHECK(ProjectStore - .GetChunk("proj1"sv, - "oplog1"sv, - Attachments[OpIds[1]][0].second.DecodeRawHash().ToHexString(), - HttpContentType::kCompressedBinary, - Chunk, - &ModificationTag) + .GetChunk("proj1"sv, "oplog1"sv, Attachments[OpIds[1]][0].second.DecodeRawHash().ToHexString(), Chunk, &ModificationTag) .first == HttpResponseCode::OK); IoHash RawHash; uint64_t RawSize; @@ -8530,12 +8523,7 @@ TEST_CASE("project.store.partial.read") CHECK(ModificationTag != 0); CHECK(ProjectStore - .GetChunk("proj1"sv, - "oplog1"sv, - Attachments[OpIds[1]][0].second.DecodeRawHash().ToHexString(), - HttpContentType::kCompressedBinary, - Chunk, - &ModificationTag) + .GetChunk("proj1"sv, "oplog1"sv, Attachments[OpIds[1]][0].second.DecodeRawHash().ToHexString(), Chunk, &ModificationTag) .first == HttpResponseCode::NotModified); } diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index 8f2d3ce0d..368da5ea4 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -449,7 +449,6 @@ public: std::pair<HttpResponseCode, std::string> GetChunk(const std::string_view ProjectId, const std::string_view OplogId, const std::string_view Cid, - ZenContentType AcceptType, IoBuffer& OutChunk, uint64_t* OptionalInOutModificationTag); diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index a7263da83..f96b3e185 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -1212,7 +1212,7 @@ BuildContainer(CidStore& ChunkStore, { std::string_view ServerPath = View["serverpath"sv].AsString(); std::filesystem::path FilePath = Project.RootDir / ServerPath; - if (!std::filesystem::is_regular_file(FilePath)) + if (!IsFile(FilePath)) { remotestore_impl::ReportMessage( OptionalContext, @@ -3083,9 +3083,9 @@ LoadOplog(CidStore& ChunkStore, OptionalContext]() { auto _ = MakeGuard([&DechunkLatch, &TempFileName] { std::error_code Ec; - if (std::filesystem::exists(TempFileName, Ec)) + if (IsFile(TempFileName, Ec)) { - std::filesystem::remove(TempFileName, Ec); + RemoveFile(TempFileName, Ec); if (Ec) { ZEN_INFO("Failed to remove temporary file '{}'. Reason: {}", TempFileName, Ec.message()); |