diff options
73 files changed, 1533 insertions, 1340 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e328b049..38b152f8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,21 @@ -## +## +- Bugfix: Fix parsing of workspace options in Lua config +- Bugfix: Add missing Lua option for option `--gc-projectstore-duration-seconds` +- Bugfix: Add missing Lua mapping option to `--statsd` command line option +- Bugfix: Verify that chunking is allowed before chunking loose files duriung oplog export +- Bugfix: Fix oplog target url for oplog export to remote zenserver +- Bugfix: Handle workspace share paths enclosed in quotes and ending with backslash UE-231677 + +## 5.5.17 +- Improvement: Batch fetch record attachments when appropriate +- Improvement: Reduce memory buffer allocation in BlockStore::IterateBlock +- Improvement: Tweaked BlockStore::IterateBlock logic when to use threaded work (at least 4 chunks requested) +- Improvement: Remove overhead of verifying oplog presence on disk for "getchunks" rpc call in project store +- Improvement: Increase limit where we switch from simple read of IoBuffer file to mmap +- Bugfix: CasContainerStrategy::IterateChunks could give wrong payload/index when requesting 1 or 2 chunks +- Bugfix: Suppress progress report callback if oplog import detects oplog with zero ops + +## 5.5.16 - Feature: Project store "getchunks" rpc call `/prj/{project}/oplog/{log}/rpc` extended to accept both CAS (RawHash) and Id (Oid) identifiers as well as partial ranges - Legacy call still has and `chunks` array in the request body with IoHash entries providing CAS data for whole chunks only - New call has a top level `Request` object in request body with the following elements: diff --git a/VERSION.txt b/VERSION.txt index 490f8f9f3..32bde8401 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -1 +1 @@ -5.5.16 +5.5.17 diff --git a/src/zen/cmds/admin_cmd.cpp b/src/zen/cmds/admin_cmd.cpp index e07e28f54..995ed4136 100644 --- a/src/zen/cmds/admin_cmd.cpp +++ b/src/zen/cmds/admin_cmd.cpp @@ -1,12 +1,13 @@ // Copyright Epic Games, Inc. All Rights Reserved. #include "admin_cmd.h" + +#include <zencore/basicfile.h> #include <zencore/filesystem.h> #include <zencore/logging.h> #include <zenhttp/formatters.h> #include <zenhttp/httpclient.h> #include <zenhttp/httpcommon.h> -#include <zenutil/basicfile.h> ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> @@ -736,14 +737,14 @@ CopyStateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) // Copy cache state DirectoryContent CacheDirectoryContent; - GetDirectoryContent(CachePath, DirectoryContent::IncludeDirsFlag, CacheDirectoryContent); + GetDirectoryContent(CachePath, DirectoryContentFlags::IncludeDirs, CacheDirectoryContent); for (const std::filesystem::path& NamespacePath : CacheDirectoryContent.Directories) { std::filesystem::path NamespaceName = NamespacePath.filename(); std::filesystem::path TargetNamespacePath = TargetCachePath / NamespaceName; DirectoryContent CacheNamespaceContent; - GetDirectoryContent(NamespacePath, DirectoryContent::IncludeDirsFlag, CacheNamespaceContent); + GetDirectoryContent(NamespacePath, DirectoryContentFlags::IncludeDirs, CacheNamespaceContent); for (const std::filesystem::path& BucketPath : CacheNamespaceContent.Directories) { diff --git a/src/zen/cmds/cache_cmd.cpp b/src/zen/cmds/cache_cmd.cpp index 00099cebc..6ec6a80db 100644 --- a/src/zen/cmds/cache_cmd.cpp +++ b/src/zen/cmds/cache_cmd.cpp @@ -12,8 +12,8 @@ #include <zencore/workthreadpool.h> #include <zenhttp/httpclient.h> #include <zenhttp/httpcommon.h> +#include <zenhttp/packageformat.h> #include <zenutil/cache/cacherequests.h> -#include <zenutil/packageformat.h> #include <memory> #include <random> diff --git a/src/zen/cmds/copy_cmd.cpp b/src/zen/cmds/copy_cmd.cpp index f39bfa71c..d42d3c107 100644 --- a/src/zen/cmds/copy_cmd.cpp +++ b/src/zen/cmds/copy_cmd.cpp @@ -120,7 +120,7 @@ CopyCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { } - virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override + virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t) override { ZEN_UNUSED(FileSize); std::error_code Ec; @@ -157,7 +157,7 @@ CopyCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } } - virtual bool VisitDirectory(const std::filesystem::path&, const path_view&) override { return true; } + virtual bool VisitDirectory(const std::filesystem::path&, const path_view&, uint32_t) override { return true; } std::filesystem::path BasePath; std::filesystem::path TargetPath; diff --git a/src/zen/cmds/print_cmd.cpp b/src/zen/cmds/print_cmd.cpp index fa42a3563..469dddf55 100644 --- a/src/zen/cmds/print_cmd.cpp +++ b/src/zen/cmds/print_cmd.cpp @@ -8,7 +8,7 @@ #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/string.h> -#include <zenutil/packageformat.h> +#include <zenhttp/packageformat.h> using namespace std::literals; diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index 5e18a3624..6bc499f03 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -2,6 +2,7 @@ #include "projectstore_cmd.h" +#include <zencore/basicfile.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compress.h> #include <zencore/filesystem.h> @@ -14,7 +15,6 @@ #include <zenhttp/formatters.h> #include <zenhttp/httpclient.h> #include <zenhttp/httpcommon.h> -#include <zenutil/basicfile.h> ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> @@ -1047,15 +1047,16 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg TargetUrlBase = fmt::format("http://{}", TargetUrlBase); } + HttpClient TargetHttp(TargetUrlBase); std::string Url = fmt::format("/prj/{}/oplog/{}", m_ZenProjectName, m_ZenOplogName); bool CreateOplog = false; - if (HttpClient::Response Result = Http.Get(Url, HttpClient::Accept(ZenContentType::kJSON))) + if (HttpClient::Response Result = TargetHttp.Get(Url, HttpClient::Accept(ZenContentType::kJSON))) { if (m_ZenClean) { ZEN_WARN("Deleting zen remote oplog '{}/{}'", m_ZenProjectName, m_ZenOplogName) - Result = Http.Delete(Url, HttpClient::Accept(ZenContentType::kJSON)); + Result = TargetHttp.Delete(Url, HttpClient::Accept(ZenContentType::kJSON)); if (!Result) { Result.ThrowError("failed deleting existing zen remote oplog"sv); @@ -1077,7 +1078,7 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg if (CreateOplog) { ZEN_WARN("Creating zen remote oplog '{}/{}'", m_ZenProjectName, m_ZenOplogName); - if (HttpClient::Response Result = Http.Post(Url); !Result) + if (HttpClient::Response Result = TargetHttp.Post(Url); !Result) { Result.ThrowError("failed creating zen remote oplog"sv); return 1; diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp index d66adfb81..5b88a1f73 100644 --- a/src/zen/cmds/rpcreplay_cmd.cpp +++ b/src/zen/cmds/rpcreplay_cmd.cpp @@ -13,8 +13,8 @@ #include <zencore/timer.h> #include <zencore/workthreadpool.h> #include <zenhttp/httpcommon.h> +#include <zenhttp/packageformat.h> #include <zenutil/cache/rpcrecording.h> -#include <zenutil/packageformat.h> ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> diff --git a/src/zen/cmds/serve_cmd.cpp b/src/zen/cmds/serve_cmd.cpp index ea9102b28..8e36e74ce 100644 --- a/src/zen/cmds/serve_cmd.cpp +++ b/src/zen/cmds/serve_cmd.cpp @@ -120,7 +120,7 @@ ServeCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) struct FsVisitor : public FileSystemTraversal::TreeVisitor { - virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override + virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t) override { std::filesystem::path ServerPath = std::filesystem::relative(Parent / File, RootPath); std::string ServerPathString = reinterpret_cast<const char*>(ServerPath.generic_u8string().c_str()); @@ -133,7 +133,7 @@ ServeCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) Files.emplace_back(FileEntry{ServerPathString, ServerPathString, FileSize}); } - virtual bool VisitDirectory(const std::filesystem::path&, const path_view&) override { return true; } + virtual bool VisitDirectory(const std::filesystem::path&, const path_view&, uint32_t) override { return true; } struct FileEntry { diff --git a/src/zen/cmds/workspaces_cmd.cpp b/src/zen/cmds/workspaces_cmd.cpp index 2c8fe43f6..05d3c573f 100644 --- a/src/zen/cmds/workspaces_cmd.cpp +++ b/src/zen/cmds/workspaces_cmd.cpp @@ -22,11 +22,23 @@ namespace zen { namespace { static void RemoveTrailingPathSeparator(std::filesystem::path& Path) { - std::u8string PathString = Path.u8string(); - if (PathString.ends_with(std::filesystem::path::preferred_separator)) + if (!Path.empty()) { - PathString.pop_back(); - Path = std::filesystem::path(PathString); + std::u8string PathString = Path.u8string(); + if (PathString.ends_with(std::filesystem::path::preferred_separator)) + { + PathString.pop_back(); + Path = std::filesystem::path(PathString); + } + // Special case if user gives a path with quotes and includes a backslash at the end: + // ="path\" cxxopts strips the leading quote only but not the trailing. + // As we expect paths here and we don't want trailing slashes we strip away the quote + // manually if the string does not start with a quote UE-231677 + else if (PathString[0] != '\"' && PathString[PathString.length() - 1] == '\"') + { + PathString.pop_back(); + Path = std::filesystem::path(PathString); + } } } diff --git a/src/zenutil/basicfile.cpp b/src/zencore/basicfile.cpp index 391c150c6..c2a21ae90 100644 --- a/src/zenutil/basicfile.cpp +++ b/src/zencore/basicfile.cpp @@ -1,6 +1,6 @@ // Copyright Epic Games, Inc. All Rights Reserved. -#include "zenutil/basicfile.h" +#include <zencore/basicfile.h> #include <zencore/compactbinary.h> #include <zencore/except.h> diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index 52f2c4adc..b8c35212f 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -9,9 +9,11 @@ #include <zencore/logging.h> #include <zencore/memory/memory.h> #include <zencore/process.h> +#include <zencore/scopeguard.h> #include <zencore/stream.h> #include <zencore/string.h> #include <zencore/testing.h> +#include <zencore/workthreadpool.h> #if ZEN_PLATFORM_WINDOWS # include <zencore/windows.h> @@ -681,7 +683,7 @@ CopyTree(std::filesystem::path FromPath, std::filesystem::path ToPath, const Cop { } - virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override + virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t) override { std::error_code Ec; const std::filesystem::path Relative = std::filesystem::relative(Parent, BasePath, Ec); @@ -727,7 +729,7 @@ CopyTree(std::filesystem::path FromPath, std::filesystem::path ToPath, const Cop } } - virtual bool VisitDirectory(const std::filesystem::path&, const path_view&) override { return true; } + virtual bool VisitDirectory(const std::filesystem::path&, const path_view&, uint32_t) override { return true; } std::filesystem::path BasePath; std::filesystem::path TargetPath; @@ -1215,7 +1217,7 @@ FileSystemTraversal::TraverseFileSystem(const std::filesystem::path& RootDir, Tr } else { - const bool ShouldDescend = Visitor.VisitDirectory(RootDir, FileName); + const bool ShouldDescend = Visitor.VisitDirectory(RootDir, FileName, gsl::narrow<uint32_t>(DirInfo->FileAttributes)); if (ShouldDescend) { @@ -1234,7 +1236,7 @@ FileSystemTraversal::TraverseFileSystem(const std::filesystem::path& RootDir, Tr } else { - Visitor.VisitFile(RootDir, FileName, DirInfo->EndOfFile.QuadPart); + Visitor.VisitFile(RootDir, FileName, DirInfo->EndOfFile.QuadPart, gsl::narrow<uint32_t>(DirInfo->FileAttributes)); } const uint64_t NextOffset = DirInfo->NextEntryOffset; @@ -1276,14 +1278,14 @@ FileSystemTraversal::TraverseFileSystem(const std::filesystem::path& RootDir, Tr { /* nop */ } - else if (Visitor.VisitDirectory(RootDir, FileName)) + else if (Visitor.VisitDirectory(RootDir, FileName, gsl::narrow<uint32_t>(Stat.st_mode))) { TraverseFileSystem(FullPath, Visitor); } } else if (S_ISREG(Stat.st_mode)) { - Visitor.VisitFile(RootDir, FileName, Stat.st_size); + Visitor.VisitFile(RootDir, FileName, Stat.st_size, gsl::narrow<uint32_t>(Stat.st_mode)); } else { @@ -1527,39 +1529,163 @@ MaximizeOpenFileCount() } void -GetDirectoryContent(const std::filesystem::path& RootDir, uint8_t Flags, DirectoryContent& OutContent) +GetDirectoryContent(const std::filesystem::path& RootDir, DirectoryContentFlags Flags, DirectoryContent& OutContent) { + ZEN_ASSERT(EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs)); + ZEN_ASSERT(EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeFiles) + ? true + : (!EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeFileSizes))); + FileSystemTraversal Traversal; struct Visitor : public FileSystemTraversal::TreeVisitor { - Visitor(uint8_t Flags, DirectoryContent& OutContent) : Flags(Flags), Content(OutContent) {} + Visitor(DirectoryContentFlags Flags, DirectoryContent& OutContent) : Flags(Flags), Content(OutContent) {} - virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent, - [[maybe_unused]] const path_view& File, - [[maybe_unused]] uint64_t FileSize) override + virtual void VisitFile(const std::filesystem::path& Parent, + const path_view& File, + uint64_t FileSize, + uint32_t NativeModeOrAttributes) override { - if (Flags & DirectoryContent::IncludeFilesFlag) + if (EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeFiles)) { Content.Files.push_back(Parent / File); + if (EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeFileSizes)) + { + Content.FileSizes.push_back(FileSize); + } + if (EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeAttributes)) + { + Content.FileAttributes.push_back(NativeModeOrAttributes); + } } } - virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override + virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, + const path_view& DirectoryName, + uint32_t NativeModeOrAttributes) override { - if (Flags & DirectoryContent::IncludeDirsFlag) + if (EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeDirs)) { Content.Directories.push_back(Parent / DirectoryName); + if (EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeAttributes)) + { + Content.DirectoryAttributes.push_back(NativeModeOrAttributes); + } } - return (Flags & DirectoryContent::RecursiveFlag) != 0; + return EnumHasAnyFlags(Flags, DirectoryContentFlags::Recursive); } - const uint8_t Flags; - DirectoryContent& Content; + const DirectoryContentFlags Flags; + DirectoryContent& Content; } Visit(Flags, OutContent); Traversal.TraverseFileSystem(RootDir, Visit); } +void +GetDirectoryContent(const std::filesystem::path& RootDir, + DirectoryContentFlags Flags, + GetDirectoryContentVisitor& Visitor, + WorkerThreadPool& WorkerPool, + Latch& PendingWorkCount) +{ + ZEN_ASSERT(EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs)); + ZEN_ASSERT(EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeFiles) + ? true + : (!EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeFileSizes))); + + struct MultithreadedVisitor : public FileSystemTraversal::TreeVisitor + { + MultithreadedVisitor(WorkerThreadPool& InWorkerPool, + Latch& InOutPendingWorkCount, + const std::filesystem::path& InRelativeRoot, + DirectoryContentFlags InFlags, + GetDirectoryContentVisitor* InVisitor) + : WorkerPool(InWorkerPool) + , PendingWorkCount(InOutPendingWorkCount) + , RelativeRoot(InRelativeRoot) + , Flags(InFlags) + , Visitor(InVisitor) + { + } + + virtual void VisitFile(const std::filesystem::path&, + const path_view& File, + uint64_t FileSize, + uint32_t NativeModeOrAttributes) override + { + if (EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeFiles)) + { + Content.FileNames.push_back(File); + if (EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeFileSizes)) + { + Content.FileSizes.push_back(FileSize); + } + if (EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeAttributes)) + { + Content.FileAttributes.push_back(NativeModeOrAttributes); + } + } + } + + virtual bool VisitDirectory(const std::filesystem::path& Parent, + const path_view& DirectoryName, + uint32_t NativeModeOrAttributes) override + { + std::filesystem::path Path = Parent / DirectoryName; + if (EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeDirs)) + { + Content.DirectoryNames.push_back(DirectoryName); + if (EnumHasAnyFlags(Flags, DirectoryContentFlags::IncludeAttributes)) + { + Content.DirectoryAttributes.push_back(NativeModeOrAttributes); + } + } + if (EnumHasAnyFlags(Flags, DirectoryContentFlags::Recursive)) + { + PendingWorkCount.AddCount(1); + try + { + WorkerPool.ScheduleWork([WorkerPool = &WorkerPool, + PendingWorkCount = &PendingWorkCount, + Visitor = Visitor, + Flags = Flags, + Path = std::move(Path), + RelativeRoot = RelativeRoot / DirectoryName]() { + ZEN_ASSERT(Visitor); + auto _ = MakeGuard([&]() { PendingWorkCount->CountDown(); }); + { + MultithreadedVisitor SubVisitor(*WorkerPool, *PendingWorkCount, RelativeRoot, Flags, Visitor); + FileSystemTraversal Traversal; + Traversal.TraverseFileSystem(Path, SubVisitor); + Visitor->AsyncVisitDirectory(SubVisitor.RelativeRoot, std::move(SubVisitor.Content)); + } + }); + } + catch (const std::exception Ex) + { + ZEN_ERROR("Failed scheduling work to scan folder '{}'. Reason: '{}'", Path, Ex.what()); + PendingWorkCount.CountDown(); + } + } + return false; + } + + WorkerThreadPool& WorkerPool; + Latch& PendingWorkCount; + + const std::filesystem::path RelativeRoot; + const DirectoryContentFlags Flags; + + GetDirectoryContentVisitor::DirectoryContent Content; + GetDirectoryContentVisitor* Visitor; + } WrapperVisitor(WorkerPool, PendingWorkCount, {}, Flags, &Visitor); + + FileSystemTraversal Traversal; + Traversal.TraverseFileSystem(RootDir, WrapperVisitor); + Visitor.AsyncVisitDirectory(WrapperVisitor.RelativeRoot, std::move(WrapperVisitor.Content)); +} + std::string GetEnvVariable(std::string_view VariableName) { @@ -1802,12 +1928,12 @@ TEST_CASE("filesystem") // Traversal struct : public FileSystemTraversal::TreeVisitor { - virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t) override + virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t, uint32_t) override { bFoundExpected |= std::filesystem::equivalent(Parent / File, Expected); } - virtual bool VisitDirectory(const std::filesystem::path&, const path_view&) override { return true; } + virtual bool VisitDirectory(const std::filesystem::path&, const path_view&, uint32_t) override { return true; } bool bFoundExpected = false; std::filesystem::path Expected; diff --git a/src/zenutil/include/zenutil/basicfile.h b/src/zencore/include/zencore/basicfile.h index 03c5605df..03c5605df 100644 --- a/src/zenutil/include/zenutil/basicfile.h +++ b/src/zencore/include/zencore/basicfile.h diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h index dba4981f0..ca8682cd7 100644 --- a/src/zencore/include/zencore/filesystem.h +++ b/src/zencore/include/zencore/filesystem.h @@ -4,6 +4,7 @@ #include "zencore.h" +#include <zencore/enumflags.h> #include <zencore/iobuffer.h> #include <zencore/string.h> @@ -12,8 +13,10 @@ namespace zen { -class IoBuffer; class CompositeBuffer; +class IoBuffer; +class Latch; +class WorkerThreadPool; /** Delete directory (after deleting any contents) */ @@ -195,28 +198,65 @@ class FileSystemTraversal public: struct TreeVisitor { - using path_view = std::basic_string_view<std::filesystem::path::value_type>; - using path_string = std::filesystem::path::string_type; + using path_view = std::basic_string_view<std::filesystem::path::value_type>; - virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) = 0; + virtual void VisitFile(const std::filesystem::path& Parent, + const path_view& File, + uint64_t FileSize, + uint32_t NativeModeOrAttributes) = 0; // This should return true if we should recurse into the directory - virtual bool VisitDirectory(const std::filesystem::path& Parent, const path_view& DirectoryName) = 0; + virtual bool VisitDirectory(const std::filesystem::path& Parent, + const path_view& DirectoryName, + uint32_t NativeModeOrAttributes) = 0; }; void TraverseFileSystem(const std::filesystem::path& RootDir, TreeVisitor& Visitor); }; +enum class DirectoryContentFlags : uint8_t +{ + None = 0, + IncludeDirs = 1u << 0, + IncludeFiles = 1u << 1, + Recursive = 1u << 2, + IncludeFileSizes = 1u << 3, + IncludeAttributes = 1u << 4, + IncludeAllEntries = IncludeDirs | IncludeFiles | Recursive +}; + +ENUM_CLASS_FLAGS(DirectoryContentFlags) + struct DirectoryContent { - static const uint8_t IncludeDirsFlag = 1u << 0; - static const uint8_t IncludeFilesFlag = 1u << 1; - static const uint8_t RecursiveFlag = 1u << 2; std::vector<std::filesystem::path> Files; + std::vector<uint64_t> FileSizes; + std::vector<uint32_t> FileAttributes; std::vector<std::filesystem::path> Directories; + std::vector<uint32_t> DirectoryAttributes; +}; + +void GetDirectoryContent(const std::filesystem::path& RootDir, DirectoryContentFlags Flags, DirectoryContent& OutContent); + +struct GetDirectoryContentVisitor +{ +public: + struct DirectoryContent + { + std::vector<std::filesystem::path> FileNames; + std::vector<uint64_t> FileSizes; + std::vector<uint32_t> FileAttributes; + std::vector<std::filesystem::path> DirectoryNames; + std::vector<uint32_t> DirectoryAttributes; + }; + virtual void AsyncVisitDirectory(const std::filesystem::path& RelativeRoot, DirectoryContent&& Content) = 0; }; -void GetDirectoryContent(const std::filesystem::path& RootDir, uint8_t Flags, DirectoryContent& OutContent); +void GetDirectoryContent(const std::filesystem::path& RootDir, + DirectoryContentFlags Flags, + GetDirectoryContentVisitor& Visitor, + WorkerThreadPool& WorkerPool, + Latch& PendingWorkCount); std::string GetEnvVariable(std::string_view VariableName); diff --git a/src/zencore/iobuffer.cpp b/src/zencore/iobuffer.cpp index 3d9d6706a..3b5c89c3e 100644 --- a/src/zencore/iobuffer.cpp +++ b/src/zencore/iobuffer.cpp @@ -289,7 +289,7 @@ IoBufferExtendedCore::Materialize() const return; } - const size_t DisableMMapSizeLimit = 0x1000ull; + const size_t DisableMMapSizeLimit = 0x2000ull; if (m_DataBytes < DisableMMapSizeLimit) { diff --git a/src/zencore/memory/llm.cpp b/src/zencore/memory/llm.cpp index fe4853d49..61fa29a66 100644 --- a/src/zencore/memory/llm.cpp +++ b/src/zencore/memory/llm.cpp @@ -15,6 +15,7 @@ static const int32_t TagNamesBaseIndex = 256; static const int32_t TrackedTagNameCount = 256; static const char* TagNames[TrackedTagNameCount]; static uint32_t TagNameHashes[TrackedTagNameCount]; +static int32_t ParentTags[TrackedTagNameCount]; static RwLock TableLock; @@ -46,7 +47,7 @@ FLLMTag::AssignAndAnnounceNewTag(const char* TagName) for (int TagIndex = 0; TagIndex <= CurrentMaxTagIndex; ++TagIndex) { - if (TagNameHashes[TagIndex] == TagNameHash) + if (TagNameHashes[TagIndex] == TagNameHash && ParentTags[TagIndex] == m_ParentTag) { m_Tag = TagIndex + TagNamesBaseIndex; // could verify the string matches here to catch hash collisions @@ -64,6 +65,7 @@ FLLMTag::AssignAndAnnounceNewTag(const char* TagName) { TagNameHashes[TagIndex] = TagNameHash; TagNames[TagIndex] = TagName; + ParentTags[TagIndex] = m_ParentTag; } else { diff --git a/src/zencore/memtrack/memorytrace.cpp b/src/zencore/memtrack/memorytrace.cpp index 7089c356a..e4ae8148e 100644 --- a/src/zencore/memtrack/memorytrace.cpp +++ b/src/zencore/memtrack/memorytrace.cpp @@ -41,8 +41,8 @@ void MemoryTrace_EnableTracePump(); //////////////////////////////////////////////////////////////////////////////// namespace { -// Controls how often time markers are emitted (default: every 4095 allocations). -constexpr uint32_t MarkerSamplePeriod = (4 << 10) - 1; +// Controls how often time markers are emitted (must be POW2-1 as this is used as a mask) +constexpr uint32_t MarkerSamplePeriod = 128 - 1; // Number of shifted bits to SizeLower constexpr uint32_t SizeShift = 3; diff --git a/src/zencore/process.cpp b/src/zencore/process.cpp index b1da034d2..c51e8f69d 100644 --- a/src/zencore/process.cpp +++ b/src/zencore/process.cpp @@ -1012,7 +1012,7 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand #if ZEN_PLATFORM_LINUX std::vector<uint32_t> RunningPids; DirectoryContent ProcList; - GetDirectoryContent("/proc", DirectoryContent::IncludeDirsFlag, ProcList); + GetDirectoryContent("/proc", DirectoryContentFlags::IncludeDirs, ProcList); for (const std::filesystem::path& EntryPath : ProcList.Directories) { std::string EntryName = EntryPath.stem(); diff --git a/src/zencore/zencore.cpp b/src/zencore/zencore.cpp index db821bff8..82d28c0e3 100644 --- a/src/zencore/zencore.cpp +++ b/src/zencore/zencore.cpp @@ -7,6 +7,7 @@ #endif #include <zencore/assertfmt.h> +#include <zencore/basicfile.h> #include <zencore/blake3.h> #include <zencore/callstack.h> #include <zencore/compactbinary.h> @@ -241,6 +242,7 @@ ApplicationExitCode() void zencore_forcelinktests() { + zen::basicfile_forcelink(); zen::blake3_forcelink(); zen::callstack_forcelink(); zen::compositebuffer_forcelink(); diff --git a/src/zenhttp/auth/authmgr.cpp b/src/zenhttp/auth/authmgr.cpp index 8da676908..1a9892d5c 100644 --- a/src/zenhttp/auth/authmgr.cpp +++ b/src/zenhttp/auth/authmgr.cpp @@ -2,6 +2,7 @@ #include "zenhttp/auth/authmgr.h" +#include <zencore/basicfile.h> #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinaryvalidation.h> @@ -9,7 +10,6 @@ #include <zencore/filesystem.h> #include <zencore/logging.h> #include <zenhttp/auth/oidc.h> -#include <zenutil/basicfile.h> #include <condition_variable> #include <memory> diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index d8ce25304..8052a8fd5 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -1,7 +1,9 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include <zenhttp/formatters.h> #include <zenhttp/httpclient.h> #include <zenhttp/httpserver.h> +#include <zenhttp/packageformat.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> @@ -16,8 +18,6 @@ #include <zencore/string.h> #include <zencore/testing.h> #include <zencore/trace.h> -#include <zenhttp/formatters.h> -#include <zenutil/packageformat.h> ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> diff --git a/src/zenhttp/httpclientauth.cpp b/src/zenhttp/httpclientauth.cpp new file mode 100644 index 000000000..04ac2ad3f --- /dev/null +++ b/src/zenhttp/httpclientauth.cpp @@ -0,0 +1,76 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenhttp/httpclientauth.h> + +#include <zenhttp/auth/authmgr.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cpr/cpr.h> +#include <fmt/format.h> +#include <json11.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { namespace httpclientauth { + + using namespace std::literals; + + std::function<HttpClientAccessToken()> CreateFromStaticToken(HttpClientAccessToken Token) + { + return [Token]() { return Token; }; + } + + std::function<HttpClientAccessToken()> CreateFromStaticToken(std::string_view Token) + { + return CreateFromStaticToken( + HttpClientAccessToken{.Value = fmt::format("Bearer {}"sv, Token), .ExpireTime = HttpClientAccessToken::TimePoint::max()}); + } + + std::function<HttpClientAccessToken()> CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params) + { + OAuthClientCredentialsParams OAuthParams(Params); + return [OAuthParams]() { + using namespace std::chrono; + + std::string Body = fmt::format("client_id={}&scope=cache_access&grant_type=client_credentials&client_secret={}"sv, + OAuthParams.ClientId, + OAuthParams.ClientSecret); + + cpr::Response Response = cpr::Post(cpr::Url{OAuthParams.Url}, + cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, + cpr::Body{std::move(Body)}); + + if (Response.error || Response.status_code != 200) + { + return HttpClientAccessToken{}; + } + + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.text, JsonError); + + if (JsonError.empty() == false) + { + return HttpClientAccessToken{}; + } + + std::string Token = Json["access_token"].string_value(); + int64_t ExpiresInSeconds = static_cast<int64_t>(Json["expires_in"].int_value()); + HttpClientAccessToken::TimePoint ExpireTime = HttpClientAccessToken::Clock::now() + seconds(ExpiresInSeconds); + + return HttpClientAccessToken{.Value = fmt::format("Bearer {}"sv, Token), .ExpireTime = ExpireTime}; + }; + } + + std::function<HttpClientAccessToken()> CreateFromOpenIdProvider(AuthMgr& AuthManager, std::string_view OpenIdProvider) + { + return [&AuthManager = AuthManager, OpenIdProvider = std::string(OpenIdProvider)]() { + AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider); + return HttpClientAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }; + } + + std::function<HttpClientAccessToken()> CreateFromDefaultOpenIdProvider(AuthMgr& AuthManager) + { + return CreateFromOpenIdProvider(AuthManager, "Default"sv); + } + +}} // namespace zen::httpclientauth diff --git a/src/zenhttp/httpserver.cpp b/src/zenhttp/httpserver.cpp index a0d4ef3f3..1fbe22628 100644 --- a/src/zenhttp/httpserver.cpp +++ b/src/zenhttp/httpserver.cpp @@ -24,7 +24,7 @@ #include <zencore/string.h> #include <zencore/testing.h> #include <zencore/thread.h> -#include <zenutil/packageformat.h> +#include <zenhttp/packageformat.h> #include <charconv> #include <mutex> diff --git a/src/zenhttp/include/zenhttp/httpclientauth.h b/src/zenhttp/include/zenhttp/httpclientauth.h new file mode 100644 index 000000000..aa07620ca --- /dev/null +++ b/src/zenhttp/include/zenhttp/httpclientauth.h @@ -0,0 +1,29 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenhttp/httpclient.h> + +namespace zen { + +class AuthMgr; + +namespace httpclientauth { + std::function<HttpClientAccessToken()> CreateFromStaticToken(HttpClientAccessToken Token); + + std::function<HttpClientAccessToken()> CreateFromStaticToken(std::string_view Token); + + struct OAuthClientCredentialsParams + { + std::string_view Url; + std::string_view ClientId; + std::string_view ClientSecret; + }; + + std::function<HttpClientAccessToken()> CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params); + + std::function<HttpClientAccessToken()> CreateFromOpenIdProvider(AuthMgr& AuthManager, std::string_view OpenIdProvider); + std::function<HttpClientAccessToken()> CreateFromDefaultOpenIdProvider(AuthMgr& AuthManager); +} // namespace httpclientauth + +} // namespace zen diff --git a/src/zenutil/include/zenutil/packageformat.h b/src/zenhttp/include/zenhttp/packageformat.h index c90b840da..c90b840da 100644 --- a/src/zenutil/include/zenutil/packageformat.h +++ b/src/zenhttp/include/zenhttp/packageformat.h diff --git a/src/zenutil/packageformat.cpp b/src/zenhttp/packageformat.cpp index 579e0d13c..676fc73fd 100644 --- a/src/zenutil/packageformat.cpp +++ b/src/zenhttp/packageformat.cpp @@ -1,6 +1,6 @@ // Copyright Epic Games, Inc. All Rights Reserved. -#include <zenutil/packageformat.h> +#include <zenhttp/packageformat.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp index c2a823430..fe59e3a6f 100644 --- a/src/zenhttp/servers/httpasio.cpp +++ b/src/zenhttp/servers/httpasio.cpp @@ -522,7 +522,9 @@ HttpServerConnection::HandleRequest() } else { - ZEN_ERROR("Caught system error exception while handling request: {}", SystemError.what()); + ZEN_ERROR("Caught system error exception while handling request: {}. ({})", + SystemError.what(), + SystemError.code().value()); Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, SystemError.what()); } } diff --git a/src/zenhttp/servers/httpparser.cpp b/src/zenhttp/servers/httpparser.cpp index 6829faa4a..9bb354a5e 100644 --- a/src/zenhttp/servers/httpparser.cpp +++ b/src/zenhttp/servers/httpparser.cpp @@ -389,7 +389,7 @@ HttpRequestParser::OnMessageComplete() } else { - ZEN_ERROR("failed processing http request: '{}'", SystemError.what()); + ZEN_ERROR("failed processing http request: '{}' ({})", SystemError.what(), SystemError.code().value()); } ResetState(); return 1; diff --git a/src/zenhttp/servers/httpplugin.cpp b/src/zenhttp/servers/httpplugin.cpp index b55d80af8..ec12b3755 100644 --- a/src/zenhttp/servers/httpplugin.cpp +++ b/src/zenhttp/servers/httpplugin.cpp @@ -407,7 +407,9 @@ HttpPluginConnectionHandler::HandleRequest() } else { - ZEN_ERROR("Caught system error exception while handling request: {}", SystemError.what()); + ZEN_ERROR("Caught system error exception while handling request: {}. ({})", + SystemError.what(), + SystemError.code().value()); Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, SystemError.what()); } } diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp index 54308a00b..87128c0c9 100644 --- a/src/zenhttp/servers/httpsys.cpp +++ b/src/zenhttp/servers/httpsys.cpp @@ -14,7 +14,7 @@ #include <zencore/string.h> #include <zencore/timer.h> #include <zencore/trace.h> -#include <zenutil/packageformat.h> +#include <zenhttp/packageformat.h> #if ZEN_WITH_HTTPSYS # define _WINSOCKAPI_ @@ -2047,7 +2047,7 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT return new HttpMessageResponseRequest(Transaction(), (uint16_t)HttpResponseCode::InsufficientStorage, SystemError.what()); } - ZEN_ERROR("Caught system error exception while handling request: {}", SystemError.what()); + ZEN_ERROR("Caught system error exception while handling request: {}. ({})", SystemError.what(), SystemError.code().value()); return new HttpMessageResponseRequest(Transaction(), (uint16_t)HttpResponseCode::InternalServerError, SystemError.what()); } catch (const std::bad_alloc& BadAlloc) diff --git a/src/zenhttp/xmake.lua b/src/zenhttp/xmake.lua index 8393f399b..b6ffbe467 100644 --- a/src/zenhttp/xmake.lua +++ b/src/zenhttp/xmake.lua @@ -7,7 +7,7 @@ target('zenhttp') add_files("**.cpp") add_files("servers/httpsys.cpp", {unity_ignored=true}) add_includedirs("include", {public=true}) - add_deps("zencore", "zenutil", "transport-sdk") + add_deps("zencore", "transport-sdk") add_packages( "vcpkg::asio", "vcpkg::cpr", diff --git a/src/zenhttp/zenhttp.cpp b/src/zenhttp/zenhttp.cpp index 6b855c4db..a2679f92e 100644 --- a/src/zenhttp/zenhttp.cpp +++ b/src/zenhttp/zenhttp.cpp @@ -6,7 +6,7 @@ # include <zenhttp/httpclient.h> # include <zenhttp/httpserver.h> -# include <zenutil/packageformat.h> +# include <zenhttp/packageformat.h> namespace zen { @@ -15,6 +15,7 @@ zenhttp_forcelinktests() { http_forcelink(); httpclient_forcelink(); + forcelink_packageformat(); } } // namespace zen diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index ec288f1dc..6259c0f37 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -20,12 +20,12 @@ #include <zencore/timer.h> #include <zencore/xxhash.h> #include <zenhttp/httpclient.h> +#include <zenhttp/packageformat.h> #include <zenhttp/zenhttp.h> #include <zenutil/cache/cache.h> #include <zenutil/cache/cacherequests.h> #include <zenutil/chunkrequests.h> #include <zenutil/logging/testformatter.h> -#include <zenutil/packageformat.h> #include <zenutil/zenserverprocess.h> #include <http_parser.h> diff --git a/src/zenserver/admin/admin.cpp b/src/zenserver/admin/admin.cpp index 847ed5a50..2888f5450 100644 --- a/src/zenserver/admin/admin.cpp +++ b/src/zenserver/admin/admin.cpp @@ -21,6 +21,7 @@ #include <zenstore/gc.h> #include <zenstore/cache/structuredcachestore.h> +#include <zenutil/workerpools.h> #include "config.h" #include "projectstore/projectstore.h" @@ -41,31 +42,43 @@ GetStatsForDirectory(std::filesystem::path Dir) if (!std::filesystem::exists(Dir)) return {}; - FileSystemTraversal Traversal; - - struct StatsTraversal : public FileSystemTraversal::TreeVisitor + struct StatsTraversal : public GetDirectoryContentVisitor { - virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override + virtual void AsyncVisitDirectory(const std::filesystem::path& RelativeRoot, DirectoryContent&& Content) override { - ZEN_UNUSED(Parent, File); - ++TotalFileCount; - TotalBytes += FileSize; + ZEN_UNUSED(RelativeRoot); + + uint64_t FileCount = Content.FileNames.size(); + uint64_t DirCount = Content.DirectoryNames.size(); + uint64_t FilesSize = 0; + for (uint64_t FileSize : Content.FileSizes) + { + FilesSize += FileSize; + } + TotalBytes += FilesSize; + TotalFileCount += FileCount; + TotalDirCount += DirCount; } - virtual bool VisitDirectory(const std::filesystem::path&, const path_view&) override + + std::atomic_uint64_t TotalBytes = 0; + std::atomic_uint64_t TotalFileCount = 0; + std::atomic_uint64_t TotalDirCount = 0; + + DirStats GetStats() { - ++TotalDirCount; - return true; + return {.FileCount = TotalFileCount.load(), .DirCount = TotalDirCount.load(), .ByteCount = TotalBytes.load()}; } + } DirTraverser; - uint64_t TotalBytes = 0; - uint64_t TotalFileCount = 0; - uint64_t TotalDirCount = 0; - - DirStats GetStats() { return {.FileCount = TotalFileCount, .DirCount = TotalDirCount, .ByteCount = TotalBytes}; } - }; + Latch PendingWorkCount(1); - StatsTraversal DirTraverser; - Traversal.TraverseFileSystem(Dir, DirTraverser); + GetDirectoryContent(Dir, + DirectoryContentFlags::IncludeAllEntries | DirectoryContentFlags::IncludeFileSizes, + DirTraverser, + GetSmallWorkerPool(EWorkloadType::Burst), + PendingWorkCount); + PendingWorkCount.CountDown(); + PendingWorkCount.Wait(); return DirTraverser.GetStats(); } diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index 925c7b42d..b9a9ca380 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -18,15 +18,15 @@ #include <zencore/workthreadpool.h> #include <zenhttp/httpserver.h> #include <zenhttp/httpstats.h> +#include <zenhttp/packageformat.h> #include <zenstore/cache/structuredcachestore.h> #include <zenstore/gc.h> #include <zenutil/cache/cache.h> #include <zenutil/cache/cacherequests.h> #include <zenutil/cache/rpcrecording.h> -#include <zenutil/packageformat.h> +#include <zenutil/jupiter/jupiterclient.h> #include <zenutil/workerpools.h> -#include "upstream/jupiter.h" #include "upstream/upstreamcache.h" #include "upstream/zen.h" #include "zenstore/cidstore.h" diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index 0108e8b9f..809092378 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -5,6 +5,7 @@ #include "config/luaconfig.h" #include "diag/logging.h" +#include <zencore/basicfile.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/crypto.h> @@ -14,7 +15,6 @@ #include <zencore/logging.h> #include <zencore/string.h> #include <zenhttp/zenhttp.h> -#include <zenutil/basicfile.h> ZEN_THIRD_PARTY_INCLUDES_START #include <fmt/format.h> @@ -63,7 +63,7 @@ ReadAllCentralManifests(const std::filesystem::path& SystemRoot) std::vector<CbObject> Manifests; DirectoryContent Content; - GetDirectoryContent(SystemRoot / "States", DirectoryContent::IncludeFilesFlag, Content); + GetDirectoryContent(SystemRoot / "States", DirectoryContentFlags::IncludeFiles, Content); for (std::filesystem::path& File : Content.Files) { @@ -400,25 +400,25 @@ ParseConfigFile(const std::filesystem::path& Path, #endif ////// stats - LuaOptions.AddOption("stats.enable"sv, ServerOptions.StatsConfig.Enabled); + LuaOptions.AddOption("stats.enable"sv, ServerOptions.StatsConfig.Enabled, "statsd"sv); LuaOptions.AddOption("stats.host"sv, ServerOptions.StatsConfig.StatsdHost); LuaOptions.AddOption("stats.port"sv, ServerOptions.StatsConfig.StatsdPort); ////// cache LuaOptions.AddOption("cache.enable"sv, ServerOptions.StructuredCacheConfig.Enabled); - LuaOptions.AddOption("cache.writelog"sv, ServerOptions.StructuredCacheConfig.WriteLogEnabled, "cache-write-log"); - LuaOptions.AddOption("cache.accesslog"sv, ServerOptions.StructuredCacheConfig.AccessLogEnabled, "cache-access-log"); + LuaOptions.AddOption("cache.writelog"sv, ServerOptions.StructuredCacheConfig.WriteLogEnabled, "cache-write-log"sv); + LuaOptions.AddOption("cache.accesslog"sv, ServerOptions.StructuredCacheConfig.AccessLogEnabled, "cache-access-log"sv); LuaOptions.AddOption("cache.memlayer.sizethreshold"sv, ServerOptions.StructuredCacheConfig.MemCacheSizeThreshold, - "cache-memlayer-sizethreshold"); + "cache-memlayer-sizethreshold"sv); LuaOptions.AddOption("cache.memlayer.targetfootprint"sv, ServerOptions.StructuredCacheConfig.MemTargetFootprintBytes, - "cache-memlayer-targetfootprint"); + "cache-memlayer-targetfootprint"sv); LuaOptions.AddOption("cache.memlayer.triminterval"sv, ServerOptions.StructuredCacheConfig.MemTrimIntervalSeconds, - "cache-memlayer-triminterval"); - LuaOptions.AddOption("cache.memlayer.maxage"sv, ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds, "cache-memlayer-maxage"); + "cache-memlayer-triminterval"sv); + LuaOptions.AddOption("cache.memlayer.maxage"sv, ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds, "cache-memlayer-maxage"sv); ////// cache.upstream LuaOptions.AddOption("cache.upstream.policy"sv, ServerOptions.UpstreamCacheConfig.CachePolicy, "upstream-cache-policy"sv); @@ -461,6 +461,7 @@ ParseConfigFile(const std::filesystem::path& Path, LuaOptions.AddOption("cache.upstream.zen.dns"sv, ServerOptions.UpstreamCacheConfig.ZenConfig.Dns); LuaOptions.AddOption("cache.upstream.zen.url"sv, ServerOptions.UpstreamCacheConfig.ZenConfig.Urls); + ////// gc LuaOptions.AddOption("gc.enabled"sv, ServerOptions.GcConfig.Enabled, "gc-enabled"sv); LuaOptions.AddOption("gc.v2"sv, ServerOptions.GcConfig.UseGCV2, "gc-v2"sv); @@ -487,22 +488,24 @@ ParseConfigFile(const std::filesystem::path& Path, LuaOptions.AddOption("gc.attachment.passes"sv, ServerOptions.GcConfig.AttachmentPassCount, "gc-attachment-passes"sv); LuaOptions.AddOption("gc.validation"sv, ServerOptions.GcConfig.EnableValidation, "gc-validation"); - ////// gc LuaOptions.AddOption("gc.cache.maxdurationseconds"sv, ServerOptions.GcConfig.Cache.MaxDurationSeconds, "gc-cache-duration-seconds"sv); + LuaOptions.AddOption("gc.projectstore.duration.seconds"sv, + ServerOptions.GcConfig.ProjectStore.MaxDurationSeconds, + "gc-projectstore-duration-seconds"); ////// security LuaOptions.AddOption("security.encryptionaeskey"sv, ServerOptions.EncryptionKey, "encryption-aes-key"sv); LuaOptions.AddOption("security.encryptionaesiv"sv, ServerOptions.EncryptionIV, "encryption-aes-iv"sv); LuaOptions.AddOption("security.openidproviders"sv, ServerOptions.AuthConfig); - LuaOptions.Parse(Path, CmdLineResult); - ////// workspaces LuaOptions.AddOption("workspaces.enabled"sv, ServerOptions.WorksSpacesConfig.Enabled, "workspaces-enabled"sv); LuaOptions.AddOption("workspaces.allowconfigchanges"sv, ServerOptions.WorksSpacesConfig.AllowConfigurationChanges, "workspaces-allow-changes"sv); + LuaOptions.Parse(Path, CmdLineResult); + // These have special command line processing so we make sure we export them if they were configured on command line if (!ServerOptions.AuthConfig.OpenIdProviders.empty()) { diff --git a/src/zenserver/objectstore/objectstore.cpp b/src/zenserver/objectstore/objectstore.cpp index e614b256b..b0212ab07 100644 --- a/src/zenserver/objectstore/objectstore.cpp +++ b/src/zenserver/objectstore/objectstore.cpp @@ -3,6 +3,7 @@ #include <objectstore/objectstore.h> #include <zencore/base64.h> +#include <zencore/basicfile.h> #include <zencore/compactbinaryvalue.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> @@ -12,7 +13,6 @@ #include "zencore/compactbinarybuilder.h" #include "zenhttp/httpcommon.h" #include "zenhttp/httpserver.h" -#include "zenutil/basicfile.h" #include <filesystem> #include <thread> @@ -376,7 +376,7 @@ HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::s Writer.BeginArray("Contents"sv); } - void VisitFile(const fs::path& Parent, const path_view& File, uint64_t FileSize) override + void VisitFile(const fs::path& Parent, const path_view& File, uint64_t FileSize, uint32_t) override { const fs::path FullPath = Parent / fs::path(File); fs::path RelativePath = fs::relative(FullPath, BucketPath); @@ -390,7 +390,7 @@ HttpObjectStoreService::ListBucket(zen::HttpRouterRequest& Request, const std::s Writer.EndObject(); } - bool VisitDirectory(const std::filesystem::path&, const path_view&) override { return false; } + bool VisitDirectory(const std::filesystem::path&, const path_view&, uint32_t) override { return false; } CbObject GetResult() { diff --git a/src/zenserver/projectstore/buildsremoteprojectstore.cpp b/src/zenserver/projectstore/buildsremoteprojectstore.cpp index 6d0d51a60..302b81729 100644 --- a/src/zenserver/projectstore/buildsremoteprojectstore.cpp +++ b/src/zenserver/projectstore/buildsremoteprojectstore.cpp @@ -6,8 +6,10 @@ #include <zencore/compress.h> #include <zencore/fmtutils.h> -#include <upstream/jupiter.h> -#include <zenhttp/auth/authmgr.h> +#include <zenhttp/httpclientauth.h> + +#include <zenutil/jupiter/jupiterclient.h> +#include <zenutil/jupiter/jupitersession.h> namespace zen { @@ -18,7 +20,7 @@ static const std::string_view OplogContainerPartName = "oplogcontainer"sv; class BuildsRemoteStore : public RemoteProjectStore { public: - BuildsRemoteStore(Ref<CloudCacheClient>&& CloudClient, + BuildsRemoteStore(Ref<JupiterClient>&& InJupiterClient, std::string_view Namespace, std::string_view Bucket, const Oid& BuildId, @@ -26,7 +28,7 @@ public: bool ForceDisableBlocks, bool ForceDisableTempBlocks, const std::filesystem::path& TempFilePath) - : m_CloudClient(std::move(CloudClient)) + : m_JupiterClient(std::move(InJupiterClient)) , m_Namespace(Namespace) , m_Bucket(Bucket) , m_BuildId(BuildId) @@ -50,7 +52,7 @@ public: .UseTempBlockFiles = m_UseTempBlocks, .AllowChunking = true, .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_BuildId), - .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_CloudClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId)}; + .Description = fmt::format("[cloud] {} as {}/{}/{}"sv, m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId)}; } virtual Stats GetStats() const override @@ -68,18 +70,18 @@ public: { ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); - CloudCacheSession Session(m_CloudClient.Get()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); IoBuffer Payload = m_MetaData; Payload.SetContentType(ZenContentType::kCbObject); - CloudCacheResult PutResult = Session.PutBuild(m_Namespace, m_Bucket, m_BuildId, Payload); + JupiterResult PutResult = Session.PutBuild(m_Namespace, m_Bucket, m_BuildId, Payload); AddStats(PutResult); CreateContainerResult Result{ConvertResult(PutResult)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed creating oplog build to {}/{}/{}/{}. Reason: '{}'", - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, @@ -92,7 +94,7 @@ public: virtual SaveResult SaveContainer(const IoBuffer& Payload) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - CloudCacheSession Session(m_CloudClient.Get()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); PutBuildPartResult PutResult = Session.PutBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, OplogContainerPartName, Payload); AddStats(PutResult); @@ -101,7 +103,7 @@ public: if (Result.ErrorCode) { Result.Reason = fmt::format("Failed saving oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, @@ -115,9 +117,9 @@ public: virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&& Block) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - CloudCacheSession Session(m_CloudClient.Get()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); - CloudCacheResult PutResult = + JupiterResult PutResult = Session.PutBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, ZenContentType::kCompressedBinary, Payload); AddStats(PutResult); @@ -125,7 +127,7 @@ public: if (Result.ErrorCode) { Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}/{}/{}/{}. Reason: '{}'", - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, @@ -176,14 +178,14 @@ public: IoBuffer MetaPayload = Writer.Save().GetBuffer().AsIoBuffer(); MetaPayload.SetContentType(ZenContentType::kCbObject); - CloudCacheResult PutMetaResult = + JupiterResult PutMetaResult = Session.PutBlockMetadata(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, MetaPayload); AddStats(PutMetaResult); RemoteProjectStore::Result MetaDataResult = ConvertResult(PutMetaResult); if (MetaDataResult.ErrorCode) { ZEN_WARN("Failed saving block attachment meta data to {}/{}/{}/{}/{}/{}. Reason: '{}'", - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, @@ -215,7 +217,7 @@ public: ZEN_UNUSED(RawHash); ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - CloudCacheSession Session(m_CloudClient.Get()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); FinalizeBuildPartResult FinalizeRefResult = Session.FinalizeBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash); AddStats(FinalizeRefResult); @@ -224,7 +226,7 @@ public: if (Result.ErrorCode) { Result.Reason = fmt::format("Failed finalizing oplog container build part to {}/{}/{}/{}/{}. Reason: '{}'", - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, @@ -233,14 +235,14 @@ public: } else if (Result.Needs.empty()) { - CloudCacheResult FinalizeBuildResult = Session.FinalizeBuild(m_Namespace, m_Bucket, m_BuildId); + JupiterResult FinalizeBuildResult = Session.FinalizeBuild(m_Namespace, m_Bucket, m_BuildId); AddStats(FinalizeBuildResult); FinalizeBuildResult.ElapsedSeconds += FinalizeRefResult.ElapsedSeconds; Result = {ConvertResult(FinalizeBuildResult)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed finalizing oplog container build to {}/{}/{}/{}. Reason: '{}'", - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, @@ -254,14 +256,14 @@ public: { ZEN_ASSERT(m_OplogBuildPartId == Oid::Zero); - CloudCacheSession Session(m_CloudClient.Get()); - CloudCacheResult GetBuildResult = Session.GetBuild(m_Namespace, m_Bucket, m_BuildId); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterResult GetBuildResult = Session.GetBuild(m_Namespace, m_Bucket, m_BuildId); AddStats(GetBuildResult); LoadContainerResult Result{ConvertResult(GetBuildResult)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed fetching oplog container build from {}/{}/{}/{}. Reason: '{}'", - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, @@ -273,7 +275,7 @@ public: { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("The build {}/{}/{}/{} payload is not formatted as a compact binary object"sv, - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId); @@ -284,7 +286,7 @@ public: { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv, - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId); @@ -295,7 +297,7 @@ public: { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv, - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, @@ -303,7 +305,7 @@ public: return Result; } - CloudCacheResult GetBuildPartResult = Session.GetBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId); + JupiterResult GetBuildPartResult = Session.GetBuildPart(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId); AddStats(GetBuildPartResult); Result = {ConvertResult(GetBuildResult)}; Result.ElapsedSeconds += GetBuildResult.ElapsedSeconds; @@ -311,7 +313,7 @@ public: { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed fetching oplog build part from {}/{}/{}/{}/{}. Reason: '{}'", - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, @@ -325,7 +327,7 @@ public: { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("The build part for oplog container {}/{}/{}/{}/{} is not formatted as a compact binary object"sv, - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, @@ -339,15 +341,15 @@ public: virtual GetKnownBlocksResult GetKnownBlocks() override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - CloudCacheSession Session(m_CloudClient.Get()); - CloudCacheResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterResult FindResult = Session.FindBlocks(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId); AddStats(FindResult); GetKnownBlocksResult Result{ConvertResult(FindResult)}; if (Result.ErrorCode) { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed listing know blocks for {}/{}/{}/{}/{}. Reason: '{}'", - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, @@ -360,7 +362,7 @@ public: { Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("The block list {}/{}/{}/{} is not formatted as a compact binary object"sv, - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, @@ -392,15 +394,15 @@ public: virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); - CloudCacheSession Session(m_CloudClient.Get()); - CloudCacheResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, m_TempFilePath); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterResult GetResult = Session.GetBuildBlob(m_Namespace, m_Bucket, m_BuildId, m_OplogBuildPartId, RawHash, m_TempFilePath); AddStats(GetResult); LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)}; if (GetResult.ErrorCode) { Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}&{}/{}/{}. Reason: '{}'", - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_BuildId, @@ -429,7 +431,7 @@ public: } private: - void AddStats(const CloudCacheResult& Result) + void AddStats(const JupiterResult& Result) { m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.SentBytes)); m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.ReceivedBytes)); @@ -445,7 +447,7 @@ private: m_RequestCount.fetch_add(1); } - static Result ConvertResult(const CloudCacheResult& Response) + static Result ConvertResult(const JupiterResult& Response) { std::string Text; int32_t ErrorCode = 0; @@ -482,7 +484,7 @@ private: return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text}; } - Ref<CloudCacheClient> m_CloudClient; + Ref<JupiterClient> m_JupiterClient; const std::string m_Namespace; const std::string m_Bucket; const Oid m_BuildId; @@ -510,44 +512,35 @@ CreateBuildsRemoteStore(const BuildsRemoteStoreOptions& Options, const std::file // Assume https URL Url = fmt::format("https://{}"sv, Url); } - CloudCacheClientOptions ClientOptions{.Name = "Remote store"sv, - .ServiceUrl = Url, - .ConnectTimeout = std::chrono::milliseconds(2000), - .Timeout = std::chrono::milliseconds(1800000), - .AssumeHttp2 = Options.AssumeHttp2, - .AllowResume = true, - .RetryCount = 4}; + JupiterClientOptions ClientOptions{.Name = "Remote store"sv, + .ServiceUrl = Url, + .ConnectTimeout = std::chrono::milliseconds(2000), + .Timeout = std::chrono::milliseconds(1800000), + .AssumeHttp2 = Options.AssumeHttp2, + .AllowResume = true, + .RetryCount = 4}; // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider // 2) Access token as parameter in request // 3) Environment variable (different win vs linux/mac) // 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider - std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + std::function<HttpClientAccessToken()> TokenProvider; if (!Options.OpenIdProvider.empty()) { - TokenProvider = - CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager, OpenIdProvider = Options.OpenIdProvider]() { - AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider.empty() ? "Default" : OpenIdProvider); - return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; - }); + TokenProvider = httpclientauth::CreateFromOpenIdProvider(Options.AuthManager, Options.OpenIdProvider); } else if (!Options.AccessToken.empty()) { - TokenProvider = CloudCacheTokenProvider::CreateFromCallback([AccessToken = "Bearer " + Options.AccessToken]() { - return CloudCacheAccessToken{.Value = AccessToken, .ExpireTime = GcClock::TimePoint::max()}; - }); + TokenProvider = httpclientauth::CreateFromStaticToken(Options.AccessToken); } else { - TokenProvider = CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager]() { - AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken("Default"); - return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; - }); + TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager); } - Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider))); + Ref<JupiterClient> Client(new JupiterClient(ClientOptions, std::move(TokenProvider))); - std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(std::move(CloudClient), + std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(std::move(Client), Options.Namespace, Options.Bucket, Options.BuildId, diff --git a/src/zenserver/projectstore/httpprojectstore.cpp b/src/zenserver/projectstore/httpprojectstore.cpp index eb6407e1f..0b8e5f13b 100644 --- a/src/zenserver/projectstore/httpprojectstore.cpp +++ b/src/zenserver/projectstore/httpprojectstore.cpp @@ -1061,12 +1061,8 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req) bool IsValid = true; std::vector<IoHash> MissingChunks; - std::vector<IoHash> ReferencedChunks; CbPackage::AttachmentResolver Resolver = [&](const IoHash& Hash) -> SharedBuffer { - // We want to add all chunks here so we can properly clear them from the 'prep' call where we retained them earlier - ReferencedChunks.push_back(Hash); - if (m_CidStore.ContainsChunk(Hash)) { // Return null attachment as we already have it, no point in reading it and storing it again @@ -1155,6 +1151,9 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req) return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "No oplog entry key specified"); } + std::vector<IoHash> ReferencedChunks; + Core.IterateAttachments([&ReferencedChunks](CbFieldView View) { ReferencedChunks.push_back(View.AsAttachment()); }); + // Write core to oplog size_t AttachmentCount = Package.GetAttachments().size(); @@ -1168,7 +1167,10 @@ HttpProjectService::HandleOplogOpNewRequest(HttpRouterRequest& Req) m_ProjectStats.ChunkWriteCount += AttachmentCount; // Once we stored the op, we no longer need to retain any chunks this op references - FoundLog->RemovePendingChunkReferences(ReferencedChunks); + if (!ReferencedChunks.empty()) + { + FoundLog->RemovePendingChunkReferences(ReferencedChunks); + } m_ProjectStats.OpWriteCount++; ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", ProjectId, OplogId, OpLsn, NiceBytes(Payload.Size()), Core["key"sv].AsString()); diff --git a/src/zenserver/projectstore/httpprojectstore.h b/src/zenserver/projectstore/httpprojectstore.h index 13810bd66..8e74c57a5 100644 --- a/src/zenserver/projectstore/httpprojectstore.h +++ b/src/zenserver/projectstore/httpprojectstore.h @@ -3,13 +3,13 @@ #pragma once #include <zencore/stats.h> -#include <zenhttp/auth/authmgr.h> #include <zenhttp/httpserver.h> #include <zenhttp/httpstats.h> #include <zenstore/cidstore.h> namespace zen { +class AuthMgr; class ProjectStore; ////////////////////////////////////////////////////////////////////////// diff --git a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp index f4fe578ff..e906127ff 100644 --- a/src/zenserver/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenserver/projectstore/jupiterremoteprojectstore.cpp @@ -5,8 +5,10 @@ #include <zencore/compress.h> #include <zencore/fmtutils.h> -#include <upstream/jupiter.h> -#include <zenhttp/auth/authmgr.h> +#include <zenhttp/httpclientauth.h> + +#include <zenutil/jupiter/jupiterclient.h> +#include <zenutil/jupiter/jupitersession.h> namespace zen { @@ -15,7 +17,7 @@ using namespace std::literals; class JupiterRemoteStore : public RemoteProjectStore { public: - JupiterRemoteStore(Ref<CloudCacheClient>&& CloudClient, + JupiterRemoteStore(Ref<JupiterClient>&& InJupiterClient, std::string_view Namespace, std::string_view Bucket, const IoHash& Key, @@ -23,7 +25,7 @@ public: bool ForceDisableBlocks, bool ForceDisableTempBlocks, const std::filesystem::path& TempFilePath) - : m_CloudClient(std::move(CloudClient)) + : m_JupiterClient(std::move(InJupiterClient)) , m_Namespace(Namespace) , m_Bucket(Bucket) , m_Key(Key) @@ -47,7 +49,7 @@ public: .AllowChunking = true, .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_Key), .Description = fmt::format("[cloud] {} as {}/{}/{}{}"sv, - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_Key, @@ -73,15 +75,15 @@ public: virtual SaveResult SaveContainer(const IoBuffer& Payload) override { - CloudCacheSession Session(m_CloudClient.Get()); - PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + PutRefResult PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject); AddStats(PutResult); SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}/{}. Reason: '{}'", - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_Key, @@ -92,15 +94,15 @@ public: virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, Block&&) override { - CloudCacheSession Session(m_CloudClient.Get()); - CloudCacheResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterResult PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload); AddStats(PutResult); SaveAttachmentResult Result{ConvertResult(PutResult)}; if (Result.ErrorCode) { Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}. Reason: '{}'", - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, RawHash, Result.Reason); @@ -125,7 +127,7 @@ public: virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override { - CloudCacheSession Session(m_CloudClient.Get()); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); FinalizeRefResult FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash); AddStats(FinalizeRefResult); @@ -133,7 +135,7 @@ public: if (Result.ErrorCode) { Result.Reason = fmt::format("Failed finalizing oplog container to {}/{}/{}/{}. Reason: '{}'", - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, m_Key, @@ -162,8 +164,8 @@ public: {.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds}}; } - CloudCacheSession Session(m_CloudClient.Get()); - CloudCacheExistsResult ExistsResult = + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterExistsResult ExistsResult = Session.CompressedBlobExists(m_Namespace, std::set<IoHash>(BlockHashes.begin(), BlockHashes.end())); AddStats(ExistsResult); @@ -172,7 +174,7 @@ public: return GetKnownBlocksResult{{.ErrorCode = ExistsResult.ErrorCode, .ElapsedSeconds = LoadResult.ElapsedSeconds + ExistsResult.ElapsedSeconds, .Reason = fmt::format("Failed checking attachment existance in {}/{}. Reason: '{}'", - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, ExistsResult.Reason)}}; } @@ -201,15 +203,15 @@ public: virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { - CloudCacheSession Session(m_CloudClient.Get()); - CloudCacheResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); AddStats(GetResult); LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)}; if (GetResult.ErrorCode) { Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'", - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, RawHash, Result.Reason); @@ -237,14 +239,14 @@ public: private: LoadContainerResult LoadContainer(const IoHash& Key) { - CloudCacheSession Session(m_CloudClient.Get()); - CloudCacheResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client()); + JupiterResult GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject); AddStats(GetResult); if (GetResult.ErrorCode || !GetResult.Success) { LoadContainerResult Result{ConvertResult(GetResult)}; Result.Reason = fmt::format("Failed fetching oplog container from {}/{}/{}/{}. Reason: '{}'", - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, Key, @@ -259,7 +261,7 @@ private: RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), .ElapsedSeconds = GetResult.ElapsedSeconds, .Reason = fmt::format("The ref {}/{}/{}/{} is not formatted as a compact binary object"sv, - m_CloudClient->ServiceUrl(), + m_JupiterClient->ServiceUrl(), m_Namespace, m_Bucket, Key)}, @@ -268,7 +270,7 @@ private: return LoadContainerResult{ConvertResult(GetResult), std::move(ContainerObject)}; } - void AddStats(const CloudCacheResult& Result) + void AddStats(const JupiterResult& Result) { m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.SentBytes)); m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.ReceivedBytes)); @@ -284,7 +286,7 @@ private: m_RequestCount.fetch_add(1); } - static Result ConvertResult(const CloudCacheResult& Response) + static Result ConvertResult(const JupiterResult& Response) { std::string Text; int32_t ErrorCode = 0; @@ -321,7 +323,7 @@ private: return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = Text}; } - Ref<CloudCacheClient> m_CloudClient; + Ref<JupiterClient> m_JupiterClient; const std::string m_Namespace; const std::string m_Bucket; const IoHash m_Key; @@ -348,44 +350,35 @@ CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::fi // Assume https URL Url = fmt::format("https://{}"sv, Url); } - CloudCacheClientOptions ClientOptions{.Name = "Remote store"sv, - .ServiceUrl = Url, - .ConnectTimeout = std::chrono::milliseconds(2000), - .Timeout = std::chrono::milliseconds(1800000), - .AssumeHttp2 = Options.AssumeHttp2, - .AllowResume = true, - .RetryCount = 4}; + JupiterClientOptions ClientOptions{.Name = "Remote store"sv, + .ServiceUrl = Url, + .ConnectTimeout = std::chrono::milliseconds(2000), + .Timeout = std::chrono::milliseconds(1800000), + .AssumeHttp2 = Options.AssumeHttp2, + .AllowResume = true, + .RetryCount = 4}; // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider // 2) Access token as parameter in request // 3) Environment variable (different win vs linux/mac) // 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider - std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + std::function<HttpClientAccessToken()> TokenProvider; if (!Options.OpenIdProvider.empty()) { - TokenProvider = - CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager, OpenIdProvider = Options.OpenIdProvider]() { - AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider.empty() ? "Default" : OpenIdProvider); - return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; - }); + TokenProvider = httpclientauth::CreateFromOpenIdProvider(Options.AuthManager, Options.OpenIdProvider); } else if (!Options.AccessToken.empty()) { - TokenProvider = CloudCacheTokenProvider::CreateFromCallback([AccessToken = "Bearer " + Options.AccessToken]() { - return CloudCacheAccessToken{.Value = AccessToken, .ExpireTime = GcClock::TimePoint::max()}; - }); + TokenProvider = httpclientauth::CreateFromStaticToken(Options.AccessToken); } else { - TokenProvider = CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager]() { - AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken("Default"); - return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; - }); + TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager); } - Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider))); + Ref<JupiterClient> Client(new JupiterClient(ClientOptions, std::move(TokenProvider))); - std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<JupiterRemoteStore>(std::move(CloudClient), + std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<JupiterRemoteStore>(std::move(Client), Options.Namespace, Options.Bucket, Options.Key, diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index d39d78cb9..46a236af9 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -16,12 +16,12 @@ #include <zencore/stream.h> #include <zencore/timer.h> #include <zencore/trace.h> +#include <zenhttp/packageformat.h> #include <zenstore/caslog.h> #include <zenstore/cidstore.h> #include <zenstore/scrubcontext.h> #include <zenutil/cache/rpcrecording.h> #include <zenutil/openprocesscache.h> -#include <zenutil/packageformat.h> #include <zenutil/referencemetadata.h> #include <zenutil/workerpools.h> @@ -3506,7 +3506,7 @@ ProjectStore::Project::ScanForOplogs() const if (Project::Exists(m_OplogStoragePath)) { DirectoryContent DirContent; - GetDirectoryContent(m_OplogStoragePath, DirectoryContent::IncludeDirsFlag, DirContent); + GetDirectoryContent(m_OplogStoragePath, DirectoryContentFlags::IncludeDirs, DirContent); Oplogs.reserve(DirContent.Directories.size()); for (const std::filesystem::path& DirPath : DirContent.Directories) { @@ -3859,7 +3859,7 @@ ProjectStore::DiscoverProjects() } DirectoryContent DirContent; - GetDirectoryContent(m_ProjectBasePath, DirectoryContent::IncludeDirsFlag, DirContent); + GetDirectoryContent(m_ProjectBasePath, DirectoryContentFlags::IncludeDirs, DirContent); for (const std::filesystem::path& DirPath : DirContent.Directories) { @@ -3965,7 +3965,7 @@ ProjectStore::StorageSize() const if (std::filesystem::exists(m_ProjectBasePath)) { DirectoryContent ProjectsFolderContent; - GetDirectoryContent(m_ProjectBasePath, DirectoryContent::IncludeDirsFlag, ProjectsFolderContent); + GetDirectoryContent(m_ProjectBasePath, DirectoryContentFlags::IncludeDirs, ProjectsFolderContent); for (const std::filesystem::path& ProjectBasePath : ProjectsFolderContent.Directories) { @@ -3974,7 +3974,7 @@ ProjectStore::StorageSize() const { Result.DiskSize += Project::TotalSize(ProjectBasePath); DirectoryContent DirContent; - GetDirectoryContent(ProjectBasePath, DirectoryContent::IncludeDirsFlag, DirContent); + GetDirectoryContent(ProjectBasePath, DirectoryContentFlags::IncludeDirs, DirContent); for (const std::filesystem::path& OplogBasePath : DirContent.Directories) { Result.DiskSize += Oplog::TotalSize(OplogBasePath); @@ -5433,7 +5433,11 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } Project->TouchProject(); - ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true); + std::string_view Method = Cb["method"sv].AsString(); + + bool VerifyPathOnDisk = Method != "getchunks"sv; + + ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, VerifyPathOnDisk); if (!Oplog) { HttpReq.WriteResponse(HttpResponseCode::NotFound, @@ -5443,8 +5447,6 @@ ProjectStore::Rpc(HttpServerRequest& HttpReq, } Project->TouchOplog(OplogId); - std::string_view Method = Cb["method"sv].AsString(); - if (Method == "import"sv) { if (!AreDiskWritesAllowed()) diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index 216b1c4dd..0589fdc5f 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -266,7 +266,10 @@ namespace remotestore_impl { AppendBatch(); } - ReportProgress(OptionalContext, "Writing oplog"sv, ""sv, OpCount, 0); + if (OpCount > 0) + { + ReportProgress(OptionalContext, "Writing oplog"sv, ""sv, OpCount, 0); + } return RemoteProjectStore::Result{.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0}; } @@ -1685,7 +1688,7 @@ BuildContainer(CidStore& ChunkStore, IoBuffer RawData = IoBufferBuilder::MakeFromFile(FilePath); if (RawData) { - if (RawData.GetSize() > ChunkFileSizeLimit) + if (AllowChunking && RawData.GetSize() > ChunkFileSizeLimit) { IoBufferFileReference FileRef; (void)RawData.GetFileReference(FileRef); diff --git a/src/zenserver/projectstore/zenremoteprojectstore.cpp b/src/zenserver/projectstore/zenremoteprojectstore.cpp index 566d0d4b2..42519b108 100644 --- a/src/zenserver/projectstore/zenremoteprojectstore.cpp +++ b/src/zenserver/projectstore/zenremoteprojectstore.cpp @@ -8,7 +8,7 @@ #include <zencore/fmtutils.h> #include <zencore/stream.h> #include <zenhttp/httpclient.h> -#include <zenutil/packageformat.h> +#include <zenhttp/packageformat.h> namespace zen { diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp deleted file mode 100644 index 2ae977f00..000000000 --- a/src/zenserver/upstream/jupiter.cpp +++ /dev/null @@ -1,662 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "jupiter.h" - -#include "diag/logging.h" - -#include <zencore/compactbinary.h> -#include <zencore/compositebuffer.h> -#include <zencore/fmtutils.h> -#include <zencore/iobuffer.h> -#include <zencore/iohash.h> -#include <zencore/scopeguard.h> -#include <zencore/thread.h> -#include <zencore/trace.h> -#include <zenhttp/formatters.h> -#include <zenutil/basicfile.h> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <fmt/format.h> -ZEN_THIRD_PARTY_INCLUDES_END - -#if ZEN_PLATFORM_WINDOWS -# pragma comment(lib, "Crypt32.lib") -# pragma comment(lib, "Wldap32.lib") -#endif - -#include <json11.hpp> - -using namespace std::literals; - -namespace zen { - -namespace detail { - CloudCacheResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) - { - if (Response.Error) - { - return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), - .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), - .ElapsedSeconds = Response.ElapsedSeconds, - .ErrorCode = Response.Error.value().ErrorCode, - .Reason = Response.ErrorMessage(ErrorPrefix), - .Success = false}; - } - if (!Response.IsSuccess()) - { - return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), - .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), - .ElapsedSeconds = Response.ElapsedSeconds, - .ErrorCode = static_cast<int32_t>(Response.StatusCode), - .Reason = Response.ErrorMessage(ErrorPrefix), - .Success = false}; - } - return {.Response = Response.ResponsePayload, - .SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), - .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), - .ElapsedSeconds = Response.ElapsedSeconds, - .ErrorCode = 0, - .Success = true}; - } -} // namespace detail - -CloudCacheSession::CloudCacheSession(CloudCacheClient* CacheClient) : m_Log(CacheClient->Logger()), m_CacheClient(CacheClient) -{ -} - -CloudCacheSession::~CloudCacheSession() -{ -} - -CloudCacheResult -CloudCacheSession::Authenticate() -{ - bool OK = m_CacheClient->m_HttpClient.Authenticate(); - return {.Success = OK}; -} - -CloudCacheResult -CloudCacheSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) -{ - ZEN_TRACE_CPU("JupiterClient::GetRef"); - - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), - {HttpClient::Accept(RefType)}); - - return detail::ConvertResponse(Response, "CloudCacheSession::GetRef"sv); -} - -CloudCacheResult -CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::GetBlob"); - HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), - {HttpClient::Accept(ZenContentType::kBinary)}); - - return detail::ConvertResponse(Response); -} - -CloudCacheResult -CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath) -{ - ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob"); - - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), - TempFolderPath, - {HttpClient::Accept(ZenContentType::kCompressedBinary)}); - - return detail::ConvertResponse(Response); -} - -CloudCacheResult -CloudCacheSession::GetInlineBlob(std::string_view Namespace, - std::string_view BucketId, - const IoHash& Key, - IoHash& OutPayloadHash, - std::filesystem::path TempFolderPath) -{ - ZEN_TRACE_CPU("JupiterClient::GetInlineBlob"); - - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), - TempFolderPath, - {{"Accept", "application/x-jupiter-inline"}}); - - CloudCacheResult Result = detail::ConvertResponse(Response); - - if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end()) - { - const std::string& PayloadHashHeader = It->second; - if (PayloadHashHeader.length() == IoHash::StringLength) - { - OutPayloadHash = IoHash::FromHexString(PayloadHashHeader); - } - } - - return Result; -} - -CloudCacheResult -CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::GetObject"); - - HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), - {HttpClient::Accept(ZenContentType::kCbObject)}); - - return detail::ConvertResponse(Response); -} - -PutRefResult -CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) -{ - ZEN_TRACE_CPU("JupiterClient::PutRef"); - - Ref.SetContentType(RefType); - - IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); - - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), - Ref, - {{"X-Jupiter-IoHash", Hash.ToHexString()}}); - - PutRefResult Result = {detail::ConvertResponse(Response)}; - if (Result.Success) - { - std::string JsonError; - json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); - if (JsonError.empty()) - { - json11::Json::array Needs = Json["needs"].array_items(); - for (const auto& Need : Needs) - { - Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); - } - } - Result.RawHash = Hash; - } - return Result; -} - -FinalizeRefResult -CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) -{ - ZEN_TRACE_CPU("JupiterClient::FinalizeRef"); - - HttpClient::Response Response = m_CacheClient->m_HttpClient.Post( - fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()), - {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); - - FinalizeRefResult Result = {detail::ConvertResponse(Response)}; - - if (Result.Success) - { - std::string JsonError; - json11::Json Json = json11::Json::parse(std::string(Response.ToText()), JsonError); - if (JsonError.empty()) - { - json11::Json::array Needs = Json["needs"].array_items(); - for (const auto& Need : Needs) - { - Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); - } - } - } - return Result; -} - -CloudCacheResult -CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) -{ - ZEN_TRACE_CPU("JupiterClient::PutBlob"); - - HttpClient::Response Response = m_CacheClient->m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob); - - return detail::ConvertResponse(Response); -} - -CloudCacheResult -CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) -{ - ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); - - Blob.SetContentType(ZenContentType::kCompressedBinary); - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob); - - return detail::ConvertResponse(Response); -} - -CloudCacheResult -CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload) -{ - ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); - - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), - Payload, - ZenContentType::kCompressedBinary); - - return detail::ConvertResponse(Response); -} - -CloudCacheResult -CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) -{ - ZEN_TRACE_CPU("JupiterClient::PutObject"); - - Object.SetContentType(ZenContentType::kCbObject); - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object); - - return detail::ConvertResponse(Response); -} - -CloudCacheResult -CloudCacheSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::RefExists"); - - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString())); - - return detail::ConvertResponse(Response); -} - -GetObjectReferencesResult -CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::GetObjectReferences"); - - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()), - {HttpClient::Accept(ZenContentType::kCbObject)}); - - GetObjectReferencesResult Result = {detail::ConvertResponse(Response)}; - - if (Result.Success) - { - const CbObject ReferencesResponse = Response.AsObject(); - for (auto& Item : ReferencesResponse["references"sv]) - { - Result.References.insert(Item.AsHash()); - } - } - return Result; -} - -CloudCacheResult -CloudCacheSession::BlobExists(std::string_view Namespace, const IoHash& Key) -{ - return CacheTypeExists(Namespace, "blobs"sv, Key); -} - -CloudCacheResult -CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key) -{ - return CacheTypeExists(Namespace, "compressed-blobs"sv, Key); -} - -CloudCacheResult -CloudCacheSession::ObjectExists(std::string_view Namespace, const IoHash& Key) -{ - return CacheTypeExists(Namespace, "objects"sv, Key); -} - -CloudCacheExistsResult -CloudCacheSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) -{ - return CacheTypeExists(Namespace, "blobs"sv, Keys); -} - -CloudCacheExistsResult -CloudCacheSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) -{ - return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys); -} - -CloudCacheExistsResult -CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys) -{ - return CacheTypeExists(Namespace, "objects"sv, Keys); -} - -std::vector<IoHash> -CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) -{ - // ExtendableStringBuilder<256> Uri; - // Uri << m_CacheClient->ServiceUrl(); - // Uri << "/api/v1/s/" << Namespace; - - ZEN_UNUSED(Namespace, BucketId, ChunkHashes); - - return {}; -} - -CloudCacheResult -CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) -{ - ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); - - HttpClient::Response Response = m_CacheClient->m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString())); - - return detail::ConvertResponse(Response); -} - -CloudCacheExistsResult -CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys) -{ - ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); - - ExtendableStringBuilder<256> Body; - Body << "["; - for (const auto& Key : Keys) - { - Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\""; - } - Body << "]"; - IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size()); - Payload.SetContentType(ZenContentType::kJSON); - - HttpClient::Response Response = m_CacheClient->m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace), - Payload, - {HttpClient::Accept(ZenContentType::kCbObject)}); - - CloudCacheExistsResult Result = {detail::ConvertResponse(Response)}; - - if (Result.Success) - { - const CbObject ExistsResponse = Response.AsObject(); - for (auto& Item : ExistsResponse["needs"sv]) - { - Result.Needs.insert(Item.AsHash()); - } - } - return Result; -} - -CloudCacheResult -CloudCacheSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload) -{ - ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), Payload); - return detail::ConvertResponse(Response, "CloudCacheSession::PutBuild"sv); -} - -CloudCacheResult -CloudCacheSession::GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) -{ - HttpClient::Response Response = m_CacheClient->m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), - HttpClient::Accept(ZenContentType::kCbObject)); - return detail::ConvertResponse(Response, "CloudCacheSession::GetBuild"sv); -} - -CloudCacheResult -CloudCacheSession::FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) -{ - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/finalize", Namespace, BucketId, BuildId)); - return detail::ConvertResponse(Response, "CloudCacheSession::FinalizeBuild"sv); -} - -PutBuildPartResult -CloudCacheSession::PutBuildPart(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - std::string_view PartName, - const IoBuffer& Payload) -{ - ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - - IoHash Hash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); - - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/{}", Namespace, BucketId, BuildId, PartId, PartName), - Payload, - {{"X-Jupiter-IoHash", Hash.ToHexString()}}); - - PutBuildPartResult Result = {detail::ConvertResponse(Response, "CloudCacheSession::PutBuildPart"sv)}; - if (Result.Success) - { - std::string JsonError; - json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); - if (JsonError.empty()) - { - json11::Json::array Needs = Json["needs"].array_items(); - for (const auto& Need : Needs) - { - Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); - } - } - Result.RawHash = Hash; - } - return Result; -} - -CloudCacheResult -CloudCacheSession::GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId) -{ - HttpClient::Response Response = - m_CacheClient->m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}", Namespace, BucketId, BuildId, PartId), - HttpClient::Accept(ZenContentType::kCbObject)); - return detail::ConvertResponse(Response, "CloudCacheSession::GetBuildPart"sv); -} - -CloudCacheResult -CloudCacheSession::PutBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - const IoHash& Hash, - ZenContentType ContentType, - const CompositeBuffer& Payload) -{ - HttpClient::Response Response = m_CacheClient->m_HttpClient.Upload( - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), - Payload, - ContentType); - return detail::ConvertResponse(Response, "CloudCacheSession::PutBuildBlob"sv); -} - -CloudCacheResult -CloudCacheSession::GetBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - const IoHash& Hash, - std::filesystem::path TempFolderPath) -{ - HttpClient::Response Response = m_CacheClient->m_HttpClient.Download( - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), - TempFolderPath); - return detail::ConvertResponse(Response, "CloudCacheSession::GetBuildBlob"sv); -} - -CloudCacheResult -CloudCacheSession::PutBlockMetadata(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - const IoHash& Hash, - const IoBuffer& Payload) -{ - ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - HttpClient::Response Response = m_CacheClient->m_HttpClient.Put( - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), - Payload); - return detail::ConvertResponse(Response, "CloudCacheSession::PutBlockMetadata"sv); -} - -FinalizeBuildPartResult -CloudCacheSession::FinalizeBuildPart(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - const IoHash& RawHash) -{ - HttpClient::Response Response = m_CacheClient->m_HttpClient.Post( - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/finalize/{}", Namespace, BucketId, BuildId, PartId, RawHash.ToHexString()), - HttpClient::Accept(ZenContentType::kCbObject)); - - FinalizeBuildPartResult Result = {detail::ConvertResponse(Response, "CloudCacheSession::FinalizeBuildPart"sv)}; - if (Result.Success) - { - std::string JsonError; - json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); - if (JsonError.empty()) - { - json11::Json::array Needs = Json["needs"].array_items(); - for (const auto& Need : Needs) - { - Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); - } - } - } - return Result; -} - -CloudCacheResult -CloudCacheSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId) -{ - HttpClient::Response Response = m_CacheClient->m_HttpClient.Get( - fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/listBlocks", Namespace, BucketId, BuildId, PartId), - HttpClient::Accept(ZenContentType::kCbObject)); - return detail::ConvertResponse(Response, "CloudCacheSession::FindBlocks"sv); -} - -/** - * An access token provider that holds a token that will never change. - */ -class StaticTokenProvider final : public CloudCacheTokenProvider -{ -public: - StaticTokenProvider(CloudCacheAccessToken Token) : m_Token(std::move(Token)) {} - - virtual ~StaticTokenProvider() = default; - - virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Token; } - -private: - CloudCacheAccessToken m_Token; -}; - -std::unique_ptr<CloudCacheTokenProvider> -CloudCacheTokenProvider::CreateFromStaticToken(CloudCacheAccessToken Token) -{ - return std::make_unique<StaticTokenProvider>(std::move(Token)); -} - -class OAuthClientCredentialsTokenProvider final : public CloudCacheTokenProvider -{ -public: - OAuthClientCredentialsTokenProvider(const CloudCacheTokenProvider::OAuthClientCredentialsParams& Params) - { - m_Url = std::string(Params.Url); - m_ClientId = std::string(Params.ClientId); - m_ClientSecret = std::string(Params.ClientSecret); - } - - virtual ~OAuthClientCredentialsTokenProvider() = default; - - virtual CloudCacheAccessToken AcquireAccessToken() final override - { - using namespace std::chrono; - - std::string Body = - fmt::format("client_id={}&scope=cache_access&grant_type=client_credentials&client_secret={}", m_ClientId, m_ClientSecret); - - cpr::Response Response = - cpr::Post(cpr::Url{m_Url}, cpr::Header{{"Content-Type", "application/x-www-form-urlencoded"}}, cpr::Body{std::move(Body)}); - - if (Response.error || Response.status_code != 200) - { - return {}; - } - - std::string JsonError; - json11::Json Json = json11::Json::parse(Response.text, JsonError); - - if (JsonError.empty() == false) - { - return {}; - } - - std::string Token = Json["access_token"].string_value(); - int64_t ExpiresInSeconds = static_cast<int64_t>(Json["expires_in"].int_value()); - CloudCacheAccessToken::TimePoint ExpireTime = CloudCacheAccessToken::Clock::now() + seconds(ExpiresInSeconds); - - return {.Value = fmt::format("Bearer {}", Token), .ExpireTime = ExpireTime}; - } - -private: - std::string m_Url; - std::string m_ClientId; - std::string m_ClientSecret; -}; - -std::unique_ptr<CloudCacheTokenProvider> -CloudCacheTokenProvider::CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params) -{ - return std::make_unique<OAuthClientCredentialsTokenProvider>(Params); -} - -class CallbackTokenProvider final : public CloudCacheTokenProvider -{ -public: - CallbackTokenProvider(std::function<CloudCacheAccessToken()>&& Callback) : m_Callback(std::move(Callback)) {} - - virtual ~CallbackTokenProvider() = default; - - virtual CloudCacheAccessToken AcquireAccessToken() final override { return m_Callback(); } - -private: - std::function<CloudCacheAccessToken()> m_Callback; -}; - -std::unique_ptr<CloudCacheTokenProvider> -CloudCacheTokenProvider::CreateFromCallback(std::function<CloudCacheAccessToken()>&& Callback) -{ - return std::make_unique<CallbackTokenProvider>(std::move(Callback)); -} - -static std::optional<std::function<HttpClientAccessToken()>> -GetHttpClientAccessProvider(CloudCacheTokenProvider* TokenProvider) -{ - if (TokenProvider == nullptr) - { - return {}; - } - auto ProviderFunc = [TokenProvider]() -> HttpClientAccessToken { - CloudCacheAccessToken Token = TokenProvider->AcquireAccessToken(); - return HttpClientAccessToken{.Value = Token.Value, .ExpireTime = Token.ExpireTime}; - }; - return ProviderFunc; -} - -CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider) -: m_Log(zen::logging::Get("jupiter")) -, m_DefaultDdcNamespace(Options.DdcNamespace) -, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace) -, m_ComputeCluster(Options.ComputeCluster) -, m_TokenProvider(std::move(TokenProvider)) -, m_HttpClient(Options.ServiceUrl, - HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout, - .Timeout = Options.Timeout, - .AccessTokenProvider = GetHttpClientAccessProvider(m_TokenProvider.get()), - .AssumeHttp2 = Options.AssumeHttp2, - .AllowResume = Options.AllowResume, - .RetryCount = Options.RetryCount}) -{ - ZEN_ASSERT(m_TokenProvider.get() != nullptr); -} - -CloudCacheClient::~CloudCacheClient() -{ -} - -} // namespace zen diff --git a/src/zenserver/upstream/jupiter.h b/src/zenserver/upstream/jupiter.h deleted file mode 100644 index 50e4ad68a..000000000 --- a/src/zenserver/upstream/jupiter.h +++ /dev/null @@ -1,256 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zenbase/refcount.h> -#include <zencore/iohash.h> -#include <zencore/logging.h> -#include <zencore/thread.h> -#include <zenhttp/httpclient.h> -#include <zenhttp/httpserver.h> - -#include <atomic> -#include <chrono> -#include <list> -#include <memory> -#include <set> -#include <vector> - -struct ZenCacheValue; - -namespace cpr { -class Session; -} - -namespace zen { - -class CbObjectView; -class CloudCacheClient; -class IoBuffer; -struct IoHash; - -/** - * Cached access token, for use with `Authorization:` header - */ -struct CloudCacheAccessToken -{ - using Clock = std::chrono::system_clock; - using TimePoint = Clock::time_point; - - static constexpr int64_t ExpireMarginInSeconds = 30; - - std::string Value; - TimePoint ExpireTime; - - bool IsValid() const - { - return Value.empty() == false && - ExpireMarginInSeconds < std::chrono::duration_cast<std::chrono::seconds>(ExpireTime - Clock::now()).count(); - } -}; - -struct CloudCacheResult -{ - IoBuffer Response; - uint64_t SentBytes{}; - uint64_t ReceivedBytes{}; - double ElapsedSeconds{}; - int32_t ErrorCode{}; - std::string Reason; - bool Success = false; -}; - -struct PutRefResult : CloudCacheResult -{ - std::vector<IoHash> Needs; - IoHash RawHash; -}; - -struct FinalizeRefResult : CloudCacheResult -{ - std::vector<IoHash> Needs; -}; - -struct CloudCacheExistsResult : CloudCacheResult -{ - std::set<IoHash> Needs; -}; - -struct GetObjectReferencesResult : CloudCacheResult -{ - std::set<IoHash> References; -}; - -struct PutBuildPartResult : CloudCacheResult -{ - std::vector<IoHash> Needs; - IoHash RawHash; -}; - -struct FinalizeBuildPartResult : CloudCacheResult -{ - std::vector<IoHash> Needs; -}; - -/** - * Context for performing Jupiter operations - * - * Maintains an HTTP connection so that subsequent operations don't need to go - * through the whole connection setup process - * - */ -class CloudCacheSession -{ -public: - CloudCacheSession(CloudCacheClient* CacheClient); - ~CloudCacheSession(); - - CloudCacheResult Authenticate(); - - CloudCacheResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType); - CloudCacheResult GetBlob(std::string_view Namespace, const IoHash& Key); - CloudCacheResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {}); - CloudCacheResult GetObject(std::string_view Namespace, const IoHash& Key); - CloudCacheResult GetInlineBlob(std::string_view Namespace, - std::string_view BucketId, - const IoHash& Key, - IoHash& OutPayloadHash, - std::filesystem::path TempFolderPath = {}); - - PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); - CloudCacheResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); - CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); - CloudCacheResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob); - CloudCacheResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object); - - FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); - - CloudCacheResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key); - - GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key); - - CloudCacheResult BlobExists(std::string_view Namespace, const IoHash& Key); - CloudCacheResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key); - CloudCacheResult ObjectExists(std::string_view Namespace, const IoHash& Key); - - CloudCacheExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); - CloudCacheExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); - CloudCacheExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys); - - std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); - - CloudCacheResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload); - CloudCacheResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); - CloudCacheResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); - PutBuildPartResult PutBuildPart(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - std::string_view PartName, - const IoBuffer& Payload); - CloudCacheResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId); - CloudCacheResult PutBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - const IoHash& Hash, - ZenContentType ContentType, - const CompositeBuffer& Payload); - CloudCacheResult GetBuildBlob(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - const IoHash& Hash, - std::filesystem::path TempFolderPath); - CloudCacheResult PutBlockMetadata(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - const IoHash& Hash, - const IoBuffer& Payload); - FinalizeBuildPartResult FinalizeBuildPart(std::string_view Namespace, - std::string_view BucketId, - const Oid& BuildId, - const Oid& PartId, - const IoHash& RawHash); - CloudCacheResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId); - - CloudCacheClient& Client() { return *m_CacheClient; }; - -private: - inline LoggerRef Log() { return m_Log; } - - CloudCacheResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key); - - CloudCacheExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys); - - LoggerRef m_Log; - RefPtr<CloudCacheClient> m_CacheClient; -}; - -/** - * Access token provider interface - */ -class CloudCacheTokenProvider -{ -public: - virtual ~CloudCacheTokenProvider() = default; - - virtual CloudCacheAccessToken AcquireAccessToken() = 0; - - static std::unique_ptr<CloudCacheTokenProvider> CreateFromStaticToken(CloudCacheAccessToken Token); - - struct OAuthClientCredentialsParams - { - std::string_view Url; - std::string_view ClientId; - std::string_view ClientSecret; - }; - - static std::unique_ptr<CloudCacheTokenProvider> CreateFromOAuthClientCredentials(const OAuthClientCredentialsParams& Params); - - static std::unique_ptr<CloudCacheTokenProvider> CreateFromCallback(std::function<CloudCacheAccessToken()>&& Callback); -}; - -struct CloudCacheClientOptions -{ - std::string_view Name; - std::string_view ServiceUrl; - std::string_view DdcNamespace; - std::string_view BlobStoreNamespace; - std::string_view ComputeCluster; - std::chrono::milliseconds ConnectTimeout{5000}; - std::chrono::milliseconds Timeout{}; - bool AssumeHttp2 = false; - bool AllowResume = false; - uint8_t RetryCount = 0; -}; - -/** - * Jupiter upstream cache client - */ -class CloudCacheClient : public RefCounted -{ -public: - CloudCacheClient(const CloudCacheClientOptions& Options, std::unique_ptr<CloudCacheTokenProvider> TokenProvider); - ~CloudCacheClient(); - - std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; } - std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; } - std::string_view ComputeCluster() const { return m_ComputeCluster; } - std::string_view ServiceUrl() const { return m_HttpClient.GetBaseUri(); } - - LoggerRef Logger() { return m_Log; } - -private: - LoggerRef m_Log; - const std::string m_DefaultDdcNamespace; - const std::string m_DefaultBlobStoreNamespace; - const std::string m_ComputeCluster; - const std::unique_ptr<CloudCacheTokenProvider> m_TokenProvider; - HttpClient m_HttpClient; - - friend class CloudCacheSession; -}; - -} // namespace zen diff --git a/src/zenserver/upstream/upstream.h b/src/zenserver/upstream/upstream.h index a57301206..4d45687fc 100644 --- a/src/zenserver/upstream/upstream.h +++ b/src/zenserver/upstream/upstream.h @@ -2,7 +2,6 @@ #pragma once -#include <upstream/jupiter.h> #include <upstream/upstreamcache.h> #include <upstream/upstreamservice.h> #include <upstream/zen.h> diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp index ab8fe8704..e438a840a 100644 --- a/src/zenserver/upstream/upstreamcache.cpp +++ b/src/zenserver/upstream/upstreamcache.cpp @@ -1,7 +1,6 @@ // Copyright Epic Games, Inc. All Rights Reserved. #include "upstreamcache.h" -#include "jupiter.h" #include "zen.h" #include <zencore/blockingqueue.h> @@ -15,11 +14,15 @@ #include <zencore/timer.h> #include <zencore/trace.h> -#include <zenhttp/auth/authmgr.h> -#include <zenstore/cidstore.h> -#include <zenutil/packageformat.h> +#include <zenhttp/httpclientauth.h> +#include <zenhttp/packageformat.h> #include <zenstore/cache/structuredcachestore.h> +#include <zenstore/cidstore.h> + +#include <zenutil/jupiter/jupiterclient.h> +#include <zenutil/jupiter/jupitersession.h> + #include "cache/httpstructuredcache.h" #include "diag/logging.h" @@ -85,7 +88,7 @@ namespace detail { class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: - JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr) + JupiterUpstreamEndpoint(const JupiterClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr) : m_AuthMgr(Mgr) , m_Log(zen::logging::Get("upstream")) { @@ -93,30 +96,27 @@ namespace detail { m_Info.Name = Options.Name; m_Info.Url = Options.ServiceUrl; - std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + std::function<HttpClientAccessToken()> TokenProvider; if (AuthConfig.OAuthUrl.empty() == false) { - TokenProvider = CloudCacheTokenProvider::CreateFromOAuthClientCredentials( + TokenProvider = httpclientauth::CreateFromOAuthClientCredentials( {.Url = AuthConfig.OAuthUrl, .ClientId = AuthConfig.OAuthClientId, .ClientSecret = AuthConfig.OAuthClientSecret}); } - else if (AuthConfig.OpenIdProvider.empty() == false) + else if (!AuthConfig.OpenIdProvider.empty()) { - TokenProvider = - CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(AuthConfig.OpenIdProvider)]() { - AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); - return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; - }); + TokenProvider = httpclientauth::CreateFromOpenIdProvider(m_AuthMgr, AuthConfig.OpenIdProvider); + } + else if (!AuthConfig.AccessToken.empty()) + { + TokenProvider = httpclientauth::CreateFromStaticToken(AuthConfig.AccessToken); } else { - CloudCacheAccessToken AccessToken{.Value = std::string(AuthConfig.AccessToken), - .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; - - TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); + TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(m_AuthMgr); } - m_Client = new CloudCacheClient(Options, std::move(TokenProvider)); + m_Client = new JupiterClient(Options, std::move(TokenProvider)); } virtual ~JupiterUpstreamEndpoint() {} @@ -134,8 +134,8 @@ namespace detail { return {.State = UpstreamEndpointState::kOk}; } - CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.Authenticate(); + JupiterSession Session(m_Client->Logger(), m_Client->Client()); + const JupiterResult Result = Session.Authenticate(); if (Result.Success) { @@ -160,20 +160,11 @@ namespace detail { } } - std::string_view GetActualDdcNamespace(CloudCacheSession& Session, std::string_view Namespace) - { - if (Namespace == ZenCacheStore::DefaultNamespace) - { - return Session.Client().DefaultDdcNamespace(); - } - return Namespace; - } - - std::string_view GetActualBlobStoreNamespace(CloudCacheSession& Session, std::string_view Namespace) + std::string_view GetActualBlobStoreNamespace(std::string_view Namespace) { if (Namespace == ZenCacheStore::DefaultNamespace) { - return Session.Client().DefaultBlobStoreNamespace(); + return m_Client->DefaultBlobStoreNamespace(); } return Namespace; } @@ -190,10 +181,10 @@ namespace detail { try { - CloudCacheSession Session(m_Client); - CloudCacheResult Result; + JupiterSession Session(m_Client->Logger(), m_Client->Client()); + JupiterResult Result; - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); if (Type == ZenContentType::kCompressedBinary) { @@ -209,7 +200,7 @@ namespace detail { int NumAttachments = 0; CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { - CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); + JupiterResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); Result.ReceivedBytes += AttachmentResult.ReceivedBytes; Result.SentBytes += AttachmentResult.SentBytes; Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; @@ -249,7 +240,7 @@ namespace detail { CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); CacheRecord.IterateAttachments([&](CbFieldView AttachmentHash) { - CloudCacheResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); + JupiterResult AttachmentResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); Result.ReceivedBytes += AttachmentResult.ReceivedBytes; Result.SentBytes += AttachmentResult.SentBytes; Result.ElapsedSeconds += AttachmentResult.ElapsedSeconds; @@ -310,7 +301,7 @@ namespace detail { { ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheRecords"); - CloudCacheSession Session(m_Client); + JupiterSession Session(m_Client->Logger(), m_Client->Client()); GetUpstreamCacheResult Result; for (CacheKeyRequest* Request : Requests) @@ -322,9 +313,8 @@ namespace detail { double ElapsedSeconds = 0.0; if (!Result.Error) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); - CloudCacheResult RefResult = - Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); + JupiterResult RefResult = Session.GetRef(BlobStoreNamespace, CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); AppendResult(RefResult, Result); ElapsedSeconds = RefResult.ElapsedSeconds; @@ -337,7 +327,7 @@ namespace detail { { Record = LoadCompactBinaryObject(RefResult.Response); Record.IterateAttachments([&](CbFieldView AttachmentHash) { - CloudCacheResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); + JupiterResult BlobResult = Session.GetCompressedBlob(BlobStoreNamespace, AttachmentHash.AsHash()); AppendResult(BlobResult, Result); m_Status.SetFromErrorCode(BlobResult.ErrorCode, BlobResult.Reason); @@ -375,9 +365,9 @@ namespace detail { try { - CloudCacheSession Session(m_Client); - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); - const CloudCacheResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId); + JupiterSession Session(m_Client->Logger(), m_Client->Client()); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); + const JupiterResult Result = Session.GetCompressedBlob(BlobStoreNamespace, ValueContentId); m_Status.SetFromErrorCode(Result.ErrorCode, Result.Reason); @@ -408,7 +398,7 @@ namespace detail { { ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheChunks"); - CloudCacheSession Session(m_Client); + JupiterSession Session(m_Client->Logger(), m_Client->Client()); GetUpstreamCacheResult Result; for (CacheChunkRequest* RequestPtr : CacheChunkRequests) @@ -422,8 +412,8 @@ namespace detail { bool IsCompressed = false; if (!Result.Error) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); - const CloudCacheResult BlobResult = + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); + const JupiterResult BlobResult = Request.ChunkId == IoHash::Zero ? Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, Request.ChunkId) : Session.GetCompressedBlob(BlobStoreNamespace, Request.ChunkId); @@ -463,7 +453,7 @@ namespace detail { { ZEN_TRACE_CPU("Upstream::Jupiter::GetCacheValues"); - CloudCacheSession Session(m_Client); + JupiterSession Session(m_Client->Logger(), m_Client->Client()); GetUpstreamCacheResult Result; for (CacheValueRequest* RequestPtr : CacheValueRequests) @@ -477,9 +467,9 @@ namespace detail { bool IsCompressed = false; if (!Result.Error) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); - IoHash PayloadHash; - const CloudCacheResult BlobResult = + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); + IoHash PayloadHash; + const JupiterResult BlobResult = Session.GetInlineBlob(BlobStoreNamespace, Request.Key.Bucket, Request.Key.Hash, PayloadHash); ElapsedSeconds = BlobResult.ElapsedSeconds; Payload = BlobResult.Response; @@ -543,14 +533,14 @@ namespace detail { try { - CloudCacheSession Session(m_Client); + JupiterSession Session(m_Client->Logger(), m_Client->Client()); if (CacheRecord.Type == ZenContentType::kBinary) { - CloudCacheResult Result; + JupiterResult Result; for (uint32_t Attempt = 0; Attempt < MaxAttempts && !Result.Success; Attempt++) { - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, CacheRecord.Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(CacheRecord.Namespace); Result = Session.PutRef(BlobStoreNamespace, CacheRecord.Key.Bucket, CacheRecord.Key.Hash, @@ -632,7 +622,7 @@ namespace detail { virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: - static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out) + static void AppendResult(const JupiterResult& Result, GetUpstreamCacheResult& Out) { Out.Success &= Result.Success; Out.Bytes += gsl::narrow<int64_t>(Result.ReceivedBytes); @@ -645,7 +635,7 @@ namespace detail { }; PutUpstreamCacheResult PerformStructuredPut( - CloudCacheSession& Session, + JupiterSession& Session, std::string_view Namespace, const CacheKey& Key, IoBuffer ObjectBuffer, @@ -655,7 +645,7 @@ namespace detail { int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Session, Namespace); + std::string_view BlobStoreNamespace = GetActualBlobStoreNamespace(Namespace); const auto PutBlobs = [&](std::span<IoHash> ValueContentIds, std::string& OutReason) -> bool { for (const IoHash& ValueContentId : ValueContentIds) { @@ -665,7 +655,7 @@ namespace detail { return false; } - CloudCacheResult BlobResult; + JupiterResult BlobResult; for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) { BlobResult = Session.PutCompressedBlob(BlobStoreNamespace, ValueContentId, BlobBuffer); @@ -760,12 +750,12 @@ namespace detail { LoggerRef Log() { return m_Log; } - AuthMgr& m_AuthMgr; - LoggerRef m_Log; - UpstreamEndpointInfo m_Info; - UpstreamStatus m_Status; - UpstreamEndpointStats m_Stats; - RefPtr<CloudCacheClient> m_Client; + AuthMgr& m_AuthMgr; + LoggerRef m_Log; + UpstreamEndpointInfo m_Info; + UpstreamStatus m_Status; + UpstreamEndpointStats m_Stats; + RefPtr<JupiterClient> m_Client; }; class ZenUpstreamEndpoint final : public UpstreamEndpoint @@ -2129,7 +2119,7 @@ UpstreamEndpoint::CreateZenEndpoint(const ZenStructuredCacheClientOptions& Optio } std::unique_ptr<UpstreamEndpoint> -UpstreamEndpoint::CreateJupiterEndpoint(const CloudCacheClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr) +UpstreamEndpoint::CreateJupiterEndpoint(const JupiterClientOptions& Options, const UpstreamAuthConfig& AuthConfig, AuthMgr& Mgr) { return std::make_unique<detail::JupiterUpstreamEndpoint>(Options, AuthConfig, Mgr); } diff --git a/src/zenserver/upstream/upstreamcache.h b/src/zenserver/upstream/upstreamcache.h index bb0193e4e..26e5decac 100644 --- a/src/zenserver/upstream/upstreamcache.h +++ b/src/zenserver/upstream/upstreamcache.h @@ -26,8 +26,8 @@ class CbPackage; class CbObjectWriter; class CidStore; class ZenCacheStore; -struct CloudCacheClientOptions; -class CloudCacheTokenProvider; +struct JupiterClientOptions; +class JupiterAccessTokenProvider; struct ZenStructuredCacheClientOptions; struct UpstreamEndpointStats @@ -128,9 +128,9 @@ public: static std::unique_ptr<UpstreamEndpoint> CreateZenEndpoint(const ZenStructuredCacheClientOptions& Options); - static std::unique_ptr<UpstreamEndpoint> CreateJupiterEndpoint(const CloudCacheClientOptions& Options, - const UpstreamAuthConfig& AuthConfig, - AuthMgr& Mgr); + static std::unique_ptr<UpstreamEndpoint> CreateJupiterEndpoint(const JupiterClientOptions& Options, + const UpstreamAuthConfig& AuthConfig, + AuthMgr& Mgr); }; /** diff --git a/src/zenserver/upstream/upstreamservice.cpp b/src/zenserver/upstream/upstreamservice.cpp index 3d4a0f823..1dcbdb604 100644 --- a/src/zenserver/upstream/upstreamservice.cpp +++ b/src/zenserver/upstream/upstreamservice.cpp @@ -2,7 +2,6 @@ #include <upstream/upstreamservice.h> #include <upstream/upstreamcache.h> -#include <zenhttp/auth/authmgr.h> #include <zencore/compactbinarybuilder.h> #include <zencore/string.h> diff --git a/src/zenserver/upstream/zen.cpp b/src/zenserver/upstream/zen.cpp index c031a4086..7494ae379 100644 --- a/src/zenserver/upstream/zen.cpp +++ b/src/zenserver/upstream/zen.cpp @@ -10,7 +10,7 @@ #include <zencore/stream.h> #include <zenhttp/formatters.h> #include <zenhttp/httpcommon.h> -#include <zenutil/packageformat.h> +#include <zenhttp/packageformat.h> #include <zenstore/cache/structuredcachestore.h> #include "diag/logging.h" diff --git a/src/zenserver/workspaces/httpworkspaces.cpp b/src/zenserver/workspaces/httpworkspaces.cpp index 9444f7644..2d59c9357 100644 --- a/src/zenserver/workspaces/httpworkspaces.cpp +++ b/src/zenserver/workspaces/httpworkspaces.cpp @@ -2,12 +2,12 @@ #include <workspaces/httpworkspaces.h> +#include <zencore/basicfile.h> #include <zencore/compactbinarybuilder.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/trace.h> #include <zenstore/workspaces.h> -#include <zenutil/basicfile.h> #include <zenutil/chunkrequests.h> #include <zenutil/workerpools.h> diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 66b6cb858..f84bc0b00 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -5,6 +5,7 @@ #include "sentryintegration.h" #include <zenbase/refcount.h> +#include <zencore/basicfile.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinaryvalidation.h> #include <zencore/config.h> @@ -25,7 +26,7 @@ #include <zenstore/cidstore.h> #include <zenstore/scrubcontext.h> #include <zenstore/workspaces.h> -#include <zenutil/basicfile.h> +#include <zenutil/jupiter/jupiterclient.h> #include <zenutil/workerpools.h> #include <zenutil/zenserverprocess.h> @@ -598,12 +599,12 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) { std::string_view EndpointName = UpstreamConfig.JupiterConfig.Name.empty() ? "Jupiter"sv : UpstreamConfig.JupiterConfig.Name; - auto Options = CloudCacheClientOptions{.Name = EndpointName, - .ServiceUrl = UpstreamConfig.JupiterConfig.Url, - .DdcNamespace = UpstreamConfig.JupiterConfig.DdcNamespace, - .BlobStoreNamespace = UpstreamConfig.JupiterConfig.Namespace, - .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), - .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; + auto Options = JupiterClientOptions{.Name = EndpointName, + .ServiceUrl = UpstreamConfig.JupiterConfig.Url, + .DdcNamespace = UpstreamConfig.JupiterConfig.DdcNamespace, + .BlobStoreNamespace = UpstreamConfig.JupiterConfig.Namespace, + .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), + .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; auto AuthConfig = UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.JupiterConfig.OAuthUrl, .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId, diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 9ad672060..e976c061d 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -2,6 +2,7 @@ #include <zenstore/blockstore.h> +#include <zencore/enumflags.h> #include <zencore/except.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> @@ -809,8 +810,8 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, ZEN_ASSERT(BlockFile); InsertLock.ReleaseNow(); - IoBuffer ReadBuffer{IterateSmallChunkWindowSize}; - void* BufferBase = ReadBuffer.MutableData(); + IoBuffer ReadBuffer; + void* BufferBase = nullptr; size_t LocationIndexOffset = 0; while (LocationIndexOffset < ChunkIndexes.size()) @@ -825,6 +826,11 @@ BlockStore::IterateBlock(std::span<const BlockStoreLocation> ChunkLocations, size_t LastChunkIndex = ChunkIndexes[LocationIndexOffset + RangeCount - 1]; const BlockStoreLocation& LastLocation = ChunkLocations[LastChunkIndex]; uint64_t Size = LastLocation.Offset + LastLocation.Size - FirstLocation.Offset; + if (ReadBuffer.GetSize() < Size) + { + ReadBuffer = IoBuffer(Min(Size * 2, IterateSmallChunkWindowSize)); + BufferBase = ReadBuffer.MutableData(); + } BlockFile->Read(BufferBase, Size, FirstLocation.Offset); for (size_t RangeIndex = 0; RangeIndex < RangeCount; ++RangeIndex) { @@ -1422,8 +1428,8 @@ namespace blockstore::impl { { DirectoryContent DirectoryContent; GetDirectoryContent(RootDir, - DirectoryContent::RecursiveFlag | (Files ? DirectoryContent::IncludeFilesFlag : 0) | - (Directories ? DirectoryContent::IncludeDirsFlag : 0), + DirectoryContentFlags::Recursive | (Files ? DirectoryContentFlags::IncludeFiles : DirectoryContentFlags::None) | + (Directories ? DirectoryContentFlags::IncludeDirs : DirectoryContentFlags::None), DirectoryContent); std::vector<std::filesystem::path> Result; Result.insert(Result.end(), DirectoryContent.Directories.begin(), DirectoryContent.Directories.end()); diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index 9f09713ee..25f68330a 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -1267,6 +1267,7 @@ ZenCacheDiskLayer::CacheBucket::EndPutBatch(PutBatchHandle* Batch) noexcept size_t IndexOffset = 0; m_BlockStore.WriteChunks(Batch->Buffers, m_Configuration.PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) { + ZEN_MEMSCOPE(GetCacheDiskTag()); std::vector<DiskIndexEntry> DiskEntries; { RwLock::ExclusiveLockScope IndexLock(m_IndexLock); @@ -2679,6 +2680,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, Value.Value.Size(), m_Configuration.PayloadAlignment, [&](const BlockStoreLocation& BlockStoreLocation) { + ZEN_MEMSCOPE(GetCacheDiskTag()); ZEN_TRACE_CPU("Z$::Bucket::UpdateLocation"); DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags); m_SlogFile.Append({.Key = HashKey, .Location = Location}); @@ -3856,7 +3858,7 @@ ZenCacheDiskLayer::DiscoverBuckets() ZEN_TRACE_CPU("Z$::DiscoverBuckets"); DirectoryContent DirContent; - GetDirectoryContent(m_RootDir, DirectoryContent::IncludeDirsFlag, DirContent); + GetDirectoryContent(m_RootDir, DirectoryContentFlags::IncludeDirs, DirContent); // Initialize buckets diff --git a/src/zenstore/cache/cacherpc.cpp b/src/zenstore/cache/cacherpc.cpp index e6b6be525..cca51e63e 100644 --- a/src/zenstore/cache/cacherpc.cpp +++ b/src/zenstore/cache/cacherpc.cpp @@ -10,12 +10,13 @@ #include <zencore/scopeguard.h> #include <zencore/timer.h> #include <zencore/trace.h> +#include <zenhttp/packageformat.h> #include <zenstore/cache/cacheshared.h> #include <zenstore/cache/structuredcachestore.h> #include <zenstore/cache/upstreamcacheclient.h> #include <zenstore/cidstore.h> #include <zenutil/cache/cacherequests.h> -#include <zenutil/packageformat.h> +#include <zenutil/workerpools.h> #include <zencore/memory/llm.h> @@ -561,7 +562,17 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb Request.RecordObject = CbObjectView(Request.RecordCacheValue.GetData()); ParseValues(Request); - Request.Complete = true; + Request.Complete = true; + size_t ValueCount = Request.Values.size(); + std::vector<IoHash> CidHashes; + std::vector<size_t> RequestValueIndexes; + const bool DoBatch = ValueCount > 7; + if (DoBatch) + { + CidHashes.reserve(ValueCount); + RequestValueIndexes.reserve(ValueCount); + } + size_t ValueIndex = 0; for (ValueRequestData& Value : Request.Values) { CachePolicy ValuePolicy = Value.DownstreamPolicy; @@ -596,6 +607,11 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb Request.Complete = false; } } + else if (DoBatch) + { + CidHashes.push_back(Value.ContentId); + RequestValueIndexes.push_back(ValueIndex); + } else { if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Value.ContentId)) @@ -611,7 +627,6 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId); } } - if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) { NeedUpstreamAttachment = true; @@ -619,6 +634,53 @@ CacheRpcHandler::HandleRpcGetCacheRecords(const CacheRequestContext& Context, Cb } Request.Complete = false; } + ValueIndex++; + } + if (!RequestValueIndexes.empty()) + { + m_CidStore.IterateChunks( + CidHashes, + [this, &Request, ValueCount, &RequestValueIndexes](size_t Index, const IoBuffer& Payload) -> bool { + try + { + const size_t ValueIndex = RequestValueIndexes[Index]; + ValueRequestData& Value = Request.Values[ValueIndex]; + if (Payload) + { + Value.Payload = CompressedBuffer::FromCompressedNoValidate(IoBuffer(Payload)).MakeOwned(); + if (Value.Payload) + { + Value.Exists = true; + } + else + { + ZEN_WARN("Skipping invalid chunk in local cache '{}'", Value.ContentId); + } + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("CacheRpcHandler::HandleRpcGetCacheRecords IterateChunks callback failed with '{}'", Ex.what()); + } + return true; + }, + &GetSmallWorkerPool(EWorkloadType::Burst), + 64u * 1024u); + + for (size_t Index : RequestValueIndexes) + { + ValueRequestData& Value = Request.Values[Index]; + if (!Value.Exists) + { + const CachePolicy ValuePolicy = Value.DownstreamPolicy; + if (EnumHasAllFlags(ValuePolicy, CachePolicy::QueryRemote)) + { + NeedUpstreamAttachment = true; + Value.ReadFromUpstream = true; + } + Request.Complete = false; + } + } } } } @@ -1415,6 +1477,7 @@ CacheRpcHandler::GetLocalCacheRecords(const CacheRequestContext& Context, using namespace cache::detail; const bool HasUpstream = m_UpstreamCache.IsActive(); + // TODO: BatchGet records? std::vector<CacheKeyRequest*> UpstreamRecordRequests; for (size_t RecordIndex = 0; RecordIndex < Records.size(); ++RecordIndex) { diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index c14ea73a8..133cb42d7 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -424,7 +424,7 @@ ZenCacheStore::ZenCacheStore(GcManager& Gc, ZEN_INFO("initializing cache store at '{}'", m_BasePath); DirectoryContent DirContent; - GetDirectoryContent(m_BasePath, DirectoryContent::IncludeDirsFlag, DirContent); + GetDirectoryContent(m_BasePath, DirectoryContentFlags::IncludeDirs, DirContent); std::vector<std::string> Namespaces; for (const std::filesystem::path& DirPath : DirContent.Directories) diff --git a/src/zenstore/chunkedfile.cpp b/src/zenstore/chunkedfile.cpp index 0b66c7b9b..f200bc1ec 100644 --- a/src/zenstore/chunkedfile.cpp +++ b/src/zenstore/chunkedfile.cpp @@ -1,7 +1,7 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include <zencore/basicfile.h> #include <zenstore/chunkedfile.h> -#include <zenutil/basicfile.h> #include "chunking.h" @@ -195,10 +195,10 @@ TEST_CASE("chunkedfile.findparams") { # if 1 DirectoryContent SourceContent1; - GetDirectoryContent("E:\\Temp\\ChunkingTestData\\31379208", DirectoryContent::IncludeFilesFlag, SourceContent1); + GetDirectoryContent("E:\\Temp\\ChunkingTestData\\31379208", DirectoryContentFlags::IncludeFiles, SourceContent1); const std::vector<std::filesystem::path>& SourceFiles1 = SourceContent1.Files; DirectoryContent SourceContent2; - GetDirectoryContent("E:\\Temp\\ChunkingTestData\\31379208_2", DirectoryContent::IncludeFilesFlag, SourceContent2); + GetDirectoryContent("E:\\Temp\\ChunkingTestData\\31379208_2", DirectoryContentFlags::IncludeFiles, SourceContent2); const std::vector<std::filesystem::path>& SourceFiles2 = SourceContent2.Files; # else std::filesystem::path SourcePath1 = diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 50af7246e..2be0542db 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -199,6 +199,7 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const // reads, insert and GC. m_BlockStore.WriteChunk(ChunkData, ChunkSize, m_PayloadAlignment, [&](const BlockStoreLocation& Location) { + ZEN_MEMSCOPE(GetCasContainerTag()); ZEN_TRACE_CPU("CasContainer::UpdateLocation"); BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; @@ -232,7 +233,6 @@ CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> ZEN_ASSERT(Chunks.size() == ChunkHashes.size()); std::vector<CasStore::InsertResult> Result(Chunks.size()); std::vector<size_t> NewChunkIndexes; - Result.reserve(Chunks.size()); { RwLock::SharedLockScope _(m_LocationMapLock); for (size_t ChunkIndex = 0; ChunkIndex < ChunkHashes.size(); ChunkIndex++) @@ -264,6 +264,7 @@ CasContainerStrategy::InsertChunks(std::span<IoBuffer> Chunks, std::span<IoHash> size_t ChunkOffset = 0; m_BlockStore.WriteChunks(Datas, m_PayloadAlignment, [&](std::span<BlockStoreLocation> Locations) { + ZEN_MEMSCOPE(GetCasContainerTag()); std::vector<CasDiskIndexEntry> IndexEntries; for (const BlockStoreLocation& Location : Locations) { @@ -345,12 +346,13 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, } } } - if (FoundChunkLocations.size() < 3) + if (FoundChunkLocations.size() < 4) { - for (size_t ChunkIndex : FoundChunkIndexes) + for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++) { - IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]); - if (!AsyncCallback(ChunkIndex, Chunk)) + IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[Index]); + size_t OuterIndex = FoundChunkIndexes[Index]; + if (!AsyncCallback(OuterIndex, Chunk)) { return false; } @@ -359,6 +361,18 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, } auto DoOneBlock = [&](std::span<const size_t> ChunkIndexes) { + if (ChunkIndexes.size() < 4) + { + for (size_t ChunkIndex : ChunkIndexes) + { + IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]); + if (!AsyncCallback(FoundChunkIndexes[ChunkIndex], Chunk)) + { + return false; + } + } + return true; + } return m_BlockStore.IterateBlock( FoundChunkLocations, ChunkIndexes, @@ -378,19 +392,7 @@ CasContainerStrategy::IterateChunks(std::span<IoHash> ChunkHashes, Latch WorkLatch(1); std::atomic_bool AsyncContinue = true; bool Continue = m_BlockStore.IterateChunks(FoundChunkLocations, [&](uint32_t BlockIndex, std::span<const size_t> ChunkIndexes) { - if (ChunkIndexes.size() < 3) - { - for (size_t ChunkIndex : ChunkIndexes) - { - IoBuffer Chunk = m_BlockStore.TryGetChunk(FoundChunkLocations[ChunkIndex]); - if (!AsyncCallback(FoundChunkIndexes[ChunkIndex], Chunk)) - { - return false; - } - } - return true; - } - else if (OptionalWorkerPool) + if (OptionalWorkerPool && (ChunkIndexes.size() > 3)) { WorkLatch.AddCount(1); OptionalWorkerPool->ScheduleWork([&, ChunkIndexes = std::vector<size_t>(ChunkIndexes.begin(), ChunkIndexes.end())]() { diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 82dbe3551..14123528c 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -2,6 +2,7 @@ #include "filecas.h" +#include <zencore/basicfile.h> #include <zencore/compress.h> #include <zencore/except.h> #include <zencore/filesystem.h> @@ -19,7 +20,6 @@ #include <zencore/workthreadpool.h> #include <zenstore/gc.h> #include <zenstore/scrubcontext.h> -#include <zenutil/basicfile.h> #if ZEN_WITH_TESTS # include <zencore/compactbinarybuilder.h> @@ -185,7 +185,7 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN // in this folder as well struct Visitor : public FileSystemTraversal::TreeVisitor { - virtual void VisitFile(const std::filesystem::path&, const path_view&, uint64_t) override + virtual void VisitFile(const std::filesystem::path&, const path_view&, uint64_t, uint32_t) override { // We don't care about files } @@ -193,8 +193,7 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN { return std::find(&HexChars[0], &HexChars[16], C) != &HexChars[16]; } - virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, - [[maybe_unused]] const path_view& DirectoryName) override + virtual bool VisitDirectory(const std::filesystem::path& Parent, const path_view& DirectoryName, uint32_t) override { if (DirectoryName.length() == 3) { @@ -1175,7 +1174,7 @@ FileCasStrategy::ScanFolderForCasFiles(const std::filesystem::path& RootDir) struct Visitor : public FileSystemTraversal::TreeVisitor { Visitor(const std::filesystem::path& RootDir, std::vector<FileCasIndexEntry>& Entries) : RootDirectory(RootDir), Entries(Entries) {} - virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize) override + virtual void VisitFile(const std::filesystem::path& Parent, const path_view& File, uint64_t FileSize, uint32_t) override { std::filesystem::path RelPath = std::filesystem::relative(Parent, RootDirectory); @@ -1201,11 +1200,7 @@ FileCasStrategy::ScanFolderForCasFiles(const std::filesystem::path& RootDir) } } - virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, - [[maybe_unused]] const path_view& DirectoryName) override - { - return true; - } + virtual bool VisitDirectory(const std::filesystem::path&, const path_view&, uint32_t) override { return true; } const std::filesystem::path& RootDirectory; std::vector<FileCasIndexEntry>& Entries; diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index 50588b8c0..7ac10d613 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -1810,7 +1810,7 @@ GcScheduler::AppendGCLog(std::string_view Id, GcClock::TimePoint StartTime, cons } else { - ZEN_ERROR("writing gc result failed with system error exception: '{}'", SystemError.what()); + ZEN_ERROR("writing gc result failed with system error exception: '{}' ({})", SystemError.what(), SystemError.code().value()); } } catch (const std::bad_alloc& BadAlloc) @@ -2274,7 +2274,9 @@ GcScheduler::SchedulerThread() } else { - ZEN_ERROR("scheduling garbage collection failed with system error exception: '{}'", SystemError.what()); + ZEN_ERROR("scheduling garbage collection failed with system error exception: '{}' ({})", + SystemError.what(), + SystemError.code().value()); } m_LastGcTime = GcClock::Now(); m_LastLightweightGcTime = m_LastGcTime; @@ -2596,11 +2598,15 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, } if (RetryCount == 0) { - ZEN_ERROR("writing gc scheduler state failed with system error exception: '{}'", SystemError.what()); + ZEN_ERROR("writing gc scheduler state failed with system error exception: '{}' ({})", + SystemError.what(), + SystemError.code().value()); } else { - ZEN_WARN("writing gc scheduler state failed with system error exception: '{}'", SystemError.what()); + ZEN_WARN("writing gc scheduler state failed with system error exception: '{}' ({})", + SystemError.what(), + SystemError.code().value()); } } catch (const std::bad_alloc& BadAlloc) @@ -2636,11 +2642,15 @@ GcScheduler::CollectGarbage(const GcClock::TimePoint& CacheExpireTime, { if (SilenceErrors) { - ZEN_WARN("scheduling garbage collection failed with system error exception: '{}'", SystemError.what()); + ZEN_WARN("scheduling garbage collection failed with system error exception: '{}' ({})", + SystemError.what(), + SystemError.code().value()); } else { - ZEN_ERROR("scheduling garbage collection failed with system error exception: '{}'", SystemError.what()); + ZEN_ERROR("scheduling garbage collection failed with system error exception: '{}' ({})", + SystemError.what(), + SystemError.code().value()); } } m_LastGcTime = GcClock::Now(); diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index 8f8f2ccd7..97357e5cb 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -2,9 +2,9 @@ #pragma once +#include <zencore/basicfile.h> #include <zencore/filesystem.h> #include <zencore/zencore.h> -#include <zenutil/basicfile.h> ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> diff --git a/src/zenstore/include/zenstore/caslog.h b/src/zenstore/include/zenstore/caslog.h index edb4f8d9b..3d95c9c90 100644 --- a/src/zenstore/include/zenstore/caslog.h +++ b/src/zenstore/include/zenstore/caslog.h @@ -2,8 +2,8 @@ #pragma once +#include <zencore/basicfile.h> #include <zencore/uid.h> -#include <zenutil/basicfile.h> namespace zen { diff --git a/src/zenstore/workspaces.cpp b/src/zenstore/workspaces.cpp index 80e03296c..02a83d2a6 100644 --- a/src/zenstore/workspaces.cpp +++ b/src/zenstore/workspaces.cpp @@ -2,13 +2,13 @@ #include "zenstore/workspaces.h" +#include <zencore/basicfile.h> #include <zencore/compactbinarybuilder.h> #include <zencore/fmtutils.h> #include <zencore/scopeguard.h> #include <zencore/timer.h> #include <zencore/trace.h> #include <zencore/workthreadpool.h> -#include <zenutil/basicfile.h> ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_set.h> @@ -252,18 +252,43 @@ FolderStructure::FolderStructure(std::vector<FileEntry>&& InEntries, std::vector } namespace { - struct FolderScanner + struct FolderScanner : public GetDirectoryContentVisitor { FolderScanner(LoggerRef& Log, WorkerThreadPool& WorkerPool, const std::filesystem::path& Path) : m_Log(Log) , Path(Path) - , WorkLatch(1) , WorkerPool(WorkerPool) { } void Traverse(); - void Traverse(const std::filesystem::path& RelativeRoot, const std::filesystem::path& Path); + + virtual void AsyncVisitDirectory(const std::filesystem::path& RelativeRoot, DirectoryContent&& Content) override + { + std::vector<FolderStructure::FileEntry> FileEntries; + std::vector<Oid> PathIds; + const size_t FileCount = Content.FileNames.size(); + FileEntries.reserve(FileCount); + PathIds.reserve(FileCount); + + auto FileNameIt = Content.FileNames.begin(); + auto FileSizeIt = Content.FileSizes.begin(); + while (FileNameIt != Content.FileNames.end()) + { + ZEN_ASSERT_SLOW(FileSizeIt != Content.FileSizes.end()); + + std::filesystem::path RelativePath = RelativeRoot.empty() ? *FileNameIt : RelativeRoot / *FileNameIt; + PathIds.emplace_back(Workspaces::PathToId(RelativePath)); + FileEntries.emplace_back(FolderStructure::FileEntry{.RelativePath = std::move(RelativePath), .Size = *FileSizeIt}); + + FileNameIt++; + FileSizeIt++; + } + WorkLock.WithExclusiveLock([&]() { + FoundFiles.insert(FoundFiles.end(), FileEntries.begin(), FileEntries.end()); + FoundFileIds.insert(FoundFileIds.end(), PathIds.begin(), PathIds.end()); + }); + } LoggerRef& Log() { return m_Log; } LoggerRef& m_Log; @@ -271,75 +296,34 @@ namespace { RwLock WorkLock; std::vector<FolderStructure::FileEntry> FoundFiles; std::vector<Oid> FoundFileIds; - Latch WorkLatch; WorkerThreadPool& WorkerPool; }; - struct Visitor : public FileSystemTraversal::TreeVisitor + void FolderScanner::Traverse() { - Visitor(FolderScanner& Data, const std::filesystem::path& RelativeRoot) : Data(Data), RelativeRoot(RelativeRoot) {} + Stopwatch Timer; - FileSystemTraversal Traverser; - FolderScanner& Data; - std::vector<FolderStructure::FileEntry> Entries; - std::vector<Oid> FileIds; - std::filesystem::path RelativeRoot; + const std::filesystem::path Root = std::filesystem::absolute(Path); - virtual void VisitFile(const std::filesystem::path&, const path_view& File, uint64_t FileSize) - { - std::filesystem::path RelativePath = RelativeRoot.empty() ? File : RelativeRoot / File; - Entries.push_back(FolderStructure::FileEntry{.RelativePath = RelativePath, .Size = FileSize}); - FileIds.push_back(Workspaces::PathToId(RelativePath)); - } + Latch WorkLatch(1); - virtual bool VisitDirectory(const std::filesystem::path& Parent, const path_view& DirectoryName) - { - ZEN_ASSERT(!Parent.empty()); - ZEN_ASSERT(!DirectoryName.empty()); - FolderScanner* DataPtr = &Data; - Data.WorkLatch.AddCount(1); - Data.WorkerPool.ScheduleWork([DataPtr, - RootDir = Parent / DirectoryName, - RelativeRoot = RelativeRoot.empty() ? DirectoryName : RelativeRoot / DirectoryName]() { - auto _ = MakeGuard([DataPtr]() { DataPtr->WorkLatch.CountDown(); }); - try - { - DataPtr->Traverse(RelativeRoot, RootDir); - } - catch (const std::exception& Ex) - { - ZEN_WARN("Exception while traversing path {} {}: {}", RelativeRoot, RootDir, Ex.what()); - } - }); - return false; - } - }; + GetDirectoryContent( + Root, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::Recursive, + *this, + WorkerPool, + WorkLatch); - void FolderScanner::Traverse() - { - Stopwatch Timer; - Traverse({}, std::filesystem::absolute(Path)); WorkLatch.CountDown(); while (!WorkLatch.Wait(1000)) { - WorkLock.WithSharedLock([&]() { ZEN_INFO("Found {} files in '{}'...", FoundFiles.size(), Path.string()); }); + WorkLock.WithSharedLock([&]() { ZEN_INFO("Found {} files in '{}'...", FoundFiles.size(), Root.string()); }); } + ZEN_ASSERT(FoundFiles.size() == FoundFileIds.size()); ZEN_INFO("Found {} files in '{}' in {}", FoundFiles.size(), Path.string(), NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); } - void FolderScanner::Traverse(const std::filesystem::path& RelativeRoot, const std::filesystem::path& AbsoluteRoot) - { - Visitor LeafVisitor(*this, RelativeRoot); - LeafVisitor.Traverser.TraverseFileSystem(AbsoluteRoot, LeafVisitor); - if (!LeafVisitor.Entries.empty()) - { - WorkLock.WithExclusiveLock([&]() { - FoundFiles.insert(FoundFiles.end(), LeafVisitor.Entries.begin(), LeafVisitor.Entries.end()); - FoundFileIds.insert(FoundFileIds.end(), LeafVisitor.FileIds.begin(), LeafVisitor.FileIds.end()); - }); - } - } } // namespace std::unique_ptr<FolderStructure> diff --git a/src/zenutil/cache/rpcrecording.cpp b/src/zenutil/cache/rpcrecording.cpp index 9bef4d1a4..1f951167d 100644 --- a/src/zenutil/cache/rpcrecording.cpp +++ b/src/zenutil/cache/rpcrecording.cpp @@ -1,5 +1,8 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include <zenutil/cache/rpcrecording.h> + +#include <zencore/basicfile.h> #include <zencore/compactbinarybuilder.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> @@ -8,8 +11,6 @@ #include <zencore/system.h> #include <zencore/testing.h> #include <zencore/testutils.h> -#include <zenutil/basicfile.h> -#include <zenutil/cache/rpcrecording.h> ZEN_THIRD_PARTY_INCLUDES_START #include <fmt/format.h> diff --git a/src/zenutil/include/zenutil/jupiter/jupiterclient.h b/src/zenutil/include/zenutil/jupiter/jupiterclient.h new file mode 100644 index 000000000..defe50edc --- /dev/null +++ b/src/zenutil/include/zenutil/jupiter/jupiterclient.h @@ -0,0 +1,57 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenbase/refcount.h> +#include <zencore/logging.h> +#include <zenhttp/httpclient.h> + +#include <chrono> + +namespace zen { + +class IoBuffer; + +struct JupiterClientOptions +{ + std::string_view Name; + std::string_view ServiceUrl; + std::string_view DdcNamespace; + std::string_view BlobStoreNamespace; + std::string_view ComputeCluster; + std::chrono::milliseconds ConnectTimeout{5000}; + std::chrono::milliseconds Timeout{}; + bool AssumeHttp2 = false; + bool AllowResume = false; + uint8_t RetryCount = 0; +}; + +/** + * Jupiter upstream cache client + */ +class JupiterClient : public RefCounted +{ +public: + JupiterClient(const JupiterClientOptions& Options, std::function<HttpClientAccessToken()>&& TokenProvider); + ~JupiterClient(); + + std::string_view DefaultDdcNamespace() const { return m_DefaultDdcNamespace; } + std::string_view DefaultBlobStoreNamespace() const { return m_DefaultBlobStoreNamespace; } + std::string_view ComputeCluster() const { return m_ComputeCluster; } + std::string_view ServiceUrl() const { return m_HttpClient.GetBaseUri(); } + + LoggerRef Logger() { return m_Log; } + HttpClient& Client() { return m_HttpClient; } + +private: + LoggerRef m_Log; + const std::string m_DefaultDdcNamespace; + const std::string m_DefaultBlobStoreNamespace; + const std::string m_ComputeCluster; + std::function<HttpClientAccessToken()> m_TokenProvider; + HttpClient m_HttpClient; + + friend class JupiterSession; +}; + +} // namespace zen diff --git a/src/zenutil/include/zenutil/jupiter/jupitersession.h b/src/zenutil/include/zenutil/jupiter/jupitersession.h new file mode 100644 index 000000000..6a80332f4 --- /dev/null +++ b/src/zenutil/include/zenutil/jupiter/jupitersession.h @@ -0,0 +1,152 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/iohash.h> +#include <zencore/logging.h> +#include <zenhttp/httpclient.h> + +#include <set> + +namespace zen { + +class IoBuffer; + +struct JupiterResult +{ + IoBuffer Response; + uint64_t SentBytes{}; + uint64_t ReceivedBytes{}; + double ElapsedSeconds{}; + int32_t ErrorCode{}; + std::string Reason; + bool Success = false; +}; + +struct PutRefResult : JupiterResult +{ + std::vector<IoHash> Needs; + IoHash RawHash; +}; + +struct FinalizeRefResult : JupiterResult +{ + std::vector<IoHash> Needs; +}; + +struct JupiterExistsResult : JupiterResult +{ + std::set<IoHash> Needs; +}; + +struct GetObjectReferencesResult : JupiterResult +{ + std::set<IoHash> References; +}; + +struct PutBuildPartResult : JupiterResult +{ + std::vector<IoHash> Needs; + IoHash RawHash; +}; + +struct FinalizeBuildPartResult : JupiterResult +{ + std::vector<IoHash> Needs; +}; + +/** + * Context for performing Jupiter operations + * + * Maintains an HTTP connection so that subsequent operations don't need to go + * through the whole connection setup process + * + */ +class JupiterSession +{ +public: + JupiterSession(LoggerRef InLog, HttpClient& InHttpClient); + ~JupiterSession(); + + JupiterResult Authenticate(); + + JupiterResult GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType); + JupiterResult GetBlob(std::string_view Namespace, const IoHash& Key); + JupiterResult GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath = {}); + JupiterResult GetObject(std::string_view Namespace, const IoHash& Key); + JupiterResult GetInlineBlob(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoHash& OutPayloadHash, + std::filesystem::path TempFolderPath = {}); + + PutRefResult PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType); + JupiterResult PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); + JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob); + JupiterResult PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Blob); + JupiterResult PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object); + + FinalizeRefResult FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); + + JupiterResult RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key); + + GetObjectReferencesResult GetObjectReferences(std::string_view Namespace, const IoHash& Key); + + JupiterResult BlobExists(std::string_view Namespace, const IoHash& Key); + JupiterResult CompressedBlobExists(std::string_view Namespace, const IoHash& Key); + JupiterResult ObjectExists(std::string_view Namespace, const IoHash& Key); + + JupiterExistsResult BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); + JupiterExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys); + JupiterExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys); + + std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); + + JupiterResult PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload); + JupiterResult GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); + JupiterResult FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId); + PutBuildPartResult PutBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + std::string_view PartName, + const IoBuffer& Payload); + JupiterResult GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId); + JupiterResult PutBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + ZenContentType ContentType, + const CompositeBuffer& Payload); + JupiterResult GetBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + std::filesystem::path TempFolderPath); + JupiterResult PutBlockMetadata(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + const IoBuffer& Payload); + FinalizeBuildPartResult FinalizeBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& RawHash); + JupiterResult FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId); + +private: + inline LoggerRef Log() { return m_Log; } + + JupiterResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key); + + JupiterExistsResult CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys); + + LoggerRef m_Log; + HttpClient& m_HttpClient; +}; + +} // namespace zen diff --git a/src/zenutil/include/zenutil/logging/rotatingfilesink.h b/src/zenutil/include/zenutil/logging/rotatingfilesink.h index 3eb9021dd..758722156 100644 --- a/src/zenutil/include/zenutil/logging/rotatingfilesink.h +++ b/src/zenutil/include/zenutil/logging/rotatingfilesink.h @@ -2,8 +2,8 @@ #pragma once +#include <zencore/basicfile.h> #include <zencore/memory/llm.h> -#include <zenutil/basicfile.h> ZEN_THIRD_PARTY_INCLUDES_START #include <spdlog/details/log_msg.h> diff --git a/src/zenutil/jupiter/jupiterclient.cpp b/src/zenutil/jupiter/jupiterclient.cpp new file mode 100644 index 000000000..5e5da3750 --- /dev/null +++ b/src/zenutil/jupiter/jupiterclient.cpp @@ -0,0 +1,29 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/jupiter/jupiterclient.h> + +namespace zen { + +using namespace std::literals; + +JupiterClient::JupiterClient(const JupiterClientOptions& Options, std::function<HttpClientAccessToken()>&& TokenProvider) +: m_Log(zen::logging::Get("jupiter"sv)) +, m_DefaultDdcNamespace(Options.DdcNamespace) +, m_DefaultBlobStoreNamespace(Options.BlobStoreNamespace) +, m_ComputeCluster(Options.ComputeCluster) +, m_TokenProvider(std::move(TokenProvider)) +, m_HttpClient(Options.ServiceUrl, + HttpClientSettings{.ConnectTimeout = Options.ConnectTimeout, + .Timeout = Options.Timeout, + .AccessTokenProvider = std::move(TokenProvider), + .AssumeHttp2 = Options.AssumeHttp2, + .AllowResume = Options.AllowResume, + .RetryCount = Options.RetryCount}) +{ +} + +JupiterClient::~JupiterClient() +{ +} + +} // namespace zen diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp new file mode 100644 index 000000000..f706a7efc --- /dev/null +++ b/src/zenutil/jupiter/jupitersession.cpp @@ -0,0 +1,505 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/jupiter/jupitersession.h> + +#include <zencore/compactbinary.h> +#include <zencore/fmtutils.h> +#include <zencore/trace.h> + +ZEN_THIRD_PARTY_INCLUDES_START +//#include <cpr/cpr.h> +//#include <fmt/format.h> +#include <json11.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +using namespace std::literals; + +namespace zen { + +namespace detail { + JupiterResult ConvertResponse(const HttpClient::Response& Response, const std::string_view ErrorPrefix = ""sv) + { + if (Response.Error) + { + return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), + .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), + .ElapsedSeconds = Response.ElapsedSeconds, + .ErrorCode = Response.Error.value().ErrorCode, + .Reason = Response.ErrorMessage(ErrorPrefix), + .Success = false}; + } + if (!Response.IsSuccess()) + { + return {.SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), + .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), + .ElapsedSeconds = Response.ElapsedSeconds, + .ErrorCode = static_cast<int32_t>(Response.StatusCode), + .Reason = Response.ErrorMessage(ErrorPrefix), + .Success = false}; + } + return {.Response = Response.ResponsePayload, + .SentBytes = gsl::narrow<uint64_t>(Response.UploadedBytes), + .ReceivedBytes = gsl::narrow<uint64_t>(Response.DownloadedBytes), + .ElapsedSeconds = Response.ElapsedSeconds, + .ErrorCode = 0, + .Success = true}; + } +} // namespace detail + +JupiterSession::JupiterSession(LoggerRef InLog, HttpClient& InHttpClient) : m_Log(InLog), m_HttpClient(InHttpClient) +{ +} + +JupiterSession::~JupiterSession() +{ +} + +JupiterResult +JupiterSession::Authenticate() +{ + bool OK = m_HttpClient.Authenticate(); + return {.Success = OK}; +} + +JupiterResult +JupiterSession::GetRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType RefType) +{ + ZEN_TRACE_CPU("JupiterClient::GetRef"); + + HttpClient::Response Response = + m_HttpClient.Get(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), {HttpClient::Accept(RefType)}); + + return detail::ConvertResponse(Response, "JupiterSession::GetRef"sv); +} + +JupiterResult +JupiterSession::GetBlob(std::string_view Namespace, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::GetBlob"); + HttpClient::Response Response = + m_HttpClient.Get(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), {HttpClient::Accept(ZenContentType::kBinary)}); + + return detail::ConvertResponse(Response); +} + +JupiterResult +JupiterSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath) +{ + ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob"); + + HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), + TempFolderPath, + {HttpClient::Accept(ZenContentType::kCompressedBinary)}); + + return detail::ConvertResponse(Response); +} + +JupiterResult +JupiterSession::GetInlineBlob(std::string_view Namespace, + std::string_view BucketId, + const IoHash& Key, + IoHash& OutPayloadHash, + std::filesystem::path TempFolderPath) +{ + ZEN_TRACE_CPU("JupiterClient::GetInlineBlob"); + + HttpClient::Response Response = m_HttpClient.Download(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), + TempFolderPath, + {{"Accept", "application/x-jupiter-inline"}}); + + JupiterResult Result = detail::ConvertResponse(Response); + + if (auto It = Response.Header->find("X-Jupiter-InlinePayloadHash"); It != Response.Header->end()) + { + const std::string& PayloadHashHeader = It->second; + if (PayloadHashHeader.length() == IoHash::StringLength) + { + OutPayloadHash = IoHash::FromHexString(PayloadHashHeader); + } + } + + return Result; +} + +JupiterResult +JupiterSession::GetObject(std::string_view Namespace, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::GetObject"); + + HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), + {HttpClient::Accept(ZenContentType::kCbObject)}); + + return detail::ConvertResponse(Response); +} + +PutRefResult +JupiterSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType) +{ + ZEN_TRACE_CPU("JupiterClient::PutRef"); + + Ref.SetContentType(RefType); + + IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size()); + + HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString()), + Ref, + {{"X-Jupiter-IoHash", Hash.ToHexString()}}); + + PutRefResult Result = {detail::ConvertResponse(Response)}; + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + Result.RawHash = Hash; + } + return Result; +} + +FinalizeRefResult +JupiterSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash) +{ + ZEN_TRACE_CPU("JupiterClient::FinalizeRef"); + + HttpClient::Response Response = + m_HttpClient.Post(fmt::format("/api/v1/refs/{}/{}/{}/finalize/{}", Namespace, BucketId, Key.ToHexString(), RefHash.ToHexString()), + {{"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); + + FinalizeRefResult Result = {detail::ConvertResponse(Response)}; + + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(std::string(Response.ToText()), JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + return Result; +} + +JupiterResult +JupiterSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) +{ + ZEN_TRACE_CPU("JupiterClient::PutBlob"); + + HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v1/blobs/{}/{}", Namespace, Key.ToHexString()), Blob); + + return detail::ConvertResponse(Response); +} + +JupiterResult +JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob) +{ + ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); + + Blob.SetContentType(ZenContentType::kCompressedBinary); + HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), Blob); + + return detail::ConvertResponse(Response); +} + +JupiterResult +JupiterSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload) +{ + ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob"); + + HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/compressed-blobs/{}/{}", Namespace, Key.ToHexString()), + Payload, + ZenContentType::kCompressedBinary); + + return detail::ConvertResponse(Response); +} + +JupiterResult +JupiterSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object) +{ + ZEN_TRACE_CPU("JupiterClient::PutObject"); + + Object.SetContentType(ZenContentType::kCbObject); + HttpClient::Response Response = m_HttpClient.Upload(fmt::format("/api/v1/objects/{}/{}", Namespace, Key.ToHexString()), Object); + + return detail::ConvertResponse(Response); +} + +JupiterResult +JupiterSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::RefExists"); + + HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/{}", Namespace, BucketId, Key.ToHexString())); + + return detail::ConvertResponse(Response); +} + +GetObjectReferencesResult +JupiterSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::GetObjectReferences"); + + HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/refs/{}/{}/references", Namespace, Key.ToHexString()), + {HttpClient::Accept(ZenContentType::kCbObject)}); + + GetObjectReferencesResult Result = {detail::ConvertResponse(Response)}; + + if (Result.Success) + { + const CbObject ReferencesResponse = Response.AsObject(); + for (auto& Item : ReferencesResponse["references"sv]) + { + Result.References.insert(Item.AsHash()); + } + } + return Result; +} + +JupiterResult +JupiterSession::BlobExists(std::string_view Namespace, const IoHash& Key) +{ + return CacheTypeExists(Namespace, "blobs"sv, Key); +} + +JupiterResult +JupiterSession::CompressedBlobExists(std::string_view Namespace, const IoHash& Key) +{ + return CacheTypeExists(Namespace, "compressed-blobs"sv, Key); +} + +JupiterResult +JupiterSession::ObjectExists(std::string_view Namespace, const IoHash& Key) +{ + return CacheTypeExists(Namespace, "objects"sv, Key); +} + +JupiterExistsResult +JupiterSession::BlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) +{ + return CacheTypeExists(Namespace, "blobs"sv, Keys); +} + +JupiterExistsResult +JupiterSession::CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys) +{ + return CacheTypeExists(Namespace, "compressed-blobs"sv, Keys); +} + +JupiterExistsResult +JupiterSession::ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys) +{ + return CacheTypeExists(Namespace, "objects"sv, Keys); +} + +std::vector<IoHash> +JupiterSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes) +{ + // ExtendableStringBuilder<256> Uri; + // Uri << m_CacheClient->ServiceUrl(); + // Uri << "/api/v1/s/" << Namespace; + + ZEN_UNUSED(Namespace, BucketId, ChunkHashes); + + return {}; +} + +JupiterResult +JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key) +{ + ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); + + HttpClient::Response Response = m_HttpClient.Head(fmt::format("/api/v1/{}/{}/{}", TypeId, Namespace, Key.ToHexString())); + + return detail::ConvertResponse(Response); +} + +JupiterExistsResult +JupiterSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys) +{ + ZEN_TRACE_CPU("JupiterClient::CacheTypeExists"); + + ExtendableStringBuilder<256> Body; + Body << "["; + for (const auto& Key : Keys) + { + Body << (Body.Size() != 1 ? ",\"" : "\"") << Key.ToHexString() << "\""; + } + Body << "]"; + IoBuffer Payload = IoBuffer(IoBuffer::Wrap, Body.Data(), Body.Size()); + Payload.SetContentType(ZenContentType::kJSON); + + HttpClient::Response Response = + m_HttpClient.Post(fmt::format("/api/v1/{}/{}/exist", TypeId, Namespace), Payload, {HttpClient::Accept(ZenContentType::kCbObject)}); + + JupiterExistsResult Result = {detail::ConvertResponse(Response)}; + + if (Result.Success) + { + const CbObject ExistsResponse = Response.AsObject(); + for (auto& Item : ExistsResponse["needs"sv]) + { + Result.Needs.insert(Item.AsHash()); + } + } + return Result; +} + +JupiterResult +JupiterSession::PutBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const IoBuffer& Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + HttpClient::Response Response = m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), Payload); + return detail::ConvertResponse(Response, "JupiterSession::PutBuild"sv); +} + +JupiterResult +JupiterSession::GetBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) +{ + HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}", Namespace, BucketId, BuildId), + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "JupiterSession::GetBuild"sv); +} + +JupiterResult +JupiterSession::FinalizeBuild(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId) +{ + HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/{}/finalize", Namespace, BucketId, BuildId)); + return detail::ConvertResponse(Response, "JupiterSession::FinalizeBuild"sv); +} + +PutBuildPartResult +JupiterSession::PutBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + std::string_view PartName, + const IoBuffer& Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + + IoHash Hash = IoHash::HashBuffer(Payload.Data(), Payload.Size()); + + HttpClient::Response Response = + m_HttpClient.Put(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/{}", Namespace, BucketId, BuildId, PartId, PartName), + Payload, + {{"X-Jupiter-IoHash", Hash.ToHexString()}}); + + PutBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::PutBuildPart"sv)}; + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + Result.RawHash = Hash; + } + return Result; +} + +JupiterResult +JupiterSession::GetBuildPart(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId) +{ + HttpClient::Response Response = m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}", Namespace, BucketId, BuildId, PartId), + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "JupiterSession::GetBuildPart"sv); +} + +JupiterResult +JupiterSession::PutBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + ZenContentType ContentType, + const CompositeBuffer& Payload) +{ + HttpClient::Response Response = m_HttpClient.Upload( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), + Payload, + ContentType); + return detail::ConvertResponse(Response, "JupiterSession::PutBuildBlob"sv); +} + +JupiterResult +JupiterSession::GetBuildBlob(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + std::filesystem::path TempFolderPath) +{ + HttpClient::Response Response = m_HttpClient.Download( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blobs/{}", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), + TempFolderPath); + return detail::ConvertResponse(Response, "JupiterSession::GetBuildBlob"sv); +} + +JupiterResult +JupiterSession::PutBlockMetadata(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& Hash, + const IoBuffer& Payload) +{ + ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); + HttpClient::Response Response = m_HttpClient.Put( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/{}/metadata", Namespace, BucketId, BuildId, PartId, Hash.ToHexString()), + Payload); + return detail::ConvertResponse(Response, "JupiterSession::PutBlockMetadata"sv); +} + +FinalizeBuildPartResult +JupiterSession::FinalizeBuildPart(std::string_view Namespace, + std::string_view BucketId, + const Oid& BuildId, + const Oid& PartId, + const IoHash& RawHash) +{ + HttpClient::Response Response = m_HttpClient.Post( + fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/finalize/{}", Namespace, BucketId, BuildId, PartId, RawHash.ToHexString()), + HttpClient::Accept(ZenContentType::kCbObject)); + + FinalizeBuildPartResult Result = {detail::ConvertResponse(Response, "JupiterSession::FinalizeBuildPart"sv)}; + if (Result.Success) + { + std::string JsonError; + json11::Json Json = json11::Json::parse(Response.ToText(), JsonError); + if (JsonError.empty()) + { + json11::Json::array Needs = Json["needs"].array_items(); + for (const auto& Need : Needs) + { + Result.Needs.emplace_back(IoHash::FromHexString(Need.string_value())); + } + } + } + return Result; +} + +JupiterResult +JupiterSession::FindBlocks(std::string_view Namespace, std::string_view BucketId, const Oid& BuildId, const Oid& PartId) +{ + HttpClient::Response Response = + m_HttpClient.Get(fmt::format("/api/v2/builds/{}/{}/{}/parts/{}/blocks/listBlocks", Namespace, BucketId, BuildId, PartId), + HttpClient::Accept(ZenContentType::kCbObject)); + return detail::ConvertResponse(Response, "JupiterSession::FindBlocks"sv); +} + +} // namespace zen diff --git a/src/zenutil/xmake.lua b/src/zenutil/xmake.lua index 744dff737..3d95651f2 100644 --- a/src/zenutil/xmake.lua +++ b/src/zenutil/xmake.lua @@ -6,5 +6,5 @@ target('zenutil') add_headerfiles("**.h") add_files("**.cpp") add_includedirs("include", {public=true}) - add_deps("zencore") + add_deps("zencore", "zenhttp") add_packages("vcpkg::robin-map", "vcpkg::spdlog") diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp index 214737425..b36f11741 100644 --- a/src/zenutil/zenserverprocess.cpp +++ b/src/zenutil/zenserverprocess.cpp @@ -2,6 +2,7 @@ #include "zenutil/zenserverprocess.h" +#include <zencore/basicfile.h> #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/except.h> @@ -12,7 +13,6 @@ #include <zencore/string.h> #include <zencore/thread.h> #include <zencore/timer.h> -#include <zenutil/basicfile.h> #include <atomic> diff --git a/src/zenutil/zenutil.cpp b/src/zenutil/zenutil.cpp index 97ebeb01d..c54144549 100644 --- a/src/zenutil/zenutil.cpp +++ b/src/zenutil/zenutil.cpp @@ -4,20 +4,16 @@ #if ZEN_WITH_TESTS -# include <zenutil/basicfile.h> # include <zenutil/cache/cacherequests.h> # include <zenutil/cache/rpcrecording.h> -# include <zenutil/packageformat.h> namespace zen { void zenutil_forcelinktests() { - basicfile_forcelink(); cachepolicy_forcelink(); cache::rpcrecord_forcelink(); - forcelink_packageformat(); cacherequests_forcelink(); } |