diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/workspaces_cmd.cpp | 523 | ||||
| -rw-r--r-- | src/zen/cmds/workspaces_cmd.h | 91 | ||||
| -rw-r--r-- | src/zen/zen.cpp | 128 | ||||
| -rw-r--r-- | src/zen/zen.h | 7 | ||||
| -rw-r--r-- | src/zencore/include/zencore/uid.h | 2 | ||||
| -rw-r--r-- | src/zencore/iobuffer.cpp | 12 | ||||
| -rw-r--r-- | src/zencore/uid.cpp | 28 | ||||
| -rw-r--r-- | src/zenhttp/httpclient.cpp | 19 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/httpclient.h | 1 | ||||
| -rw-r--r-- | src/zenserver-test/zenserver-test.cpp | 389 | ||||
| -rw-r--r-- | src/zenserver/config.cpp | 10 | ||||
| -rw-r--r-- | src/zenserver/config.h | 6 | ||||
| -rw-r--r-- | src/zenserver/workspaces/httpworkspaces.cpp | 802 | ||||
| -rw-r--r-- | src/zenserver/workspaces/httpworkspaces.h | 72 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 15 | ||||
| -rw-r--r-- | src/zenserver/zenserver.h | 4 | ||||
| -rw-r--r-- | src/zenstore/include/zenstore/workspaces.h | 102 | ||||
| -rw-r--r-- | src/zenstore/workspaces.cpp | 955 | ||||
| -rw-r--r-- | src/zenstore/zenstore.cpp | 2 | ||||
| -rw-r--r-- | src/zenutil/chunkrequests.cpp | 147 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/chunkrequests.h | 27 | ||||
| -rw-r--r-- | src/zenutil/zenserverprocess.cpp | 6 |
22 files changed, 3299 insertions, 49 deletions
diff --git a/src/zen/cmds/workspaces_cmd.cpp b/src/zen/cmds/workspaces_cmd.cpp new file mode 100644 index 000000000..503bc24cf --- /dev/null +++ b/src/zen/cmds/workspaces_cmd.cpp @@ -0,0 +1,523 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "workspaces_cmd.h" + +#include <zencore/except.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/string.h> +#include <zencore/uid.h> +#include <zenhttp/formatters.h> +#include <zenhttp/httpclient.h> +#include <zenhttp/httpcommon.h> +#include <zenutil/chunkrequests.h> +#include <zenutil/zenserverprocess.h> + +#include <memory> + +namespace zen { + +WorkspaceCommand::WorkspaceCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + m_Options.add_option("", "v", "verb", "Verb for workspace - create, remove, info", cxxopts::value(m_Verb), "<verb>"); + m_Options.parse_positional({"verb"}); + m_Options.positional_help("verb"); + + m_CreateOptions.add_options()("h,help", "Print help"); + m_CreateOptions.add_option("", "w", "workspace", "Workspace identity(id)", cxxopts::value(m_Id), "<workspaceid>"); + m_CreateOptions.add_option("", "r", "folder", "Root file system folder for workspace", cxxopts::value(m_Path), "<folder>"); + m_CreateOptions.parse_positional({"folder", "workspace"}); + m_CreateOptions.positional_help("folder workspace"); + + m_InfoOptions.add_options()("h,help", "Print help"); + m_InfoOptions.add_option("", "w", "workspace", "Workspace identity(id)", cxxopts::value(m_Id), "<workspaceid>"); + m_InfoOptions.parse_positional({"workspace"}); + m_InfoOptions.positional_help("workspace"); + + m_RemoveOptions.add_options()("h,help", "Print help"); + m_RemoveOptions.add_option("", "w", "workspace", "Workspace identity(id)", cxxopts::value(m_Id), "<workspaceid>"); + m_RemoveOptions.parse_positional({"workspace"}); + m_InfoOptions.positional_help("workspace"); +} + +WorkspaceCommand::~WorkspaceCommand() = default; + +int +WorkspaceCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions); + + using namespace std::literals; + + std::vector<char*> SubCommandArguments; + cxxopts::Options* SubOption = nullptr; + int ParentCommandArgCount = GetSubCommand(m_Options, argc, argv, m_SubCommands, SubOption, SubCommandArguments); + if (!ParseOptions(ParentCommandArgCount, argv)) + { + return 0; + } + + if (SubOption == nullptr) + { + throw zen::OptionParseException("command verb is missing"); + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_HostName.empty()) + { + throw zen::OptionParseException("unable to resolve server specification"); + } + + if (!ParseOptions(*SubOption, gsl::narrow<int>(SubCommandArguments.size()), SubCommandArguments.data())) + { + return 0; + } + + HttpClient Http(m_HostName); + + if (SubOption == &m_CreateOptions) + { + if (m_Path.empty()) + { + throw zen::OptionParseException(fmt::format("path is required\n{}", m_CreateOptions.help())); + } + if (m_Id.empty()) + { + m_Id = Oid::Zero.ToString(); + ZEN_CONSOLE("Using generated workspace id from path '{}'", m_Path); + } + + HttpClient::KeyValueMap Params{{"root_path", m_Path}}; + if (HttpClient::Response Result = Http.Put(fmt::format("/ws/{}", m_Id), Params)) + { + ZEN_CONSOLE("{}. Id: {}", Result, Result.AsText()); + return 0; + } + else + { + Result.ThrowError(fmt::format("failed to create workspace {}", m_Id)); + return 1; + } + } + + if (SubOption == &m_InfoOptions) + { + if (m_Id.empty()) + { + throw zen::OptionParseException(fmt::format("id is required", m_InfoOptions.help())); + } + if (HttpClient::Response Result = Http.Get(fmt::format("/ws/{}", m_Id))) + { + ZEN_CONSOLE("{}", Result.ToText()); + return 0; + } + else + { + Result.ThrowError(fmt::format("failed to get info for workspace {}", m_Id)); + return 1; + } + } + + if (SubOption == &m_RemoveOptions) + { + if (m_Id.empty()) + { + throw zen::OptionParseException(fmt::format("id is required", m_RemoveOptions.help())); + } + if (HttpClient::Response Result = Http.Delete(fmt::format("/ws/{}", m_Id))) + { + ZEN_CONSOLE("{}", Result); + return 0; + } + else + { + Result.ThrowError(fmt::format("failed to remove workspace {}", m_Id)); + return 1; + } + } + + ZEN_ASSERT(false); +} + +///////////////////////////////////////////////////////////////////////// + +WorkspaceShareCommand::WorkspaceShareCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + m_Options.add_option("", "v", "verb", "Verb for workspace - create, remove, info", cxxopts::value(m_Verb), "<verb>"); + m_Options.parse_positional({"verb"}); + m_Options.positional_help("verb"); + + m_CreateOptions.add_options()("h,help", "Print help"); + m_CreateOptions.add_option("", "w", "workspace", "Workspace identity (id)", cxxopts::value(m_WorkspaceId), "<workspaceid>"); + m_CreateOptions.add_option("", "s", "share", "Workspace share identity(id)", cxxopts::value(m_ShareId), "<shareid>"); + m_CreateOptions.add_option("", "r", "folder", "Folder path inside the workspace to share", cxxopts::value(m_SharePath), "<folder>"); + m_CreateOptions.parse_positional({"workspace", "folder", "share"}); + m_CreateOptions.positional_help("workspace folder share"); + + m_InfoOptions.add_options()("h,help", "Print help"); + m_InfoOptions.add_option("", "w", "workspace", "Workspace identity (id)", cxxopts::value(m_WorkspaceId), "<workspaceid>"); + m_InfoOptions.add_option("", "s", "share", "Workspace share identity(id)", cxxopts::value(m_ShareId), "<shareid>"); + m_InfoOptions.add_option("", "r", "refresh", "Refresh workspace share", cxxopts::value(m_Refresh), "<refresh>"); + m_InfoOptions.parse_positional({"workspace", "share"}); + m_InfoOptions.positional_help("workspace share"); + + m_RemoveOptions.add_options()("h,help", "Print help"); + m_RemoveOptions.add_option("", "w", "workspace", "Workspace identity (id)", cxxopts::value(m_WorkspaceId), "<workspaceid>"); + m_RemoveOptions.add_option("", "s", "share", "Workspace share identity(id)", cxxopts::value(m_ShareId), "<shareid>"); + m_RemoveOptions.parse_positional({"workspace", "share"}); + m_RemoveOptions.positional_help("workspace share"); + + m_FilesOptions.add_options()("h,help", "Print help"); + m_FilesOptions.add_option("", "w", "workspace", "Workspace identity (id)", cxxopts::value(m_WorkspaceId), "<workspaceid>"); + m_FilesOptions.add_option("", "s", "share", "Workspace share identity(id)", cxxopts::value(m_ShareId), "<shareid>"); + m_FilesOptions.add_option("", + "", + "filter", + "A list of comma separated fields to include in the response - empty means all", + cxxopts::value(m_FieldFilter), + "<fields>"); + m_FilesOptions.add_option("", "r", "refresh", "Refresh workspace share", cxxopts::value(m_Refresh), "<refresh>"); + m_FilesOptions.parse_positional({"workspace", "share"}); + m_FilesOptions.positional_help("workspace share"); + + m_EntriesOptions.add_options()("h,help", "Print help"); + m_EntriesOptions.add_option("", "w", "workspace", "Workspace identity (id)", cxxopts::value(m_WorkspaceId), "<workspaceid>"); + m_EntriesOptions.add_option("", "s", "share", "Workspace share identity(id)", cxxopts::value(m_ShareId), "<shareid>"); + m_EntriesOptions.add_option("", + "", + "filter", + "A list of comma separated fields to include in the response - empty means all", + cxxopts::value(m_FieldFilter), + "<fields>"); + m_EntriesOptions.add_option("", "", "opkey", "Filter the query to a particular key (id)", cxxopts::value(m_ChunkId), "<oid>"); + m_EntriesOptions.add_option("", "r", "refresh", "Refresh workspace share", cxxopts::value(m_Refresh), "<refresh>"); + m_EntriesOptions.parse_positional({"workspace", "share", "opkey"}); + m_EntriesOptions.positional_help("workspace share opkey"); + + m_GetChunkOptions.add_options()("h,help", "Print help"); + m_GetChunkOptions.add_option("", "w", "workspace", "Workspace identity (id)", cxxopts::value(m_WorkspaceId), "<workspaceid>"); + m_GetChunkOptions.add_option("", "s", "share", "Workspace share identity(id)", cxxopts::value(m_ShareId), "<shareid>"); + m_GetChunkOptions.add_option("", "c", "chunk", "Chunk identity (id)", cxxopts::value(m_ChunkId), "<chunkid>"); + m_GetChunkOptions.add_option("", "", "offset", "Offset in chunk", cxxopts::value(m_Offset), "<offset>"); + m_GetChunkOptions.add_option("", "", "size", "Size of chunk", cxxopts::value(m_Size), "<size>"); + m_GetChunkOptions.parse_positional({"workspace", "share", "chunk"}); + m_GetChunkOptions.positional_help("workspace share chunk"); + + m_GetChunkBatchOptions.add_options()("h,help", "Print help"); + m_GetChunkBatchOptions.add_option("", "s", "share", "Workspace share identity(id)", cxxopts::value(m_ShareId), "<shareid>"); + m_GetChunkBatchOptions.add_option("", "w", "workspace", "Workspace identity (id)", cxxopts::value(m_WorkspaceId), "<workspaceid>"); + m_GetChunkBatchOptions.add_option("", "", "chunks", "A list of identities (id)", cxxopts::value(m_ChunkIds), "<chunkids>"); + m_GetChunkBatchOptions.parse_positional({"workspace", "share", "chunks"}); + m_GetChunkBatchOptions.positional_help("workspace share chunks"); +} + +WorkspaceShareCommand::~WorkspaceShareCommand() = default; + +int +WorkspaceShareCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions); + + using namespace std::literals; + + std::vector<char*> SubCommandArguments; + cxxopts::Options* SubOption = nullptr; + int ParentCommandArgCount = GetSubCommand(m_Options, argc, argv, m_SubCommands, SubOption, SubCommandArguments); + if (!ParseOptions(ParentCommandArgCount, argv)) + { + return 0; + } + + if (SubOption == nullptr) + { + throw zen::OptionParseException("command verb is missing"); + } + + m_HostName = ResolveTargetHostSpec(m_HostName); + + if (m_HostName.empty()) + { + throw zen::OptionParseException("unable to resolve server specification"); + } + + if (!ParseOptions(*SubOption, gsl::narrow<int>(SubCommandArguments.size()), SubCommandArguments.data())) + { + return 0; + } + + if (m_WorkspaceId.empty()) + { + throw zen::OptionParseException("workspace id is required"); + } + + HttpClient Http(m_HostName); + + if (SubOption == &m_CreateOptions) + { + if (m_ShareId.empty()) + { + if (m_SharePath.ends_with(std::filesystem::path::preferred_separator)) + { + m_SharePath.pop_back(); + } + + m_ShareId = Oid::Zero.ToString(); + ZEN_CONSOLE("Using generated share id for path '{}'", m_SharePath); + } + + HttpClient::KeyValueMap Params{{"share_path", m_SharePath}}; + + if (HttpClient::Response Result = Http.Put(fmt::format("/ws/{}/{}", m_WorkspaceId, m_ShareId), Params)) + { + ZEN_CONSOLE("{}. Id: {}", Result, Result.AsText()); + return 0; + } + else + { + Result.ThrowError("failed to create workspace share"sv); + return 1; + } + } + + if (SubOption == &m_InfoOptions) + { + if (m_ShareId.empty()) + { + throw zen::OptionParseException(fmt::format("share id is required", m_InfoOptions.help())); + } + + if (HttpClient::Response Result = Http.Get(fmt::format("/ws/{}/{}", m_WorkspaceId, m_ShareId))) + { + ZEN_CONSOLE("{}", Result.ToText()); + return 0; + } + else + { + Result.ThrowError(fmt::format("failed to get info for share {} in workspace {}", m_ShareId, m_WorkspaceId)); + return 1; + } + } + + if (SubOption == &m_RemoveOptions) + { + if (m_ShareId.empty()) + { + throw zen::OptionParseException(fmt::format("share id is required", m_InfoOptions.help())); + } + if (HttpClient::Response Result = Http.Delete(fmt::format("/ws/{}/{}", m_WorkspaceId, m_ShareId))) + { + ZEN_CONSOLE("{}", Result); + return 0; + } + else + { + Result.ThrowError(fmt::format("failed to remove share {} in workspace {}", m_WorkspaceId, m_ShareId)); + return 1; + } + } + + if (SubOption == &m_FilesOptions) + { + if (m_ShareId.empty()) + { + throw zen::OptionParseException(fmt::format("share id is required", m_InfoOptions.help())); + } + + HttpClient::KeyValueMap Params; + if (!m_FieldFilter.empty()) + { + Params.Entries.insert_or_assign("fieldnames", m_FieldFilter); + } + if (m_Refresh) + { + Params.Entries.insert_or_assign("refresh", ToString(m_Refresh)); + } + + if (HttpClient::Response Result = Http.Get(fmt::format("/ws/{}/{}/files", m_WorkspaceId, m_ShareId), {}, Params)) + { + ZEN_CONSOLE("{}: {}", Result, Result.ToText()); + return 0; + } + else + { + Result.ThrowError("failed to get workspace share files"sv); + return 1; + } + } + + if (SubOption == &m_EntriesOptions) + { + if (m_ShareId.empty()) + { + throw zen::OptionParseException(fmt::format("share id is required", m_InfoOptions.help())); + } + + HttpClient::KeyValueMap Params; + if (!m_ChunkId.empty()) + { + Params.Entries.insert_or_assign("opkey", m_ChunkId); + } + if (!m_FieldFilter.empty()) + { + Params.Entries.insert_or_assign("fieldfilter", m_FieldFilter); + } + if (m_Refresh) + { + Params.Entries.insert_or_assign("refresh", ToString(m_Refresh)); + } + + if (HttpClient::Response Result = Http.Get(fmt::format("/ws/{}/{}/entries", m_WorkspaceId, m_ShareId), {}, Params)) + { + ZEN_CONSOLE("{}: {}", Result, Result.ToText()); + return 0; + } + else + { + Result.ThrowError("failed to get workspace share entries"sv); + return 1; + } + } + + auto ChunksToOidStrings = + [&Http, WorkspaceId = m_WorkspaceId, ShareId = m_ShareId](std::span<const std::string> ChunkIds) -> std::vector<std::string> { + std::vector<std::string> Oids; + Oids.reserve(ChunkIds.size()); + std::vector<size_t> NeedsConvertIndexes; + for (const std::string& StringChunkId : ChunkIds) + { + Oid ChunkId = Oid::TryFromHexString(StringChunkId); + if (ChunkId == Oid::Zero) + { + NeedsConvertIndexes.push_back(Oids.size()); + } + Oids.push_back(ChunkId.ToString()); + } + if (!NeedsConvertIndexes.empty()) + { + if (HttpClient::Response Result = Http.Get(fmt::format("/ws/{}/{}/files", WorkspaceId, ShareId), + {}, + HttpClient::KeyValueMap{{"fieldnames", "id,clientpath"}})) + { + std::unordered_map<std::string, Oid> PathToOid; + for (CbFieldView EntryView : Result.AsObject()["files"sv]) + { + CbObjectView Entry = EntryView.AsObjectView(); + PathToOid[std::string(Entry["clientpath"sv].AsString())] = Entry["id"sv].AsObjectId(); + } + for (size_t PathIndex : NeedsConvertIndexes) + { + if (auto It = PathToOid.find(ChunkIds[PathIndex]); It != PathToOid.end()) + { + Oids[PathIndex] = It->second.ToString(); + ZEN_CONSOLE("Converted path '{}' to id '{}'", ChunkIds[PathIndex], Oids[PathIndex]); + } + else + { + Result.ThrowError( + fmt::format("unable to resolve path {} workspace {}, share {}"sv, ChunkIds[PathIndex], WorkspaceId, ShareId)); + } + } + } + else + { + Result.ThrowError("failed to get workspace share file list to resolve paths"sv); + } + } + return Oids; + }; + + if (SubOption == &m_GetChunkOptions) + { + if (m_ShareId.empty()) + { + throw zen::OptionParseException(fmt::format("share id is required", m_InfoOptions.help())); + } + + if (m_ChunkId.empty()) + { + throw zen::OptionParseException("chunk id is required"); + } + + m_ChunkId = ChunksToOidStrings(std::vector<std::string>{m_ChunkId})[0]; + + HttpClient::KeyValueMap Params; + if (m_Offset != 0) + { + Params.Entries.insert_or_assign("offset", fmt::format("{}", m_Offset)); + } + if (m_Size != ~uint64_t(0)) + { + Params.Entries.insert_or_assign("size", fmt::format("{}", m_Size)); + } + + if (HttpClient::Response Result = Http.Get(fmt::format("/ws/{}/{}/{}", m_WorkspaceId, m_ShareId, m_ChunkId), {}, Params)) + { + ZEN_CONSOLE("{}: Bytes: {}", Result, NiceBytes(Result.ResponsePayload.GetSize())); + return 0; + } + else + { + Result.ThrowError("failed to get workspace share chunk"sv); + return 1; + } + } + + if (SubOption == &m_GetChunkBatchOptions) + { + if (m_ShareId.empty()) + { + throw zen::OptionParseException(fmt::format("share id is required", m_InfoOptions.help())); + } + + if (m_ChunkIds.empty()) + { + throw zen::OptionParseException("share is is required"); + } + + m_ChunkIds = ChunksToOidStrings(m_ChunkIds); + + std::vector<RequestChunkEntry> ChunkRequests; + ChunkRequests.resize(m_ChunkIds.size()); + for (size_t Index = 0; Index < m_ChunkIds.size(); Index++) + { + ChunkRequests[Index] = RequestChunkEntry{.ChunkId = Oid::FromHexString(m_ChunkIds[Index]), + .CorrelationId = gsl::narrow<uint32_t>(Index), + .Offset = 0, + .RequestBytes = uint64_t(-1)}; + } + IoBuffer Payload = BuildChunkBatchRequest(ChunkRequests); + + if (HttpClient::Response Result = Http.Post(fmt::format("/ws/{}/{}/batch", m_WorkspaceId, m_ShareId), Payload)) + { + ZEN_CONSOLE("{}: Bytes: {}", Result, NiceBytes(Result.ResponsePayload.GetSize())); + std::vector<IoBuffer> Results = ParseChunkBatchResponse(Result.ResponsePayload); + if (Results.size() != m_ChunkIds.size()) + { + throw std::runtime_error( + fmt::format("failed to get workspace share batch - invalid result count recevied (expected: {}, received: {}", + m_ChunkIds.size(), + Results.size())); + } + for (size_t Index = 0; Index < m_ChunkIds.size(); Index++) + { + ZEN_CONSOLE("{}: Bytes: {}", m_ChunkIds[Index], NiceBytes(Results[Index].GetSize())); + } + return 0; + } + else + { + Result.ThrowError("failed to get workspace share batch"sv); + return 1; + } + } + + ZEN_ASSERT(false); +} + +} // namespace zen diff --git a/src/zen/cmds/workspaces_cmd.h b/src/zen/cmds/workspaces_cmd.h new file mode 100644 index 000000000..a2df4b96e --- /dev/null +++ b/src/zen/cmds/workspaces_cmd.h @@ -0,0 +1,91 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "../zen.h" + +namespace zen { + +class WorkspaceCommand : public CacheStoreCommand +{ +public: + static constexpr char Name[] = "workspace"; + static constexpr char Description[] = "Manage workspaces - create, remove, info"; + + WorkspaceCommand(); + ~WorkspaceCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options& Options() override { return m_Options; } + +private: + cxxopts::Options m_Options{Name, Description}; + std::string m_HostName; + + std::string m_Verb; // create, info, remove + + std::string m_Id; + + cxxopts::Options m_CreateOptions{"create", "Create a workspace"}; + std::string m_Path; + + cxxopts::Options m_InfoOptions{"info", "Info about a workspace"}; + + cxxopts::Options m_RemoveOptions{"remove", "Remove a workspace"}; + + cxxopts::Options* m_SubCommands[3] = {&m_CreateOptions, &m_InfoOptions, &m_RemoveOptions}; +}; + +class WorkspaceShareCommand : public CacheStoreCommand +{ +public: + static constexpr char Name[] = "workspace-share"; + static constexpr char Description[] = + "Manage workspace shared folders in a workspace - create, remove, info, files, entries, get, batch"; + + WorkspaceShareCommand(); + ~WorkspaceShareCommand(); + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options& Options() override { return m_Options; } + +private: + cxxopts::Options m_Options{Name, Description}; + std::string m_HostName; + std::string m_WorkspaceId; + std::string m_Verb; // create, info, remove + std::string m_ShareId; + + cxxopts::Options m_CreateOptions{"create", "Create a workspace share"}; + std::string m_SharePath; + + bool m_Refresh = false; + + cxxopts::Options m_InfoOptions{"info", "Info about a workspace share"}; + + cxxopts::Options m_RemoveOptions{"remove", "Remove a workspace share"}; + + std::string m_FieldFilter; + + cxxopts::Options m_FilesOptions{"files", "List files in a workspace share"}; + + std::string m_ChunkId; + + cxxopts::Options m_EntriesOptions{"entries", "List entries in a workspace shared folder"}; + + cxxopts::Options m_GetChunkOptions{"get", "List entries in a workspace shared folder"}; + uint64_t m_Offset = 0; + uint64_t m_Size = ~uint64_t(0); + + cxxopts::Options m_GetChunkBatchOptions{"batch", "Get a batch of chunks from a workspace shared folder"}; + std::vector<std::string> m_ChunkIds; + + cxxopts::Options* m_SubCommands[7] = {&m_CreateOptions, + &m_InfoOptions, + &m_RemoveOptions, + &m_FilesOptions, + &m_EntriesOptions, + &m_GetChunkOptions, + &m_GetChunkBatchOptions}; +}; + +} // namespace zen diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp index 4881d44ae..6320fcc2f 100644 --- a/src/zen/zen.cpp +++ b/src/zen/zen.cpp @@ -22,6 +22,7 @@ #include "cmds/up_cmd.h" #include "cmds/version_cmd.h" #include "cmds/vfs_cmd.h" +#include "cmds/workspaces_cmd.h" #include <zencore/filesystem.h> #include <zencore/logging.h> @@ -65,7 +66,12 @@ ZenCmdBase::CommandCategory() const bool ZenCmdBase::ParseOptions(int argc, char** argv) { - cxxopts::Options& CmdOptions = Options(); + return ParseOptions(Options(), argc, argv); +} + +bool +ZenCmdBase::ParseOptions(cxxopts::Options& CmdOptions, int argc, char** argv) +{ cxxopts::ParseResult Result; try @@ -106,6 +112,34 @@ ZenCmdBase::ParseOptions(int argc, char** argv) return true; } +// Get the number of args including the sub command +// Build an array for sub command to parse +int +ZenCmdBase::GetSubCommand(cxxopts::Options&, + int argc, + char** argv, + std::span<cxxopts::Options*> SubOptions, + cxxopts::Options*& OutSubOption, + std::vector<char*>& OutSubCommandArguments) +{ + for (int I = 1; I < argc; ++I) + { + if (auto It = std::find_if(SubOptions.begin(), + SubOptions.end(), + [&](cxxopts::Options* SubOption) { return SubOption->program() == argv[I]; }); + It != SubOptions.end()) + { + OutSubOption = (*It); + OutSubCommandArguments.push_back(argv[0]); + std::copy(&argv[I + 1], &argv[argc], std::back_inserter(OutSubCommandArguments)); + return I + 1; + } + } + // No Sub command found + OutSubOption = nullptr; + return argc; +} + std::string ZenCmdBase::FormatHttpResponse(const cpr::Response& Response) { @@ -297,6 +331,8 @@ main(int argc, char** argv) UpCommand UpCmd; VersionCommand VersionCmd; VfsCommand VfsCmd; + WorkspaceCommand WorkspaceCmd; + WorkspaceShareCommand WorkspaceShareCmd; const struct CommandInfo { @@ -305,50 +341,52 @@ main(int argc, char** argv) const char* CmdSummary; } Commands[] = { // clang-format off - {"attach", &AttachCmd, "Add a sponsor process to a running zen service"}, - {"bench", &BenchCmd, "Utility command for benchmarking"}, - {"cache-details", &CacheDetailsCmd, "Details on cache"}, - {"cache-info", &CacheInfoCmd, "Info on cache, namespace or bucket"}, - {"cache-stats", &CacheStatsCmd, "Stats on cache"}, - {"copy", &CopyCmd, "Copy file(s)"}, - {"copy-state", &CopyStateCmd, "Copy zen server disk state"}, - {"dedup", &DedupCmd, "Dedup files"}, - {"down", &DownCmd, "Bring zen server down"}, - {"drop", &DropCmd, "Drop cache namespace or bucket"}, - {"gc-status", &GcStatusCmd, "Garbage collect zen storage status check"}, - {"gc-stop", &GcStopCmd, "Request cancel of running garbage collection in zen storage"}, - {"gc", &GcCmd, "Garbage collect zen storage"}, - {"info", &InfoCmd, "Show high level Zen server information"}, - {"jobs", &JobCmd, "Show/cancel zen background jobs"}, - {"logs", &LoggingCmd, "Show/control zen logging"}, - {"oplog-create", &CreateOplogCmd, "Create a project oplog"}, - {"oplog-delete", &DeleteOplogCmd, "Delete a project oplog"}, - {"oplog-export", &ExportOplogCmd, "Export project store oplog"}, - {"oplog-import", &ImportOplogCmd, "Import project store oplog"}, - {"oplog-mirror", &OplogMirrorCmd, "Mirror project store oplog to file system"}, - {"oplog-snapshot", &SnapshotOplogCmd, "Snapshot project store oplog"}, - {"print", &PrintCmd, "Print compact binary object"}, - {"printpackage", &PrintPkgCmd, "Print compact binary package"}, - {"project-create", &CreateProjectCmd, "Create a project"}, - {"project-delete", &DeleteProjectCmd, "Delete a project"}, - {"project-details", &ProjectDetailsCmd, "Details on project store"}, - {"project-drop", &ProjectDropCmd, "Drop project or project oplog"}, - {"project-info", &ProjectInfoCmd, "Info on project or project oplog"}, - {"project-stats", &ProjectStatsCmd, "Stats on project store"}, - {"ps", &PsCmd, "Enumerate running zen server instances"}, - {"rpc-record-replay", &RpcReplayCmd, "Replays a previously recorded session of rpc requests"}, - {"rpc-record-start", &RpcStartRecordingCmd, "Starts recording of cache rpc requests on a host"}, - {"rpc-record-stop", &RpcStopRecordingCmd, "Stops recording of cache rpc requests on a host"}, - {"run", &RunCmd, "Run command with special options"}, - {"scrub", &ScrubCmd, "Scrub zen storage (verify data integrity)"}, - {"serve", &ServeCmd, "Serve files from a directory"}, - {"status", &StatusCmd, "Show zen status"}, - {"top", &TopCmd, "Monitor zen server activity"}, - {"trace", &TraceCmd, "Control zen realtime tracing"}, - {"up", &UpCmd, "Bring zen server up"}, - {"version", &VersionCmd, "Get zen server version"}, - {"vfs", &VfsCmd, "Manage virtual file system"}, - {"flush", &FlushCmd, "Flush storage"}, + {"attach", &AttachCmd, "Add a sponsor process to a running zen service"}, + {"bench", &BenchCmd, "Utility command for benchmarking"}, + {"cache-details", &CacheDetailsCmd, "Details on cache"}, + {"cache-info", &CacheInfoCmd, "Info on cache, namespace or bucket"}, + {"cache-stats", &CacheStatsCmd, "Stats on cache"}, + {"copy", &CopyCmd, "Copy file(s)"}, + {"copy-state", &CopyStateCmd, "Copy zen server disk state"}, + {"dedup", &DedupCmd, "Dedup files"}, + {"down", &DownCmd, "Bring zen server down"}, + {"drop", &DropCmd, "Drop cache namespace or bucket"}, + {"gc-status", &GcStatusCmd, "Garbage collect zen storage status check"}, + {"gc-stop", &GcStopCmd, "Request cancel of running garbage collection in zen storage"}, + {"gc", &GcCmd, "Garbage collect zen storage"}, + {"info", &InfoCmd, "Show high level Zen server information"}, + {"jobs", &JobCmd, "Show/cancel zen background jobs"}, + {"logs", &LoggingCmd, "Show/control zen logging"}, + {"oplog-create", &CreateOplogCmd, "Create a project oplog"}, + {"oplog-delete", &DeleteOplogCmd, "Delete a project oplog"}, + {"oplog-export", &ExportOplogCmd, "Export project store oplog"}, + {"oplog-import", &ImportOplogCmd, "Import project store oplog"}, + {"oplog-mirror", &OplogMirrorCmd, "Mirror project store oplog to file system"}, + {"oplog-snapshot", &SnapshotOplogCmd, "Snapshot project store oplog"}, + {"print", &PrintCmd, "Print compact binary object"}, + {"printpackage", &PrintPkgCmd, "Print compact binary package"}, + {"project-create", &CreateProjectCmd, "Create a project"}, + {"project-delete", &DeleteProjectCmd, "Delete a project"}, + {"project-details", &ProjectDetailsCmd, "Details on project store"}, + {"project-drop", &ProjectDropCmd, "Drop project or project oplog"}, + {"project-info", &ProjectInfoCmd, "Info on project or project oplog"}, + {"project-stats", &ProjectStatsCmd, "Stats on project store"}, + {"ps", &PsCmd, "Enumerate running zen server instances"}, + {"rpc-record-replay", &RpcReplayCmd, "Replays a previously recorded session of rpc requests"}, + {"rpc-record-start", &RpcStartRecordingCmd, "Starts recording of cache rpc requests on a host"}, + {"rpc-record-stop", &RpcStopRecordingCmd, "Stops recording of cache rpc requests on a host"}, + {"run", &RunCmd, "Run command with special options"}, + {"scrub", &ScrubCmd, "Scrub zen storage (verify data integrity)"}, + {"serve", &ServeCmd, "Serve files from a directory"}, + {"status", &StatusCmd, "Show zen status"}, + {"top", &TopCmd, "Monitor zen server activity"}, + {"trace", &TraceCmd, "Control zen realtime tracing"}, + {"up", &UpCmd, "Bring zen server up"}, + {"version", &VersionCmd, "Get zen server version"}, + {"vfs", &VfsCmd, "Manage virtual file system"}, + {"flush", &FlushCmd, "Flush storage"}, + {WorkspaceCommand::Name, &WorkspaceCmd, WorkspaceCommand::Description}, + {WorkspaceShareCommand::Name, &WorkspaceShareCmd, WorkspaceShareCommand::Description}, // clang-format on }; diff --git a/src/zen/zen.h b/src/zen/zen.h index 78f22cad6..c26e164f7 100644 --- a/src/zen/zen.h +++ b/src/zen/zen.h @@ -48,6 +48,13 @@ public: virtual ZenCmdCategory& CommandCategory() const; bool ParseOptions(int argc, char** argv); + static bool ParseOptions(cxxopts::Options& Options, int argc, char** argv); + static int GetSubCommand(cxxopts::Options& Options, + int argc, + char** argv, + std::span<cxxopts::Options*> SubOptions, + cxxopts::Options*& OutSubOption, + std::vector<char*>& OutSubCommandArguments); static std::string FormatHttpResponse(const cpr::Response& Response); static int MapHttpToCommandReturnCode(const cpr::Response& Response); static std::string ResolveTargetHostSpec(const std::string& InHostSpec); diff --git a/src/zencore/include/zencore/uid.h b/src/zencore/include/zencore/uid.h index 3abec9d16..f8b1ccf98 100644 --- a/src/zencore/include/zencore/uid.h +++ b/src/zencore/include/zencore/uid.h @@ -61,8 +61,10 @@ struct Oid const Oid& Generate(); [[nodiscard]] static Oid FromHexString(const std::string_view String); + [[nodiscard]] static Oid TryFromHexString(const std::string_view String, const Oid& Default = Oid::Zero); StringBuilderBase& ToString(StringBuilderBase& OutString) const; void ToString(char OutString[StringLength]) const; + std::string ToString() const; [[nodiscard]] static Oid FromMemory(const void* Ptr); auto operator<=>(const Oid& rhs) const = default; diff --git a/src/zencore/iobuffer.cpp b/src/zencore/iobuffer.cpp index fd99a01cb..33762bdb7 100644 --- a/src/zencore/iobuffer.cpp +++ b/src/zencore/iobuffer.cpp @@ -30,6 +30,10 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <unistd.h> #endif +#if ZEN_WITH_TESTS +# include <zencore/testutils.h> +#endif + #include <gsl/gsl-lite.hpp> namespace zen { @@ -723,14 +727,16 @@ TEST_CASE("IoBuffer") TEST_CASE("IoBuffer.mmap") { + zen::ScopedTemporaryDirectory TempDir; + zen::IoBuffer Buffer1{65536}; uint8_t* Mutate = Buffer1.MutableData<uint8_t>(); memcpy(Mutate, "abc123", 6); - zen::WriteFile("test_file.data", Buffer1); + zen::WriteFile(TempDir.Path() / "test_file.data", Buffer1); SUBCASE("in-range") { - zen::IoBuffer FileBuffer = IoBufferBuilder::MakeFromFile("test_file.data", 0, 65536); + zen::IoBuffer FileBuffer = IoBufferBuilder::MakeFromFile(TempDir.Path() / "test_file.data", 0, 65536); const void* Data = FileBuffer.GetData(); CHECK(Data != nullptr); CHECK_EQ(memcmp(Data, "abc123", 6), 0); @@ -741,7 +747,7 @@ TEST_CASE("IoBuffer.mmap") # if ZEN_PLATFORM_WINDOWS SUBCASE("out-of-range") { - zen::IoBuffer FileBuffer = IoBufferBuilder::MakeFromFile("test_file.data", 131072, 65536); + zen::IoBuffer FileBuffer = IoBufferBuilder::MakeFromFile(TempDir.Path() / "test_file.data", 131072, 65536); const void* Data = nullptr; CHECK_THROWS(Data = FileBuffer.GetData()); CHECK(Data == nullptr); diff --git a/src/zencore/uid.cpp b/src/zencore/uid.cpp index 0f04d70ac..8ef660c7a 100644 --- a/src/zencore/uid.cpp +++ b/src/zencore/uid.cpp @@ -83,6 +83,26 @@ Oid::FromHexString(const std::string_view String) } Oid +Oid::TryFromHexString(const std::string_view String, const Oid& Default) +{ + if (String.length() != StringLength) + { + return Default; + } + + Oid Id; + + if (ParseHexBytes(String.data(), String.size(), reinterpret_cast<uint8_t*>(Id.OidBits))) + { + return Id; + } + else + { + return Default; + } +} + +Oid Oid::FromMemory(const void* Ptr) { Oid Id; @@ -97,6 +117,14 @@ Oid::ToString(char OutString[StringLength]) const OutString[StringLength] = '\0'; } +std::string +Oid::ToString() const +{ + char OutString[StringLength + 1]; + ToString(OutString); + return std::string(OutString, StringLength); +} + StringBuilderBase& Oid::ToString(StringBuilderBase& OutString) const { diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index 81c9064f6..1874b34eb 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -717,6 +717,25 @@ HttpClient::Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap } HttpClient::Response +HttpClient::Put(std::string_view Url, const KeyValueMap& Parameters) +{ + ZEN_TRACE_CPU("HttpClient::Put"); + + return CommonResponse(DoWithRetry( + [&]() { + Impl::Session Sess = m_Impl->AllocSession(m_BaseUri, + Url, + m_ConnectionSettings, + {{"Content-Length", "0"}}, + Parameters, + m_SessionId, + GetAccessToken()); + return Sess.Put(); + }, + m_ConnectionSettings.RetryCount)); +} + +HttpClient::Response HttpClient::Get(std::string_view Url, const KeyValueMap& AdditionalHeader, const KeyValueMap& Parameters) { ZEN_TRACE_CPU("HttpClient::Get"); diff --git a/src/zenhttp/include/zenhttp/httpclient.h b/src/zenhttp/include/zenhttp/httpclient.h index 8318e3679..1cf77d794 100644 --- a/src/zenhttp/include/zenhttp/httpclient.h +++ b/src/zenhttp/include/zenhttp/httpclient.h @@ -144,6 +144,7 @@ public: }; [[nodiscard]] Response Put(std::string_view Url, const IoBuffer& Payload, const KeyValueMap& AdditionalHeader = {}); + [[nodiscard]] Response Put(std::string_view Url, const KeyValueMap& Parameters = {}); [[nodiscard]] Response Get(std::string_view Url, const KeyValueMap& AdditionalHeader = {}, const KeyValueMap& Parameters = {}); [[nodiscard]] Response Head(std::string_view Url, const KeyValueMap& AdditionalHeader = {}); [[nodiscard]] Response Delete(std::string_view Url, const KeyValueMap& AdditionalHeader = {}); diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 4675ede38..15f863002 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -23,6 +23,7 @@ #include <zenhttp/zenhttp.h> #include <zenutil/cache/cache.h> #include <zenutil/cache/cacherequests.h> +#include <zenutil/chunkrequests.h> #include <zenutil/logging/testformatter.h> #include <zenutil/packageformat.h> #include <zenutil/zenserverprocess.h> @@ -3196,6 +3197,394 @@ TEST_CASE("project.remote") } } +std::vector<std::pair<std::filesystem::path, IoBuffer>> +GenerateFolderContent(const std::filesystem::path& RootPath) +{ + CreateDirectories(RootPath); + std::vector<std::pair<std::filesystem::path, IoBuffer>> Result; + Result.push_back(std::make_pair(RootPath / "root_blob_1.bin", CreateRandomBlob(4122))); + Result.push_back(std::make_pair(RootPath / "root_blob_2.bin", CreateRandomBlob(2122))); + + std::filesystem::path EmptyFolder(RootPath / "empty_folder"); + + std::filesystem::path FirstFolder(RootPath / "first_folder"); + std::filesystem::create_directory(FirstFolder); + Result.push_back(std::make_pair(FirstFolder / "first_folder_blob1.bin", CreateRandomBlob(22))); + Result.push_back(std::make_pair(FirstFolder / "first_folder_blob2.bin", CreateRandomBlob(122))); + + std::filesystem::path SecondFolder(RootPath / "second_folder"); + std::filesystem::create_directory(SecondFolder); + Result.push_back(std::make_pair(SecondFolder / "second_folder_blob1.bin", CreateRandomBlob(522))); + Result.push_back(std::make_pair(SecondFolder / "second_folder_blob2.bin", CreateRandomBlob(122))); + Result.push_back(std::make_pair(SecondFolder / "second_folder_blob3.bin", CreateRandomBlob(225))); + + std::filesystem::path SecondFolderChild(SecondFolder / "child_in_second"); + std::filesystem::create_directory(SecondFolderChild); + Result.push_back(std::make_pair(SecondFolderChild / "second_child_folder_blob1.bin", CreateRandomBlob(622))); + + for (const auto& It : Result) + { + WriteFile(It.first, It.second); + } + + return Result; +} + +std::vector<std::pair<std::filesystem::path, IoBuffer>> +GenerateFolderContent2(const std::filesystem::path& RootPath) +{ + std::vector<std::pair<std::filesystem::path, IoBuffer>> Result; + Result.push_back(std::make_pair(RootPath / "root_blob_3.bin", CreateRandomBlob(312))); + std::filesystem::path FirstFolder(RootPath / "first_folder"); + Result.push_back(std::make_pair(FirstFolder / "first_folder_blob3.bin", CreateRandomBlob(722))); + std::filesystem::path SecondFolder(RootPath / "second_folder"); + std::filesystem::path SecondFolderChild(SecondFolder / "child_in_second"); + Result.push_back(std::make_pair(SecondFolderChild / "second_child_folder_blob2.bin", CreateRandomBlob(962))); + Result.push_back(std::make_pair(SecondFolderChild / "second_child_folder_blob3.bin", CreateRandomBlob(561))); + + for (const auto& It : Result) + { + WriteFile(It.first, It.second); + } + + return Result; +} + +TEST_CASE("workspaces.create") +{ + using namespace std::literals; + + std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir(); + + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + ZenServerInstance Instance(TestEnv); + Instance.SetTestDir(TestDir); + const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady(fmt::format("--workspaces-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + ScopedTemporaryDirectory TempDir; + std::filesystem::path Root1Path = TempDir.Path() / "root1"; + std::filesystem::path Root2Path = TempDir.Path() / "root2"; + std::filesystem::path Share1Path = "shared_1"; + std::filesystem::path Share2Path = "shared_2"; + CreateDirectories(Share1Path); + CreateDirectories(Share2Path); + + Oid Root1Id = Oid::Zero; + Oid Root2Id = Oid::NewOid(); + + HttpClient Client(Instance.GetBaseUri()); + + CHECK(Client.Put(fmt::format("/ws/{}", Root1Id)).StatusCode == HttpResponseCode::BadRequest); + + if (HttpClient::Response Root1Response = + Client.Put(fmt::format("/ws/{}", Oid::Zero), HttpClient::KeyValueMap{{"root_path", Root1Path.string()}}); + Root1Response.StatusCode == HttpResponseCode::Created) + { + Root1Id = Oid::TryFromHexString(Root1Response.AsText()); + CHECK(Root1Id != Oid::Zero); + } + else + { + CHECK(false); + } + if (HttpClient::Response Root1Response = + Client.Put(fmt::format("/ws/{}", Oid::Zero), HttpClient::KeyValueMap{{"root_path", Root1Path.string()}}); + Root1Response.StatusCode == HttpResponseCode::OK) + { + CHECK(Root1Id == Oid::TryFromHexString(Root1Response.AsText())); + } + else + { + CHECK(false); + } + if (HttpClient::Response Root1Response = + Client.Put(fmt::format("/ws/{}", Root1Id), HttpClient::KeyValueMap{{"root_path", Root1Path.string()}}); + Root1Response.StatusCode == HttpResponseCode::OK) + { + CHECK(Root1Id == Oid::TryFromHexString(Root1Response.AsText())); + } + else + { + CHECK(false); + } + CHECK(Client.Put(fmt::format("/ws/{}", Root1Id), HttpClient::KeyValueMap{{"root_path", Root2Path.string()}}).StatusCode == + HttpResponseCode::Conflict); + + CHECK( + Client.Put(fmt::format("/ws/{}/{}", Root1Id, Oid::Zero), HttpClient::KeyValueMap{{"share_path", Share2Path.string()}}).StatusCode == + HttpResponseCode::Created); + + CHECK( + Client.Put(fmt::format("/ws/{}/{}", Root2Id, Oid::Zero), HttpClient::KeyValueMap{{"share_path", Share2Path.string()}}).StatusCode == + HttpResponseCode::NotFound); + + if (HttpClient::Response Root2Response = + Client.Put(fmt::format("/ws/{}", Root2Id), HttpClient::KeyValueMap{{"root_path", Root1Path.string()}}); + Root2Response.StatusCode == HttpResponseCode::Created) + { + CHECK(Root2Id == Oid::TryFromHexString(Root2Response.AsText())); + } + else + { + CHECK(false); + } + + CHECK(Client.Put(fmt::format("/ws/{}/{}", Root2Id, Oid::Zero)).StatusCode == HttpResponseCode::BadRequest); + + Oid Share2Id = Oid::Zero; + if (HttpClient::Response Share2Response = + Client.Put(fmt::format("/ws/{}/{}", Root2Id, Share2Id), HttpClient::KeyValueMap{{"share_path", Share2Path.string()}}); + Share2Response.StatusCode == HttpResponseCode::Created) + { + Share2Id = Oid::TryFromHexString(Share2Response.AsText()); + CHECK(Share2Id != Oid::Zero); + } + + CHECK( + Client.Put(fmt::format("/ws/{}/{}", Root2Id, Oid::Zero), HttpClient::KeyValueMap{{"share_path", Share2Path.string()}}).StatusCode == + HttpResponseCode::OK); + + CHECK( + Client.Put(fmt::format("/ws/{}/{}", Root2Id, Share2Id), HttpClient::KeyValueMap{{"share_path", Share2Path.string()}}).StatusCode == + HttpResponseCode::OK); + + CHECK( + Client.Put(fmt::format("/ws/{}/{}", Root2Id, Share2Id), HttpClient::KeyValueMap{{"share_path", Share1Path.string()}}).StatusCode == + HttpResponseCode::Conflict); +} + +TEST_CASE("workspaces.lifetimes") +{ + using namespace std::literals; + + std::filesystem::path SystemRootPath = TestEnv.CreateNewTestDir(); + + Oid WorkspaceId = Oid::NewOid(); + Oid ShareId = Oid::NewOid(); + + { + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + ZenServerInstance Instance(TestEnv); + Instance.SetTestDir(TestDir); + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--workspaces-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + ScopedTemporaryDirectory TempDir; + std::filesystem::path RootPath = TempDir.Path(); + std::filesystem::path SharePath = RootPath / "shared_folder"; + CreateDirectories(SharePath); + + HttpClient Client(Instance.GetBaseUri()); + CHECK(Client.Put(fmt::format("/ws/{}", WorkspaceId), HttpClient::KeyValueMap{{"root_path", RootPath.string()}}).StatusCode == + HttpResponseCode::Created); + CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).AsObject()["id"sv].AsObjectId() == WorkspaceId); + CHECK(Client.Put(fmt::format("/ws/{}", WorkspaceId), HttpClient::KeyValueMap{{"root_path", RootPath.string()}}).StatusCode == + HttpResponseCode::OK); + + CHECK(Client.Put(fmt::format("/ws/{}/{}", WorkspaceId, ShareId), HttpClient::KeyValueMap{{"share_path", SharePath.string()}}) + .StatusCode == HttpResponseCode::Created); + CHECK(Client.Get(fmt::format("/ws/{}/{}", WorkspaceId, ShareId)).AsObject()["id"sv].AsObjectId() == ShareId); + CHECK(Client.Put(fmt::format("/ws/{}/{}", WorkspaceId, ShareId), HttpClient::KeyValueMap{{"share_path", SharePath.string()}}) + .StatusCode == HttpResponseCode::OK); + } + + // Restart + + { + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + ZenServerInstance Instance(TestEnv); + Instance.SetTestDir(TestDir); + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--workspaces-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri()); + CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).AsObject()["id"sv].AsObjectId() == WorkspaceId); + + CHECK(Client.Get(fmt::format("/ws/{}/{}", WorkspaceId, ShareId)).AsObject()["id"sv].AsObjectId() == ShareId); + } + + // Wipe system config + std::filesystem::remove_all(SystemRootPath); + + // Restart + + { + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + ZenServerInstance Instance(TestEnv); + Instance.SetTestDir(TestDir); + const uint16_t PortNumber = + Instance.SpawnServerAndWaitUntilReady(fmt::format("--workspaces-enabled --system-dir {}", SystemRootPath)); + CHECK(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri()); + CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).StatusCode == HttpResponseCode::NotFound); + CHECK(Client.Get(fmt::format("/ws/{}/{}", WorkspaceId, ShareId)).StatusCode == HttpResponseCode::NotFound); + } +} + +TEST_CASE("workspaces.share") +{ + ZenServerInstance Instance(TestEnv); + + const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady("--workspaces-enabled"); + CHECK(PortNumber != 0); + + ScopedTemporaryDirectory TempDir; + std::filesystem::path RootPath = TempDir.Path(); + std::filesystem::path SharePath = RootPath / "shared_folder"; + GenerateFolderContent(SharePath); + + HttpClient Client(Instance.GetBaseUri()); + + Oid WorkspaceId = Oid::NewOid(); + CHECK(Client.Put(fmt::format("/ws/{}", WorkspaceId), HttpClient::KeyValueMap{{"root_path", RootPath.string()}}).StatusCode == + HttpResponseCode::Created); + CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).AsObject()["id"sv].AsObjectId() == WorkspaceId); + + Oid ShareId = Oid::NewOid(); + CHECK(Client.Put(fmt::format("/ws/{}/{}", WorkspaceId, ShareId), HttpClient::KeyValueMap{{"share_path", SharePath.string()}}) + .StatusCode == HttpResponseCode::Created); + CHECK(Client.Get(fmt::format("/ws/{}/{}", WorkspaceId, ShareId)).AsObject()["id"sv].AsObjectId() == ShareId); + + CHECK(Client.Get(fmt::format("/ws/{}/{}/files", WorkspaceId, ShareId)).AsObject()["files"sv].AsArrayView().Num() == 8); + GenerateFolderContent2(SharePath); + CHECK(Client.Get(fmt::format("/ws/{}/{}/files", WorkspaceId, ShareId)).AsObject()["files"sv].AsArrayView().Num() == 8); + HttpClient::Response FilesResponse = + Client.Get(fmt::format("/ws/{}/{}/files", WorkspaceId, ShareId), + {}, + HttpClient::KeyValueMap{{"refresh", ToString(true)}, {"fieldnames", "id,clientpath,size"}}); + CHECK(FilesResponse); + std::unordered_map<Oid, std::pair<std::filesystem::path, uint64_t>, Oid::Hasher> Files; + { + CbArrayView FilesArray = FilesResponse.AsObject()["files"sv].AsArrayView(); + CHECK(FilesArray.Num() == 12); + for (CbFieldView Field : FilesArray) + { + CbObjectView FileObject = Field.AsObjectView(); + Oid ChunkId = FileObject["id"sv].AsObjectId(); + CHECK(ChunkId != Oid::Zero); + uint64_t Size = FileObject["size"sv].AsUInt64(); + std::u8string_view Path = FileObject["clientpath"sv].AsU8String(); + std::filesystem::path AbsFilePath = SharePath / Path; + CHECK(std::filesystem::is_regular_file(AbsFilePath)); + CHECK(std::filesystem::file_size(AbsFilePath) == Size); + Files.insert_or_assign(ChunkId, std::make_pair(AbsFilePath, Size)); + } + } + + HttpClient::Response EntriesResponse = + Client.Get(fmt::format("/ws/{}/{}/entries", WorkspaceId, ShareId), {}, HttpClient::KeyValueMap{{"fieldfilter", "id,clientpath"}}); + CHECK(EntriesResponse); + { + CbArrayView EntriesArray = EntriesResponse.AsObject()["entries"sv].AsArrayView(); + CHECK(EntriesArray.Num() == 1); + for (CbFieldView EntryField : EntriesArray) + { + CbObjectView EntryObject = EntryField.AsObjectView(); + CbArrayView FilesArray = EntryObject["files"sv].AsArrayView(); + CHECK(FilesArray.Num() == 12); + for (CbFieldView FileField : FilesArray) + { + CbObjectView FileObject = FileField.AsObjectView(); + Oid ChunkId = FileObject["id"sv].AsObjectId(); + CHECK(ChunkId != Oid::Zero); + std::u8string_view Path = FileObject["clientpath"sv].AsU8String(); + std::filesystem::path AbsFilePath = SharePath / Path; + CHECK(std::filesystem::is_regular_file(AbsFilePath)); + } + } + } + + HttpClient::Response FileManifestResponse = + Client.Get(fmt::format("/ws/{}/{}/entries", WorkspaceId, ShareId), + {}, + HttpClient::KeyValueMap{{"opkey", "file_manifest"}, {"fieldfilter", "id,clientpath"}}); + CHECK(FileManifestResponse); + { + CbArrayView EntriesArray = FileManifestResponse.AsObject()["entry"sv].AsObjectView()["files"sv].AsArrayView(); + CHECK(EntriesArray.Num() == 12); + for (CbFieldView Field : EntriesArray) + { + CbObjectView FileObject = Field.AsObjectView(); + Oid ChunkId = FileObject["id"sv].AsObjectId(); + CHECK(ChunkId != Oid::Zero); + std::u8string_view Path = FileObject["clientpath"sv].AsU8String(); + std::filesystem::path AbsFilePath = SharePath / Path; + CHECK(std::filesystem::is_regular_file(AbsFilePath)); + } + } + + for (auto It : Files) + { + const Oid& ChunkId = It.first; + const std::filesystem::path& Path = It.second.first; + const uint64_t Size = It.second.second; + + CHECK(Client.Get(fmt::format("/ws/{}/{}/{}/info", WorkspaceId, ShareId, ChunkId)).AsObject()["size"sv].AsUInt64() == Size); + + { + IoBuffer Payload = Client.Get(fmt::format("/ws/{}/{}/{}", WorkspaceId, ShareId, ChunkId)).ResponsePayload; + CHECK(Payload); + CHECK(Payload.GetSize() == Size); + IoBuffer FileContent = IoBufferBuilder::MakeFromFile(Path); + CHECK(FileContent); + CHECK(FileContent.GetView().EqualBytes(Payload.GetView())); + } + + { + IoBuffer Payload = + Client + .Get(fmt::format("/ws/{}/{}/{}", WorkspaceId, ShareId, ChunkId), + {}, + HttpClient::KeyValueMap{{"offset", fmt::format("{}", Size / 4)}, {"size", fmt::format("{}", Size / 2)}}) + .ResponsePayload; + CHECK(Payload); + CHECK(Payload.GetSize() == Size / 2); + IoBuffer FileContent = IoBufferBuilder::MakeFromFile(Path, Size / 4, Size / 2); + CHECK(FileContent); + CHECK(FileContent.GetView().EqualBytes(Payload.GetView())); + } + } + + { + uint32_t CorrelationId = gsl::narrow<uint32_t>(Files.size()); + std::vector<RequestChunkEntry> BatchEntries; + for (auto It : Files) + { + const Oid& ChunkId = It.first; + const uint64_t Size = It.second.second; + + BatchEntries.push_back( + RequestChunkEntry{.ChunkId = ChunkId, .CorrelationId = --CorrelationId, .Offset = Size / 4, .RequestBytes = Size / 2}); + } + IoBuffer BatchResponse = + Client.Post(fmt::format("/ws/{}/{}/batch", WorkspaceId, ShareId), BuildChunkBatchRequest(BatchEntries)).ResponsePayload; + CHECK(BatchResponse); + std::vector<IoBuffer> BatchResult = ParseChunkBatchResponse(BatchResponse); + CHECK(BatchResult.size() == Files.size()); + for (const RequestChunkEntry& Request : BatchEntries) + { + IoBuffer Result = BatchResult[Request.CorrelationId]; + auto It = Files.find(Request.ChunkId); + const std::filesystem::path& Path = It->second.first; + CHECK(Result.GetSize() == Request.RequestBytes); + IoBuffer FileContent = IoBufferBuilder::MakeFromFile(Path, Request.Offset, Request.RequestBytes); + CHECK(FileContent); + CHECK(FileContent.GetView().EqualBytes(Result.GetView())); + } + } + + CHECK(Client.Delete(fmt::format("/ws/{}/{}", WorkspaceId, ShareId))); + CHECK(Client.Get(fmt::format("/ws/{}/{}", WorkspaceId, ShareId)).StatusCode == HttpResponseCode::NotFound); + CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId))); + + CHECK(Client.Delete(fmt::format("/ws/{}", WorkspaceId))); + CHECK(Client.Get(fmt::format("/ws/{}", WorkspaceId)).StatusCode == HttpResponseCode::NotFound); +} + # if 0 TEST_CASE("lifetime.owner") { diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index ce1b21926..56d14b4a9 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -528,6 +528,9 @@ ParseConfigFile(const std::filesystem::path& Path, LuaOptions.Parse(Path, CmdLineResult); + ////// workspaces + LuaOptions.AddOption("workspaces.enabled"sv, ServerOptions.WorksSpacesConfig.Enabled, "workspaces-enabled"sv); + // These have special command line processing so we make sure we export them if they were configured on command line if (!ServerOptions.AuthConfig.OpenIdProviders.empty()) { @@ -1001,6 +1004,13 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) cxxopts::value<bool>(ServerOptions.StatsConfig.Enabled)->default_value("false"), "Enable statsd reporter (localhost:8125)"); + options.add_option("stats", + "", + "workspaces-enabled", + "", + cxxopts::value<bool>(ServerOptions.WorksSpacesConfig.Enabled)->default_value("false"), + "Enable workspaces support with folder sharing"); + try { cxxopts::ParseResult Result; diff --git a/src/zenserver/config.h b/src/zenserver/config.h index 1e44d54c0..fec871a0e 100644 --- a/src/zenserver/config.h +++ b/src/zenserver/config.h @@ -118,6 +118,11 @@ struct ZenStructuredCacheConfig uint64_t MemMaxAgeSeconds = gsl::narrow<uint64_t>(std::chrono::seconds(std::chrono::days(1)).count()); }; +struct ZenWorkspacesConfig +{ + bool Enabled = false; +}; + struct ZenServerOptions { ZenUpstreamCacheConfig UpstreamCacheConfig; @@ -127,6 +132,7 @@ struct ZenServerOptions zen::HttpServerConfig HttpServerConfig; ZenStructuredCacheConfig StructuredCacheConfig; ZenStatsConfig StatsConfig; + ZenWorkspacesConfig WorksSpacesConfig; std::filesystem::path SystemRootDir; // System root directory (used for machine level config) std::filesystem::path DataDir; // Root directory for state (used for testing) std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental) diff --git a/src/zenserver/workspaces/httpworkspaces.cpp b/src/zenserver/workspaces/httpworkspaces.cpp new file mode 100644 index 000000000..85403aa78 --- /dev/null +++ b/src/zenserver/workspaces/httpworkspaces.cpp @@ -0,0 +1,802 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <workspaces/httpworkspaces.h> + +#include <zencore/compactbinarybuilder.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/trace.h> +#include <zenstore/workspaces.h> +#include <zenutil/basicfile.h> +#include <zenutil/chunkrequests.h> +#include <zenutil/workerpools.h> + +#include <unordered_set> + +namespace zen { +using namespace std::literals; + +ZEN_DEFINE_LOG_CATEGORY_STATIC(LogObj, "fs"sv); + +namespace { + + std::filesystem::path GetPathParameter(HttpServerRequest& ServerRequest, std::string_view Name) + { + if (std::string_view Value = ServerRequest.GetQueryParams().GetValue(Name); !Value.empty()) + { + return std::filesystem::path(HttpServerRequest::Decode(Value)); + } + return {}; + } + + Oid PathToChunkId(const std::filesystem::path& Path) + { + const std::string PathBuffer = reinterpret_cast<const char*>(Path.generic_u8string().c_str()); + BLAKE3 Hash = BLAKE3::HashMemory(PathBuffer.data(), PathBuffer.size()); + Hash.Hash[11] = 7; // FIoChunkType::ExternalFile + return Oid::FromMemory(Hash.Hash); + } + +} // namespace + +HttpWorkspacesService::HttpWorkspacesService(HttpStatsService& StatsService, const FileServeConfig& Cfg, Workspaces& Workspaces) +: m_Log(logging::Get("workspaces")) +, m_StatsService(StatsService) +, m_Config(Cfg) +, m_Workspaces(Workspaces) +{ + Initialize(); +} + +HttpWorkspacesService::~HttpWorkspacesService() +{ + m_StatsService.UnregisterHandler("prj", *this); +} + +const char* +HttpWorkspacesService::BaseUri() const +{ + return "/ws/"; +} + +void +HttpWorkspacesService::HandleRequest(HttpServerRequest& Request) +{ + metrics::OperationTiming::Scope $(m_HttpRequests); + + if (m_Router.HandleRequest(Request) == false) + { + ZEN_LOG_WARN(LogObj, "No route found for {0}", Request.RelativeUri()); + return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Not found"sv); + } +} + +void +HttpWorkspacesService::HandleStatsRequest(HttpServerRequest& HttpReq) +{ + ZEN_TRACE_CPU("WorkspacesService::Stats"); + CbObjectWriter Cbo; + + EmitSnapshot("requests", m_HttpRequests, Cbo); + + Cbo.BeginObject("workspaces"); + { + Cbo.BeginObject("workspace"); + { + Cbo << "readcount" << m_WorkspacesStats.WorkspaceReadCount << "writecount" << m_WorkspacesStats.WorkspaceWriteCount + << "deletecount" << m_WorkspacesStats.WorkspaceDeleteCount; + } + Cbo.EndObject(); + + Cbo.BeginObject("workspaceshare"); + { + Cbo << "readcount" << m_WorkspacesStats.WorkspaceShareReadCount << "writecount" << m_WorkspacesStats.WorkspaceShareWriteCount + << "deletecount" << m_WorkspacesStats.WorkspaceShareDeleteCount; + } + Cbo.EndObject(); + + Cbo.BeginObject("chunk"); + { + Cbo << "hitcount" << m_WorkspacesStats.WorkspaceShareChunkHitCount << "misscount" + << m_WorkspacesStats.WorkspaceShareChunkMissCount; + } + Cbo.EndObject(); + + Cbo << "filescount" << m_WorkspacesStats.WorkspaceShareFilesReadCount; + Cbo << "entriescount" << m_WorkspacesStats.WorkspaceShareEntriesReadCount; + Cbo << "batchcount" << m_WorkspacesStats.WorkspaceShareBatchReadCount; + + Cbo << "requestcount" << m_WorkspacesStats.RequestCount; + Cbo << "badrequestcount" << m_WorkspacesStats.BadRequestCount; + } + Cbo.EndObject(); + + return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +void +HttpWorkspacesService::Initialize() +{ + using namespace std::literals; + + ZEN_LOG_INFO(LogObj, "Initialzing Workspaces Service"); + + m_StatsService.RegisterHandler("ws", *this); + + m_Router.AddPattern("workspace", "([[:xdigit:]]{24})"); + m_Router.AddPattern("share_id", "([[:xdigit:]]{24})"); + m_Router.AddPattern("chunk", "([[:xdigit:]]{24})"); + + m_Router.RegisterRoute( + "{workspace_id}/{share_id}/files", + [this](HttpRouterRequest& Req) { FilesRequest(Req); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{workspace_id}/{share_id}/{chunk}/info", + [this](HttpRouterRequest& Req) { ChunkInfoRequest(Req); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{workspace_id}/{share_id}/batch", + [this](HttpRouterRequest& Req) { BatchRequest(Req); }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "{workspace_id}/{share_id}/entries", + [this](HttpRouterRequest& Req) { EntriesRequest(Req); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "{workspace_id}/{share_id}/{chunk}", + [this](HttpRouterRequest& Req) { ChunkRequest(Req); }, + HttpVerb::kGet | HttpVerb::kHead); + + m_Router.RegisterRoute( + "{workspace_id}/{share_id}", + [this](HttpRouterRequest& Req) { ShareRequest(Req); }, + HttpVerb::kPut | HttpVerb::kGet | HttpVerb::kDelete); + + m_Router.RegisterRoute( + "{workspace_id}", + [this](HttpRouterRequest& Req) { WorkspaceRequest(Req); }, + HttpVerb::kPut | HttpVerb::kGet | HttpVerb::kDelete); + + ReadState(); +} + +std::filesystem::path +HttpWorkspacesService::GetStatePath() const +{ + return m_Config.SystemRootDir / "workspaces"; +} + +void +HttpWorkspacesService::ReadState() +{ + if (!m_Config.SystemRootDir.empty()) + { + m_Workspaces.ReadState(GetStatePath(), [](const std::filesystem::path& Path) { return PathToChunkId(Path); }); + } +} + +void +HttpWorkspacesService::WriteState() +{ + if (!m_Config.SystemRootDir.empty()) + { + m_Workspaces.WriteState(GetStatePath()); + } +} + +void +HttpWorkspacesService::FilesRequest(HttpRouterRequest& Req) +{ + HttpServerRequest& ServerRequest = Req.ServerRequest(); + const Oid WorkspaceId = Oid::TryFromHexString(Req.GetCapture(1)); + if (WorkspaceId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid workspace id '{}'", Req.GetCapture(1))); + } + const Oid ShareId = Oid::TryFromHexString(Req.GetCapture(2)); + if (ShareId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid share id '{}'", Req.GetCapture(2))); + } + + m_WorkspacesStats.WorkspaceShareFilesReadCount++; + + std::unordered_set<std::string> WantedFieldNames; + if (auto FieldFilter = HttpServerRequest::Decode(ServerRequest.GetQueryParams().GetValue("fieldnames")); !FieldFilter.empty()) + { + if (FieldFilter != "*") // Get all - empty FieldFilter equal getting all fields + { + ForEachStrTok(FieldFilter, ',', [&](std::string_view FieldName) { + WantedFieldNames.insert(std::string(FieldName)); + return true; + }); + } + } + else + { + const bool FilterClient = ServerRequest.GetQueryParams().GetValue("filter"sv) == "client"sv; + WantedFieldNames.insert("id"); + WantedFieldNames.insert("clientpath"); + if (!FilterClient) + { + WantedFieldNames.insert("serverpath"); + } + } + + bool Refresh = false; + if (auto RefreshStr = ServerRequest.GetQueryParams().GetValue("refresh"); !RefreshStr.empty()) + { + Refresh = StrCaseCompare(std::string(RefreshStr).c_str(), "true") == 0; + } + + const bool WantsAllFields = WantedFieldNames.empty(); + + const bool WantsIdField = WantsAllFields || WantedFieldNames.contains("id"); + const bool WantsClientPathField = WantsAllFields || WantedFieldNames.contains("clientpath"); + const bool WantsServerPathField = WantsAllFields || WantedFieldNames.contains("serverpath"); + const bool WantsRawSizeField = WantsAllFields || WantedFieldNames.contains("rawsize"); + const bool WantsSizeField = WantsAllFields || WantedFieldNames.contains("size"); + + std::optional<std::vector<Workspaces::ShareFile>> Files = + m_Workspaces.GetWorkspaceShareFiles(WorkspaceId, ShareId, Refresh, GetSmallWorkerPool()); + if (!Files.has_value()) + { + return ServerRequest.WriteResponse(HttpResponseCode::NotFound); + } + + CbObjectWriter Response; + Response.BeginArray("files"sv); + { + for (const Workspaces::ShareFile& Entry : Files.value()) + { + Response.BeginObject(); + if (WantsIdField) + { + Response << "id"sv << Entry.Id; + } + if (WantsServerPathField) + { + Response << "serverpath"sv << Entry.RelativePath; + } + if (WantsClientPathField) + { + Response << "clientpath"sv << Entry.RelativePath; + } + if (WantsSizeField) + { + Response << "size"sv << Entry.Size; + } + if (WantsRawSizeField) + { + Response << "rawsize"sv << Entry.Size; + } + Response.EndObject(); + } + } + Response.EndArray(); + + return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); +} + +void +HttpWorkspacesService::ChunkInfoRequest(HttpRouterRequest& Req) +{ + HttpServerRequest& ServerRequest = Req.ServerRequest(); + const Oid WorkspaceId = Oid::TryFromHexString(Req.GetCapture(1)); + if (WorkspaceId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid workspace id '{}'", Req.GetCapture(1))); + } + const Oid ShareId = Oid::TryFromHexString(Req.GetCapture(2)); + if (ShareId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid share id '{}'", Req.GetCapture(2))); + } + const Oid ChunkId = Oid::TryFromHexString(Req.GetCapture(3)); + if (ChunkId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid chunk id '{}'", Req.GetCapture(3))); + } + Workspaces::ShareFile File = m_Workspaces.GetWorkspaceShareChunkInfo(WorkspaceId, ShareId, ChunkId, GetSmallWorkerPool()); + if (File.Id != Oid::Zero) + { + CbObjectWriter Response; + Response << "size"sv << File.Size; + m_WorkspacesStats.WorkspaceShareChunkHitCount++; + return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); + } + m_WorkspacesStats.WorkspaceShareChunkMissCount++; + return ServerRequest.WriteResponse(HttpResponseCode::NotFound); +} + +void +HttpWorkspacesService::BatchRequest(HttpRouterRequest& Req) +{ + HttpServerRequest& ServerRequest = Req.ServerRequest(); + const Oid WorkspaceId = Oid::TryFromHexString(Req.GetCapture(1)); + if (WorkspaceId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid workspace id '{}'", Req.GetCapture(1))); + } + const Oid ShareId = Oid::TryFromHexString(Req.GetCapture(2)); + if (ShareId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid share id '{}'", Req.GetCapture(2))); + } + IoBuffer Payload = ServerRequest.ReadPayload(); + std::optional<std::vector<RequestChunkEntry>> ChunkRequests = ParseChunkBatchRequest(Payload); + if (!ChunkRequests.has_value()) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "batch payload malformed"); + } + m_WorkspacesStats.WorkspaceShareBatchReadCount++; + std::vector<Workspaces::ChunkRequest> Requests; + Requests.reserve(ChunkRequests.value().size()); + std::transform(ChunkRequests.value().begin(), + ChunkRequests.value().end(), + std::back_inserter(Requests), + [](const RequestChunkEntry& Entry) { + return Workspaces::ChunkRequest{.ChunkId = Entry.ChunkId, .Offset = Entry.Offset, .Size = Entry.RequestBytes}; + }); + std::vector<IoBuffer> Chunks = m_Workspaces.GetWorkspaceShareChunks(WorkspaceId, ShareId, Requests, GetSmallWorkerPool()); + if (Chunks.empty()) + { + return ServerRequest.WriteResponse(HttpResponseCode::NotFound); + } + for (const IoBuffer& Buffer : Chunks) + { + if (Buffer) + { + m_WorkspacesStats.WorkspaceShareChunkHitCount++; + } + else + { + m_WorkspacesStats.WorkspaceShareChunkMissCount++; + } + } + std::vector<IoBuffer> Response = BuildChunkBatchResponse(ChunkRequests.value(), Chunks); + if (!Response.empty()) + { + return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Response); + } + return ServerRequest.WriteResponse(HttpResponseCode::InternalServerError, + HttpContentType::kText, + fmt::format("failed formatting response for batch of {} chunks", Chunks.size())); +} + +void +HttpWorkspacesService::EntriesRequest(HttpRouterRequest& Req) +{ + HttpServerRequest& ServerRequest = Req.ServerRequest(); + std::string_view OpKey = ServerRequest.GetQueryParams().GetValue("opkey"sv); + if (!OpKey.empty() && OpKey != "file_manifest") + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::NotFound); + } + const Oid WorkspaceId = Oid::TryFromHexString(Req.GetCapture(1)); + if (WorkspaceId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid workspace id '{}'", Req.GetCapture(1))); + } + const Oid ShareId = Oid::TryFromHexString(Req.GetCapture(2)); + if (ShareId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid share id '{}'", Req.GetCapture(2))); + } + std::unordered_set<std::string> WantedFieldNames; + if (auto FieldFilter = HttpServerRequest::Decode(ServerRequest.GetQueryParams().GetValue("fieldfilter")); !FieldFilter.empty()) + { + if (FieldFilter != "*") // Get all - empty FieldFilter equal getting all fields + { + ForEachStrTok(FieldFilter, ',', [&](std::string_view FieldName) { + WantedFieldNames.insert(std::string(FieldName)); + return true; + }); + } + } + + bool Refresh = false; + if (auto RefreshStr = ServerRequest.GetQueryParams().GetValue("refresh"); !RefreshStr.empty()) + { + Refresh = StrCaseCompare(std::string(RefreshStr).c_str(), "true") == 0; + } + + m_WorkspacesStats.WorkspaceShareEntriesReadCount++; + std::optional<std::vector<Workspaces::ShareFile>> Files = + m_Workspaces.GetWorkspaceShareFiles(WorkspaceId, ShareId, Refresh, GetSmallWorkerPool()); + if (!Files.has_value()) + { + return ServerRequest.WriteResponse(HttpResponseCode::NotFound); + } + const bool WantsAllFields = WantedFieldNames.empty(); + + const bool WantsIdField = WantsAllFields || WantedFieldNames.contains("id"); + const bool WantsClientPathField = WantsAllFields || WantedFieldNames.contains("clientpath"); + const bool WantsServerPathField = WantsAllFields || WantedFieldNames.contains("serverpath"); + + CbObjectWriter Response; + + if (OpKey.empty()) + { + Response.BeginArray("entries"sv); + Response.BeginObject(); + } + else + { + Response.BeginObject("entry"sv); + } + { + // Synthesize a fake op + Response << "key" + << "file_manifest"; + + Response.BeginArray("files"); + { + for (const Workspaces::ShareFile& Entry : Files.value()) + { + Response.BeginObject(); + { + if (WantsIdField) + { + Response << "id"sv << Entry.Id; + } + if (WantsServerPathField) + { + Response << "serverpath"sv << Entry.RelativePath; + } + if (WantsClientPathField) + { + Response << "clientpath"sv << Entry.RelativePath; + } + } + Response.EndObject(); + } + } + Response.EndArray(); + } + + if (OpKey.empty()) + { + Response.EndObject(); + Response.EndArray(); + } + else + { + Response.EndObject(); + } + + return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); +} + +void +HttpWorkspacesService::ChunkRequest(HttpRouterRequest& Req) +{ + HttpServerRequest& ServerRequest = Req.ServerRequest(); + const Oid WorkspaceId = Oid::TryFromHexString(Req.GetCapture(1)); + if (WorkspaceId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid workspace id '{}'", Req.GetCapture(1))); + } + const Oid ShareId = Oid::TryFromHexString(Req.GetCapture(2)); + if (ShareId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid share id '{}'", Req.GetCapture(2))); + } + const Oid ChunkId = Oid::TryFromHexString(Req.GetCapture(3)); + if (ChunkId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid chunk id '{}'", Req.GetCapture(3))); + } + + uint64_t Offset = 0; + uint64_t Size = ~(0ull); + if (auto OffsetParm = ServerRequest.GetQueryParams().GetValue("offset"); OffsetParm.empty() == false) + { + if (auto OffsetVal = ParseInt<uint64_t>(OffsetParm)) + { + Offset = OffsetVal.value(); + } + else + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid offset parameter '{}'", OffsetParm)); + } + } + + if (auto SizeParm = ServerRequest.GetQueryParams().GetValue("size"); SizeParm.empty() == false) + { + if (auto SizeVal = ParseInt<uint64_t>(SizeParm)) + { + Size = SizeVal.value(); + } + else + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid size parameter '{}'", SizeParm)); + } + } + + std::vector<IoBuffer> Response = m_Workspaces.GetWorkspaceShareChunks( + WorkspaceId, + ShareId, + std::vector<Workspaces::ChunkRequest>{Workspaces::ChunkRequest{.ChunkId = ChunkId, .Offset = Offset, .Size = Size}}, + GetSmallWorkerPool()); + if (!Response.empty() && Response[0]) + { + m_WorkspacesStats.WorkspaceShareChunkHitCount++; + if (Response[0].GetSize() == 0) + { + return ServerRequest.WriteResponse(HttpResponseCode::OK); + } + return ServerRequest.WriteResponse(HttpResponseCode::OK, Response[0].GetContentType(), Response); + } + m_WorkspacesStats.WorkspaceShareChunkMissCount++; + return ServerRequest.WriteResponse(HttpResponseCode::NotFound); +} + +void +HttpWorkspacesService::ShareRequest(HttpRouterRequest& Req) +{ + HttpServerRequest& ServerRequest = Req.ServerRequest(); + const Oid WorkspaceId = Oid::TryFromHexString(Req.GetCapture(1)); + if (WorkspaceId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid workspace id '{}'", Req.GetCapture(1))); + } + Oid ShareId = Oid::TryFromHexString(Req.GetCapture(2)); + switch (ServerRequest.RequestVerb()) + { + case HttpVerb::kPut: + { + std::filesystem::path SharePath = GetPathParameter(ServerRequest, "share_path"sv); + if (SharePath.empty()) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Invalid 'share_path' parameter"); + } + if (Req.GetCapture(2) == Oid::Zero.ToString()) + { + // Synthesize Id + ShareId = PathToChunkId(SharePath); + ZEN_INFO("Generated workspace id from path '{}': {}", SharePath, ShareId); + } + else if (ShareId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid share id '{}'", Req.GetCapture(2))); + } + m_WorkspacesStats.WorkspaceShareWriteCount++; + if (m_Workspaces.GetWorkspaceInfo(WorkspaceId).Config.Id != WorkspaceId) + { + return ServerRequest.WriteResponse(HttpResponseCode::NotFound, + HttpContentType::kText, + fmt::format("Workspace '{}' does not exist", WorkspaceId)); + } + bool OK = m_Workspaces.AddWorkspaceShare(WorkspaceId, {ShareId, SharePath}, [](const std::filesystem::path& Path) { + return PathToChunkId(Path); + }); + if (OK) + { + WriteState(); + return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, fmt::format("{}", ShareId)); + } + else + { + Workspaces::WorkspaceShareConfiguration Config = m_Workspaces.GetWorkspaceShareConfiguration(WorkspaceId, ShareId); + if (Config.Id == ShareId) + { + if (Config.SharePath == SharePath) + { + return ServerRequest.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("{}", ShareId)); + } + } + return ServerRequest.WriteResponse( + HttpResponseCode::Conflict, + HttpContentType::kText, + fmt::format("Workspace share '{}' already exist in workspace '{}'", ShareId, WorkspaceId)); + } + } + case HttpVerb::kGet: + { + if (WorkspaceId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid workspace id '{}'", Req.GetCapture(1))); + } + m_WorkspacesStats.WorkspaceShareReadCount++; + Workspaces::WorkspaceShareConfiguration Config = m_Workspaces.GetWorkspaceShareConfiguration(WorkspaceId, ShareId); + if (Config.Id != Oid::Zero) + { + CbObjectWriter Response; + Response << "id" << Config.Id; + Response << "share_path" << Config.SharePath.string(); // utf8? + return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); + } + return ServerRequest.WriteResponse(HttpResponseCode::NotFound); + } + case HttpVerb::kDelete: + { + if (WorkspaceId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid workspace id '{}'", Req.GetCapture(1))); + } + m_WorkspacesStats.WorkspaceShareDeleteCount++; + bool Deleted = m_Workspaces.RemoveWorkspaceShare(WorkspaceId, ShareId); + if (Deleted) + { + WriteState(); + return ServerRequest.WriteResponse(HttpResponseCode::OK); + } + return ServerRequest.WriteResponse(HttpResponseCode::NotFound); + } + } +} + +void +HttpWorkspacesService::WorkspaceRequest(HttpRouterRequest& Req) +{ + HttpServerRequest& ServerRequest = Req.ServerRequest(); + Oid WorkspaceId = Oid::TryFromHexString(Req.GetCapture(1)); + switch (ServerRequest.RequestVerb()) + { + case HttpVerb::kPut: + { + std::filesystem::path WorkspacePath = GetPathParameter(ServerRequest, "root_path"sv); + if (WorkspacePath.empty()) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Invalid 'root_path' parameter"); + } + if (Req.GetCapture(1) == Oid::Zero.ToString()) + { + // Synthesize Id + WorkspaceId = PathToChunkId(WorkspacePath); + ZEN_INFO("Generated workspace id from path '{}': {}", WorkspacePath, WorkspaceId); + } + else if (WorkspaceId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid workspace id '{}'", Req.GetCapture(1))); + } + m_WorkspacesStats.WorkspaceWriteCount++; + bool OK = m_Workspaces.AddWorkspace({WorkspaceId, WorkspacePath}); + if (OK) + { + WriteState(); + return ServerRequest.WriteResponse(HttpResponseCode::Created, HttpContentType::kText, fmt::format("{}", WorkspaceId)); + } + else + { + Workspaces::WorkspaceInfo Info = m_Workspaces.GetWorkspaceInfo(WorkspaceId); + if (Info.Config.Id == WorkspaceId) + { + if (Info.Config.RootPath == WorkspacePath) + { + return ServerRequest.WriteResponse(HttpResponseCode::OK, + HttpContentType::kText, + fmt::format("{}", WorkspaceId)); + } + } + return ServerRequest.WriteResponse( + HttpResponseCode::Conflict, + HttpContentType::kText, + fmt::format("Workspace {} already exists with root path '{}'", WorkspaceId, Info.Config.RootPath)); + } + } + case HttpVerb::kGet: + { + if (WorkspaceId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid workspace id '{}'", Req.GetCapture(1))); + } + m_WorkspacesStats.WorkspaceReadCount++; + Workspaces::WorkspaceInfo Info = m_Workspaces.GetWorkspaceInfo(WorkspaceId); + if (Info.Config.Id != Oid::Zero) + { + CbObjectWriter Response; + Response << "id" << Info.Config.Id; + Response << "root_path" << Info.Config.RootPath.string(); // utf8? + Response.BeginArray("shares"); + for (const Workspaces::WorkspaceShareConfiguration& ShareConfig : Info.Shares) + { + Response.BeginObject(); + { + Response << "id" << ShareConfig.Id; + Response << "share_path" << ShareConfig.SharePath.string(); // utf8? + } + Response.EndObject(); + } + Response.EndArray(); + + return ServerRequest.WriteResponse(HttpResponseCode::OK, Response.Save()); + } + return ServerRequest.WriteResponse(HttpResponseCode::NotFound); + } + case HttpVerb::kDelete: + { + if (WorkspaceId == Oid::Zero) + { + m_WorkspacesStats.BadRequestCount++; + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Invalid workspace id '{}'", Req.GetCapture(1))); + } + m_WorkspacesStats.WorkspaceDeleteCount++; + bool Deleted = m_Workspaces.RemoveWorkspace(WorkspaceId); + if (Deleted) + { + WriteState(); + return ServerRequest.WriteResponse(HttpResponseCode::OK); + } + return ServerRequest.WriteResponse(HttpResponseCode::NotFound); + } + } +} + +} // namespace zen diff --git a/src/zenserver/workspaces/httpworkspaces.h b/src/zenserver/workspaces/httpworkspaces.h new file mode 100644 index 000000000..cfd23e7ba --- /dev/null +++ b/src/zenserver/workspaces/httpworkspaces.h @@ -0,0 +1,72 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/stats.h> +#include <zenhttp/httpserver.h> +#include <zenhttp/httpstats.h> + +namespace zen { + +class Workspaces; + +struct FileServeConfig +{ + std::filesystem::path SystemRootDir; +}; + +class HttpWorkspacesService final : public HttpService, public IHttpStatsProvider +{ +public: + HttpWorkspacesService(HttpStatsService& StatsService, const FileServeConfig& Cfg, Workspaces& Workspaces); + virtual ~HttpWorkspacesService(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(HttpServerRequest& Request) override; + + virtual void HandleStatsRequest(HttpServerRequest& Request) override; + +private: + struct WorkspacesStats + { + std::atomic_uint64_t WorkspaceReadCount{}; + std::atomic_uint64_t WorkspaceWriteCount{}; + std::atomic_uint64_t WorkspaceDeleteCount{}; + std::atomic_uint64_t WorkspaceShareReadCount{}; + std::atomic_uint64_t WorkspaceShareWriteCount{}; + std::atomic_uint64_t WorkspaceShareDeleteCount{}; + std::atomic_uint64_t WorkspaceShareFilesReadCount{}; + std::atomic_uint64_t WorkspaceShareEntriesReadCount{}; + std::atomic_uint64_t WorkspaceShareBatchReadCount{}; + std::atomic_uint64_t WorkspaceShareChunkHitCount{}; + std::atomic_uint64_t WorkspaceShareChunkMissCount{}; + std::atomic_uint64_t RequestCount{}; + std::atomic_uint64_t BadRequestCount{}; + }; + + inline LoggerRef Log() { return m_Log; } + + LoggerRef m_Log; + + void Initialize(); + std::filesystem::path GetStatePath() const; + void ReadState(); + void WriteState(); + + void FilesRequest(HttpRouterRequest& Req); + void ChunkInfoRequest(HttpRouterRequest& Req); + void BatchRequest(HttpRouterRequest& Req); + void EntriesRequest(HttpRouterRequest& Req); + void ChunkRequest(HttpRouterRequest& Req); + void ShareRequest(HttpRouterRequest& Req); + void WorkspaceRequest(HttpRouterRequest& Req); + + HttpStatsService& m_StatsService; + const FileServeConfig m_Config; + HttpRequestRouter m_Router; + Workspaces& m_Workspaces; + WorkspacesStats m_WorkspacesStats; + metrics::OperationTiming m_HttpRequests; +}; + +} // namespace zen diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 0909c26e9..9f24960bd 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -23,6 +23,7 @@ #include <zenhttp/httpserver.h> #include <zenstore/cidstore.h> #include <zenstore/scrubcontext.h> +#include <zenstore/workspaces.h> #include <zenutil/basicfile.h> #include <zenutil/workerpools.h> #include <zenutil/zenserverprocess.h> @@ -226,6 +227,13 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen m_ProjectStore = new ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, *m_JobQueue); m_HttpProjectService.reset(new HttpProjectService{*m_CidStore, m_ProjectStore, m_StatsService, *m_AuthMgr}); + if (ServerOptions.WorksSpacesConfig.Enabled) + { + m_Workspaces.reset(new Workspaces()); + m_HttpWorkspacesService.reset( + new HttpWorkspacesService(m_StatsService, {.SystemRootDir = ServerOptions.SystemRootDir}, *m_Workspaces)); + } + if (ServerOptions.StructuredCacheConfig.Enabled) { InitializeStructuredCache(ServerOptions); @@ -246,6 +254,11 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen m_Http->RegisterService(*m_HttpProjectService); } + if (m_HttpWorkspacesService) + { + m_Http->RegisterService(*m_HttpWorkspacesService); + } + m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot); if (m_FrontendService) @@ -761,6 +774,8 @@ ZenServer::Cleanup() m_UpstreamCache.reset(); m_CacheStore = {}; + m_HttpWorkspacesService.reset(); + m_Workspaces.reset(); m_HttpProjectService.reset(); m_ProjectStore = {}; m_CidStore.reset(); diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h index 0bab4e0a7..b9d12689d 100644 --- a/src/zenserver/zenserver.h +++ b/src/zenserver/zenserver.h @@ -34,6 +34,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #include "stats/statsreporter.h" #include "upstream/upstream.h" #include "vfs/vfsservice.h" +#include "workspaces/httpworkspaces.h" #ifndef ZEN_APP_NAME # define ZEN_APP_NAME "Unreal Zen Storage Server" @@ -55,7 +56,6 @@ public: int Initialize(const ZenServerOptions& ServerOptions, ZenServerState::ZenServerEntry* ServerEntry); void InitializeState(const ZenServerOptions& ServerOptions); void InitializeStructuredCache(const ZenServerOptions& ServerOptions); - void InitializeCompute(const ZenServerOptions& ServerOptions); void Run(); void RequestExit(int ExitCode); @@ -131,6 +131,8 @@ private: #endif RefPtr<ProjectStore> m_ProjectStore; std::unique_ptr<HttpProjectService> m_HttpProjectService; + std::unique_ptr<Workspaces> m_Workspaces; + std::unique_ptr<HttpWorkspacesService> m_HttpWorkspacesService; std::unique_ptr<UpstreamCache> m_UpstreamCache; std::unique_ptr<HttpUpstreamService> m_UpstreamService; std::unique_ptr<HttpStructuredCacheService> m_StructuredCacheService; diff --git a/src/zenstore/include/zenstore/workspaces.h b/src/zenstore/include/zenstore/workspaces.h new file mode 100644 index 000000000..e1a024894 --- /dev/null +++ b/src/zenstore/include/zenstore/workspaces.h @@ -0,0 +1,102 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenbase/refcount.h> +#include <zencore/filesystem.h> +#include <zencore/logbase.h> +#include <zencore/uid.h> +#include <zencore/zencore.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_map.h> +ZEN_THIRD_PARTY_INCLUDES_END + +#include <optional> + +namespace zen { + +class WorkerThreadPool; +class Workspace; +class WorkspaceShare; + +class Workspaces +{ +public: + struct ChunkRequest + { + Oid ChunkId; + uint64_t Offset = 0; + uint64_t Size = ~uint64_t(0); + }; + + struct ShareFile + { + std::string RelativePath; + uint64_t Size; + Oid Id; + }; + + struct WorkspaceConfiguration + { + Oid Id; + std::filesystem::path RootPath; + }; + + struct WorkspaceShareConfiguration + { + Oid Id; + std::filesystem::path SharePath; + }; + + struct WorkspaceInfo + { + WorkspaceConfiguration Config; + std::vector<WorkspaceShareConfiguration> Shares; + }; + + Workspaces(); + ~Workspaces(); + + bool AddWorkspace(const WorkspaceConfiguration& Configuration); + WorkspaceConfiguration GetWorkspaceConfiguration(const Oid& WorkspaceId) const; + WorkspaceInfo GetWorkspaceInfo(const Oid& WorkspaceId) const; + bool RemoveWorkspace(const Oid& WorkspaceId); + + bool AddWorkspaceShare(const Oid& WorkspaceId, + const WorkspaceShareConfiguration& Configuration, + std::function<Oid(const std::filesystem::path& Path)>&& PathToIdCB); + WorkspaceShareConfiguration GetWorkspaceShareConfiguration(const Oid& WorkspaceId, const Oid& ShareId) const; + bool RemoveWorkspaceShare(const Oid& WorkspaceId, const Oid& ShareId); + + std::optional<std::vector<ShareFile>> GetWorkspaceShareFiles(const Oid& WorkspaceId, + const Oid& ShareId, + bool ForceRefresh, + WorkerThreadPool& WorkerPool); + + ShareFile GetWorkspaceShareChunkInfo(const Oid& WorkspaceId, const Oid& ShareId, const Oid& ChunkId, WorkerThreadPool& WorkerPool); + + std::vector<IoBuffer> GetWorkspaceShareChunks(const Oid& WorkspaceId, + const Oid& ShareId, + const std::span<const ChunkRequest> ChunkRequests, + WorkerThreadPool& WorkerPool); + + void WriteState(const std::filesystem::path& WorkspaceStatePath); + void ReadState(const std::filesystem::path& WorkspaceStatePath, std::function<Oid(const std::filesystem::path& Path)>&& PathToIdCB); + +private: + LoggerRef& Log() { return m_Log; } + + Ref<Workspace> FindWorkspace(const RwLock::SharedLockScope& Lock, const Oid& WorkspaceId) const; + std::pair<Ref<Workspace>, Ref<WorkspaceShare>> FindWorkspaceShare(const Oid& WorkspaceId, + const Oid& ShareId, + bool ForceRefresh, + WorkerThreadPool& WorkerPool); + LoggerRef m_Log; + mutable RwLock m_Lock; + tsl::robin_map<Oid, Ref<Workspace>, Oid::Hasher> m_Workspaces; +}; + +void workspaces_forcelink(); + +} // namespace zen diff --git a/src/zenstore/workspaces.cpp b/src/zenstore/workspaces.cpp new file mode 100644 index 000000000..958d7b3f5 --- /dev/null +++ b/src/zenstore/workspaces.cpp @@ -0,0 +1,955 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zenstore/workspaces.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/fmtutils.h> +#include <zencore/scopeguard.h> +#include <zencore/timer.h> +#include <zencore/trace.h> +#include <zencore/workthreadpool.h> +#include <zenutil/basicfile.h> + +#if ZEN_WITH_TESTS +# include <zencore/blake3.h> +# include <zencore/testing.h> +# include <zencore/testutils.h> +#endif + +namespace zen { + +namespace { + std::string WorkspaceShareToJson(const Workspaces::WorkspaceShareConfiguration& ShareConfig) + { + using namespace std::literals; + + CbObjectWriter ShareWriter; + ShareWriter.AddObjectId("id"sv, ShareConfig.Id); + ShareWriter.AddString("share_path"sv, reinterpret_cast<const char*>(ShareConfig.SharePath.u8string().c_str())); + ExtendableStringBuilder<256> Json; + ShareWriter.Save().ToJson(Json); + return Json.ToString(); + } + + Workspaces::WorkspaceShareConfiguration WorkspaceShareFromJson(const IoBuffer& ShareJson, std::string& OutError) + { + using namespace std::literals; + + CbFieldIterator StateField = + LoadCompactBinaryFromJson(std::string_view((const char*)(ShareJson.Data()), ShareJson.GetSize()), OutError); + if (OutError.empty()) + { + if (CbObjectView Object = StateField.AsObjectView(); Object) + { + Oid ShareId = Object["id"sv].AsObjectId(); + std::filesystem::path SharePath = Object["share_path"sv].AsU8String(); + if (ShareId != Oid::Zero && !SharePath.empty()) + { + return {.Id = ShareId, .SharePath = SharePath}; + } + } + } + return {}; + } + + std::string WorkspaceToJson(const Workspaces::WorkspaceConfiguration& WorkspaceConfig) + { + using namespace std::literals; + + CbObjectWriter ShareWriter; + ShareWriter.AddObjectId("id"sv, WorkspaceConfig.Id); + ShareWriter.AddString("root_path"sv, reinterpret_cast<const char*>(WorkspaceConfig.RootPath.u8string().c_str())); + ExtendableStringBuilder<256> Json; + ShareWriter.Save().ToJson(Json); + return Json.ToString(); + } + + Workspaces::WorkspaceConfiguration WorkspaceFromJson(const IoBuffer& WorkspaceJson, std::string& OutError) + { + using namespace std::literals; + + CbFieldIterator StateField = + LoadCompactBinaryFromJson(std::string_view((const char*)(WorkspaceJson.Data()), WorkspaceJson.GetSize()), OutError); + if (OutError.empty()) + { + if (CbObjectView Object = StateField.AsObjectView(); Object) + { + Oid WorkspaceId = Object["id"sv].AsObjectId(); + std::filesystem::path RootPath = Object["root_path"sv].AsU8String(); + if (WorkspaceId != Oid::Zero && !RootPath.empty()) + { + return {.Id = WorkspaceId, .RootPath = RootPath}; + } + } + } + return {}; + } + +} // namespace +////////////////////////////////////////////////////////////////////////// + +class FolderStructure +{ +public: + struct FileEntry + { + std::filesystem::path RelativePath; + uint64_t Size; + }; + + FolderStructure() {} + FolderStructure(std::vector<FileEntry>&& InEntries, std::vector<Oid>&& Ids); + + const FileEntry* FindEntry(const Oid& Id) const + { + if (auto It = IdLookup.find(Id); It != IdLookup.end()) + { + return &Entries[It->second]; + } + return nullptr; + } + + size_t EntryCount() const { return Entries.size(); } + + void IterateEntries(std::function<void(const Oid& Id, const FileEntry& Entry)>&& Callback) const + { + for (auto It = IdLookup.begin(); It != IdLookup.end(); It++) + { + Callback(It->first, Entries[It->second]); + } + } + +private: + const std::vector<FileEntry> Entries; + tsl::robin_map<Oid, size_t, Oid::Hasher> IdLookup; +}; + +////////////////////////////////////////////////////////////////////////// + +class WorkspaceShare : public RefCounted +{ +public: + WorkspaceShare(const Workspaces::WorkspaceShareConfiguration& Config, + std::unique_ptr<FolderStructure>&& FolderStructure, + std::function<Oid(const std::filesystem::path& Path)>&& PathToId); + + const Workspaces::WorkspaceShareConfiguration& GetConfig() const; + + bool IsInitialized() const { return !!m_FolderStructure; } + + const std::function<Oid(const std::filesystem::path& Path)>& GetPathToIdFunction() const { return m_PathToid; } + + std::filesystem::path GetAbsolutePath(const std::filesystem::path& RootPath, const Oid& ChunkId, uint64_t& OutSize) const; + + const FolderStructure& GetStructure() const + { + ZEN_ASSERT(m_FolderStructure); + return *m_FolderStructure; + } + +private: + const Workspaces::WorkspaceShareConfiguration m_Config; + std::function<Oid(const std::filesystem::path& Path)> m_PathToid; + std::unique_ptr<FolderStructure> m_FolderStructure; +}; + +////////////////////////////////////////////////////////////////////////// + +class Workspace : public RefCounted +{ +public: + Workspace(LoggerRef& Log, const Workspaces::WorkspaceConfiguration& Config); + + const Workspaces::WorkspaceConfiguration& GetConfig() const; + std::vector<Ref<WorkspaceShare>> GetShares() const; + Ref<WorkspaceShare> GetShare(const Oid& ShareId) const; + + void SetShare(const Oid& ShareId, Ref<WorkspaceShare>&& Share); + +private: + LoggerRef Log() { return m_Log; } + + LoggerRef& m_Log; + const Workspaces::WorkspaceConfiguration m_Config; + tsl::robin_map<Oid, Ref<WorkspaceShare>, Oid::Hasher> m_Shares; +}; + +////////////////////////////////////////////////////////////////////////// + +FolderStructure::FolderStructure(std::vector<FileEntry>&& InEntries, std::vector<Oid>&& Ids) : Entries(std::move(InEntries)) +{ + IdLookup.reserve(Entries.size()); + for (size_t Index = 0; Index < Entries.size(); Index++) + { + Oid Id = Ids[Index]; + IdLookup.insert(std::make_pair(Id, Index)); + } +} + +namespace { + struct FolderScanner + { + FolderScanner(LoggerRef& Log, + WorkerThreadPool& WorkerPool, + const std::filesystem::path& Path, + const std::function<Oid(const std::filesystem::path& Path)>& PathToIdCB) + : m_Log(Log) + , Path(Path) + , PathToIdCB(PathToIdCB) + , WorkLatch(1) + , WorkerPool(WorkerPool) + { + } + + void Traverse(); + void Traverse(const std::filesystem::path& RelativeRoot, const std::filesystem::path& Path); + + LoggerRef& Log() { return m_Log; } + LoggerRef& m_Log; + const std::filesystem::path Path; + RwLock WorkLock; + const std::function<Oid(const std::filesystem::path& Path)>& PathToIdCB; + std::vector<FolderStructure::FileEntry> FoundFiles; + std::vector<Oid> FoundFileIds; + Latch WorkLatch; + WorkerThreadPool& WorkerPool; + }; + + struct Visitor : public FileSystemTraversal::TreeVisitor + { + Visitor(FolderScanner& Data, const std::filesystem::path& RelativeRoot) : Data(Data), RelativeRoot(RelativeRoot) {} + + FileSystemTraversal Traverser; + FolderScanner& Data; + std::vector<FolderStructure::FileEntry> Entries; + std::vector<Oid> FileIds; + std::filesystem::path RelativeRoot; + + virtual void VisitFile(const std::filesystem::path&, const path_view& File, uint64_t FileSize) + { + std::filesystem::path RelativePath = RelativeRoot.empty() ? File : RelativeRoot / File; + Entries.push_back(FolderStructure::FileEntry{.RelativePath = RelativePath, .Size = FileSize}); + FileIds.push_back(Data.PathToIdCB(RelativePath)); + } + + virtual bool VisitDirectory(const std::filesystem::path& Parent, const path_view& DirectoryName) + { + ZEN_ASSERT(!Parent.empty()); + ZEN_ASSERT(!DirectoryName.empty()); + FolderScanner* DataPtr = &Data; + Data.WorkLatch.AddCount(1); + Data.WorkerPool.ScheduleWork([DataPtr, + RootDir = Parent / DirectoryName, + RelativeRoot = RelativeRoot.empty() ? DirectoryName : RelativeRoot / DirectoryName]() { + auto _ = MakeGuard([DataPtr]() { DataPtr->WorkLatch.CountDown(); }); + DataPtr->Traverse(RelativeRoot, RootDir); + }); + return false; + } + }; + + void FolderScanner::Traverse() + { + Stopwatch Timer; + Traverse({}, std::filesystem::absolute(Path)); + WorkLatch.CountDown(); + while (!WorkLatch.Wait(1000)) + { + WorkLock.WithSharedLock([&]() { ZEN_INFO("Found {} files in '{}'...", FoundFiles.size(), Path.string()); }); + } + ZEN_ASSERT(FoundFiles.size() == FoundFileIds.size()); + ZEN_INFO("Found {} files in '{}' in {}", FoundFiles.size(), Path.string(), NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); + } + + void FolderScanner::Traverse(const std::filesystem::path& RelativeRoot, const std::filesystem::path& AbsoluteRoot) + { + Visitor LeafVisitor(*this, RelativeRoot); + LeafVisitor.Traverser.TraverseFileSystem(AbsoluteRoot, LeafVisitor); + if (!LeafVisitor.Entries.empty()) + { + WorkLock.WithExclusiveLock([&]() { + FoundFiles.insert(FoundFiles.end(), LeafVisitor.Entries.begin(), LeafVisitor.Entries.end()); + FoundFileIds.insert(FoundFileIds.end(), LeafVisitor.FileIds.begin(), LeafVisitor.FileIds.end()); + }); + } + } +} // namespace + +std::unique_ptr<FolderStructure> +ScanFolder(LoggerRef InLog, + const std::filesystem::path& Path, + const std::function<Oid(const std::filesystem::path& Path)>& PathToIdCB, + WorkerThreadPool& WorkerPool) +{ + ZEN_TRACE_CPU("workspaces::ScanFolderImpl"); + + auto Log = [&InLog]() { return InLog; }; + + FolderScanner Data(InLog, WorkerPool, Path, PathToIdCB); + Data.Traverse(); + return std::make_unique<FolderStructure>(std::move(Data.FoundFiles), std::move(Data.FoundFileIds)); +} + +//////////////////////////////////////////////////////////// + +WorkspaceShare::WorkspaceShare(const Workspaces::WorkspaceShareConfiguration& Config, + std::unique_ptr<FolderStructure>&& FolderStructure, + std::function<Oid(const std::filesystem::path& Path)>&& PathToId) +: m_Config(Config) +, m_PathToid(std::move(PathToId)) +, m_FolderStructure(std::move(FolderStructure)) +{ +} + +std::filesystem::path +WorkspaceShare::GetAbsolutePath(const std::filesystem::path& RootPath, const Oid& Id, uint64_t& OutSize) const +{ + ZEN_ASSERT(m_FolderStructure); + const FolderStructure::FileEntry* Entry = m_FolderStructure->FindEntry(Id); + if (Entry == nullptr) + { + return {}; + } + OutSize = Entry->Size; + return RootPath / m_Config.SharePath / Entry->RelativePath; +} + +const Workspaces::WorkspaceShareConfiguration& +WorkspaceShare::GetConfig() const +{ + return m_Config; +} + +//////////////////////////////////////////////////////////// + +Workspace::Workspace(LoggerRef& Log, const Workspaces::WorkspaceConfiguration& Config) : m_Log(Log), m_Config(Config) +{ +} + +const Workspaces::WorkspaceConfiguration& +Workspace::GetConfig() const +{ + return m_Config; +} +std::vector<Ref<WorkspaceShare>> +Workspace::GetShares() const +{ + std::vector<Ref<WorkspaceShare>> Shares; + Shares.reserve(m_Shares.size()); + for (auto It : m_Shares) + { + Shares.push_back(It.second); + } + return Shares; +} + +Ref<WorkspaceShare> +Workspace::GetShare(const Oid& ShareId) const +{ + if (auto It = m_Shares.find(ShareId); It != m_Shares.end()) + { + return It->second; + } + return {}; +} + +void +Workspace::SetShare(const Oid& ShareId, Ref<WorkspaceShare>&& Share) +{ + if (Share) + { + m_Shares.insert_or_assign(ShareId, std::move(Share)); + } + else + { + m_Shares.erase(ShareId); + } +} + +//////////////////////////////////////////////////////////// + +Workspaces::Workspaces() : m_Log(logging::Get("workspaces")) +{ +} + +Workspaces::~Workspaces() +{ +} + +bool +Workspaces::AddWorkspace(const WorkspaceConfiguration& Configuration) +{ + Ref<Workspace> NewWorkspace(new Workspace(m_Log, Configuration)); + + RwLock::ExclusiveLockScope Lock(m_Lock); + if (m_Workspaces.contains(Configuration.Id)) + { + return false; + } + m_Workspaces.insert(std::make_pair(Configuration.Id, NewWorkspace)); + ZEN_INFO("Created workspace '{}' with root '{}'", Configuration.Id, Configuration.RootPath); + return true; +} + +Workspaces::WorkspaceConfiguration +Workspaces::GetWorkspaceConfiguration(const Oid& WorkspaceId) const +{ + RwLock::SharedLockScope Lock(m_Lock); + Ref<Workspace> Workspace = FindWorkspace(Lock, WorkspaceId); + if (Workspace) + { + return Workspace->GetConfig(); + } + return {}; +} + +Workspaces::WorkspaceInfo +Workspaces::GetWorkspaceInfo(const Oid& WorkspaceId) const +{ + Ref<Workspace> Workspace; + std::vector<Ref<WorkspaceShare>> Shares; + { + RwLock::SharedLockScope Lock(m_Lock); + Workspace = FindWorkspace(Lock, WorkspaceId); + if (Workspace) + { + Shares = Workspace->GetShares(); + } + } + if (!Workspace) + { + return {}; + } + + WorkspaceInfo Info = {.Config = Workspace->GetConfig()}; + Info.Shares.reserve(Shares.size()); + for (const Ref<WorkspaceShare>& Share : Shares) + { + Info.Shares.push_back(Share->GetConfig()); + } + return Info; +} + +bool +Workspaces::RemoveWorkspace(const Oid& WorkspaceId) +{ + RwLock::ExclusiveLockScope Lock(m_Lock); + if (auto It = m_Workspaces.find(WorkspaceId); It != m_Workspaces.end()) + { + m_Workspaces.erase(It); + ZEN_INFO("Removed workspace '{}'", WorkspaceId); + return true; + } + return false; +} + +bool +Workspaces::AddWorkspaceShare(const Oid& WorkspaceId, + const WorkspaceShareConfiguration& Configuration, + std::function<Oid(const std::filesystem::path& Path)>&& PathToIdCB) +{ + Ref<Workspace> Workspace; + { + RwLock::SharedLockScope Lock(m_Lock); + Workspace = FindWorkspace(Lock, WorkspaceId); + if (!Workspace) + { + return false; + } + if (Workspace->GetShare(Configuration.Id)) + { + return false; + } + } + + Ref<WorkspaceShare> NewShare(new WorkspaceShare(Configuration, {}, std::move(PathToIdCB))); + { + RwLock::ExclusiveLockScope _(m_Lock); + Workspace->SetShare(Configuration.Id, std::move(NewShare)); + } + ZEN_INFO("Added workspace share '{}' in workspace '{}' with path '{}'", Configuration.Id, WorkspaceId, Configuration.SharePath); + + return true; +} + +Workspaces::WorkspaceShareConfiguration +Workspaces::GetWorkspaceShareConfiguration(const Oid& WorkspaceId, const Oid& ShareId) const +{ + RwLock::SharedLockScope Lock(m_Lock); + Ref<Workspace> Workspace = FindWorkspace(Lock, WorkspaceId); + if (Workspace) + { + Ref<WorkspaceShare> Share = Workspace->GetShare(ShareId); + if (Share) + { + return Share->GetConfig(); + } + } + return {}; +} + +bool +Workspaces::RemoveWorkspaceShare(const Oid& WorkspaceId, const Oid& ShareId) +{ + Ref<Workspace> Workspace; + { + RwLock::SharedLockScope Lock(m_Lock); + Workspace = FindWorkspace(Lock, WorkspaceId); + if (!Workspace) + { + return false; + } + } + RwLock::ExclusiveLockScope _(m_Lock); + if (!Workspace->GetShare(ShareId)) + { + return false; + } + + Workspace->SetShare(ShareId, {}); + ZEN_INFO("Removed workspace share '{}' in workspace '{}'", ShareId, WorkspaceId); + return true; +} + +std::optional<std::vector<Workspaces::ShareFile>> +Workspaces::GetWorkspaceShareFiles(const Oid& WorkspaceId, const Oid& ShareId, bool ForceRefresh, WorkerThreadPool& WorkerPool) +{ + std::pair<Ref<Workspace>, Ref<WorkspaceShare>> WorkspaceAndShare = FindWorkspaceShare(WorkspaceId, ShareId, ForceRefresh, WorkerPool); + if (!WorkspaceAndShare.second) + { + return {}; + } + + const FolderStructure& Structure = WorkspaceAndShare.second->GetStructure(); + std::vector<Workspaces::ShareFile> Files; + Files.reserve(Structure.EntryCount()); + Structure.IterateEntries([&Files](const Oid& Id, const FolderStructure::FileEntry& Entry) { + std::string GenericPath(reinterpret_cast<const char*>(Entry.RelativePath.generic_u8string().c_str())); + Files.push_back(ShareFile{.RelativePath = std::move(GenericPath), .Size = Entry.Size, .Id = Id}); + }); + return Files; +} + +Workspaces::ShareFile +Workspaces::GetWorkspaceShareChunkInfo(const Oid& WorkspaceId, const Oid& ShareId, const Oid& ChunkId, WorkerThreadPool& WorkerPool) +{ + using namespace std::literals; + + std::pair<Ref<Workspace>, Ref<WorkspaceShare>> WorkspaceAndShare = FindWorkspaceShare(WorkspaceId, ShareId, false, WorkerPool); + if (!WorkspaceAndShare.second) + { + return {}; + } + + const FolderStructure::FileEntry* Entry = WorkspaceAndShare.second->GetStructure().FindEntry(ChunkId); + if (Entry) + { + std::string GenericPath(reinterpret_cast<const char*>(Entry->RelativePath.generic_u8string().c_str())); + return Workspaces::ShareFile{.RelativePath = std::move(GenericPath), .Size = Entry->Size, .Id = ChunkId}; + } + return {}; +} + +std::vector<IoBuffer> +Workspaces::GetWorkspaceShareChunks(const Oid& WorkspaceId, + const Oid& ShareId, + const std::span<const ChunkRequest> ChunkRequests, + WorkerThreadPool& WorkerPool) +{ + if (ChunkRequests.size() == 0) + { + return {}; + } + + std::pair<Ref<Workspace>, Ref<WorkspaceShare>> WorkspaceAndShare = FindWorkspaceShare(WorkspaceId, ShareId, false, WorkerPool); + if (!WorkspaceAndShare.second) + { + return {}; + } + + std::filesystem::path RootPath = WorkspaceAndShare.first->GetConfig().RootPath; + + auto GetOne = [this](const std::filesystem::path& RootPath, WorkspaceShare& Share, const ChunkRequest& Request) -> IoBuffer { + uint64_t Size; + std::filesystem::path Path = Share.GetAbsolutePath(RootPath, Request.ChunkId, Size); + if (!Path.empty()) + { + uint64_t RequestedOffset = Request.Offset; + uint64_t RequestedSize = Request.Size; + if (Request.Offset > 0 || Request.Size < uint64_t(-1)) + { + if (RequestedOffset > Size) + { + RequestedOffset = Size; + } + if ((RequestedOffset + RequestedSize) > Size) + { + RequestedSize = Size - RequestedOffset; + } + } + return IoBufferBuilder::MakeFromFile(Path, RequestedOffset, RequestedSize); + } + return IoBuffer{}; + }; + + if (ChunkRequests.size() == 1) + { + return std::vector<IoBuffer>({GetOne(RootPath, *WorkspaceAndShare.second, ChunkRequests[0])}); + } + + std::vector<IoBuffer> Chunks; + Chunks.resize(ChunkRequests.size()); + + Latch WorkLatch(1); + for (size_t Index = 0; Index < ChunkRequests.size(); Index++) + { + WorkLatch.AddCount(1); + WorkerPool.ScheduleWork([&, Index]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + Chunks[Index] = GetOne(RootPath, *WorkspaceAndShare.second, ChunkRequests[Index]); + }); + } + WorkLatch.CountDown(); + WorkLatch.Wait(); + + return Chunks; +} + +void +Workspaces::WriteState(const std::filesystem::path& WorkspaceStatePath) +{ + using namespace std::literals; + + ZEN_INFO("Writing workspaces state to {}", WorkspaceStatePath); + + RwLock::SharedLockScope _(m_Lock); + for (auto It : m_Workspaces) + { + const WorkspaceConfiguration& WorkspaceConfig = It.second->GetConfig(); + ZEN_ASSERT(WorkspaceConfig.Id == It.first); + std::filesystem::path WorkspaceConfigDir = WorkspaceStatePath / WorkspaceConfig.Id.ToString(); + CreateDirectories(WorkspaceConfigDir); + std::string WorkspaceConfigJson = WorkspaceToJson(WorkspaceConfig); + TemporaryFile::SafeWriteFile(WorkspaceConfigDir / "config.json"sv, + MemoryView(WorkspaceConfigJson.data(), WorkspaceConfigJson.size())); + + std::vector<Ref<WorkspaceShare>> Shares = It.second->GetShares(); + for (const Ref<WorkspaceShare>& Share : Shares) + { + const WorkspaceShareConfiguration& ShareConfig = Share->GetConfig(); + std::filesystem::path ShareConfigDir = WorkspaceConfigDir / "shares"sv / ShareConfig.Id.ToString(); + CreateDirectories(ShareConfigDir); + std::string ShareConfigJson = WorkspaceShareToJson(ShareConfig); + TemporaryFile::SafeWriteFile(ShareConfigDir / "config.json"sv, MemoryView(ShareConfigJson.data(), ShareConfigJson.size())); + } + } +} + +void +Workspaces::ReadState(const std::filesystem::path& WorkspaceStatePath, std::function<Oid(const std::filesystem::path& Path)>&& PathToIdCB) +{ + using namespace std::literals; + + if (std::filesystem::is_directory(WorkspaceStatePath)) + { + ZEN_INFO("Reading workspaces state from {}", WorkspaceStatePath); + DirectoryContent WorkspacesDirContent; + GetDirectoryContent(WorkspaceStatePath, DirectoryContent::IncludeDirsFlag, WorkspacesDirContent); + for (const std::filesystem::path& WorkspaceDirPath : WorkspacesDirContent.Directories) + { + Oid WorkspaceId = Oid::TryFromHexString(WorkspaceDirPath.filename().string()); + if (WorkspaceId != Oid::Zero) + { + std::string Error; + WorkspaceConfiguration WorkspaceConfig = + WorkspaceFromJson(IoBufferBuilder::MakeFromFile(WorkspaceDirPath / "config.json"sv), Error); + if (!Error.empty()) + { + ZEN_WARN("Failed to read workspace state from {}. Reason: '{}'", WorkspaceDirPath / "config.json"sv, Error); + } + else if (WorkspaceConfig.Id == WorkspaceId) + { + if (AddWorkspace(WorkspaceConfig)) + { + std::filesystem::path WorkspaceSharesStatePath = WorkspaceDirPath / "shares"sv; + if (std::filesystem::is_directory(WorkspaceSharesStatePath)) + { + DirectoryContent SharesDirContent; + GetDirectoryContent(WorkspaceDirPath / "shares"sv, DirectoryContent::IncludeDirsFlag, SharesDirContent); + for (const std::filesystem::path& ShareDirPath : SharesDirContent.Directories) + { + Oid ShareId = Oid::TryFromHexString(ShareDirPath.filename().string()); + if (ShareId != Oid::Zero) + { + WorkspaceShareConfiguration ShareConfig = + WorkspaceShareFromJson(IoBufferBuilder::MakeFromFile(ShareDirPath / "config.json"sv), Error); + if (!Error.empty()) + { + ZEN_WARN("Failed to read workspace share state from {}. Reason: '{}'", + ShareDirPath / "config.json"sv, + Error); + } + else if (ShareConfig.Id == ShareId) + { + AddWorkspaceShare(WorkspaceId, + ShareConfig, + std::function<Oid(const std::filesystem::path& Path)>(PathToIdCB)); + } + } + } + } + } + } + } + } + } +} + +std::pair<Ref<Workspace>, Ref<WorkspaceShare>> +Workspaces::FindWorkspaceShare(const Oid& WorkspaceId, const Oid& ShareId, bool ForceRefresh, WorkerThreadPool& WorkerPool) +{ + Ref<Workspace> Workspace; + Ref<WorkspaceShare> Share; + { + RwLock::SharedLockScope Lock(m_Lock); + Workspace = FindWorkspace(Lock, WorkspaceId); + if (!Workspace) + { + return {}; + } + Share = Workspace->GetShare(ShareId); + if (!Share) + { + return {}; + } + } + + if (ForceRefresh || !Share->IsInitialized()) + { + Workspaces::WorkspaceShareConfiguration Config = Share->GetConfig(); + std::filesystem::path RootPath = Workspace->GetConfig().RootPath; + std::function<Oid(const std::filesystem::path& Path)> PathToIdCB = Share->GetPathToIdFunction(); + std::unique_ptr<FolderStructure> NewStructure = ScanFolder(Log(), RootPath / Config.SharePath, PathToIdCB, WorkerPool); + if (NewStructure) + { + Share = Ref<WorkspaceShare>(new WorkspaceShare(Config, std::move(NewStructure), std::move(PathToIdCB))); + { + RwLock::ExclusiveLockScope _(m_Lock); + Workspace->SetShare(ShareId, Ref<WorkspaceShare>(Share)); + } + } + else + { + if (!Share->IsInitialized()) + { + ZEN_WARN("Failed to scan folder {} for share {} in workspace {}, treating it as an empty share", + WorkspaceId, + ShareId, + RootPath / Config.SharePath); + Share = Ref<WorkspaceShare>(new WorkspaceShare(Config, std::move(NewStructure), std::move(PathToIdCB))); + { + RwLock::ExclusiveLockScope _(m_Lock); + Workspace->SetShare(ShareId, Ref<WorkspaceShare>(Share)); + } + } + } + } + return {std::move(Workspace), std::move(Share)}; +} + +Ref<Workspace> +Workspaces::FindWorkspace(const RwLock::SharedLockScope&, const Oid& WorkspaceId) const +{ + if (auto It = m_Workspaces.find(WorkspaceId); It != m_Workspaces.end()) + { + return It->second; + } + return {}; +} + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +namespace { + Oid PathToId(const std::filesystem::path& Path) + { + return Oid::FromMemory(BLAKE3::HashMemory((const void*)Path.string().data(), Path.string().size()).Hash); + } + + std::vector<std::pair<std::filesystem::path, IoBuffer>> GenerateFolderContent(const std::filesystem::path& RootPath) + { + std::vector<std::pair<std::filesystem::path, IoBuffer>> Result; + Result.push_back(std::make_pair(RootPath / "root_blob_1.bin", CreateRandomBlob(4122))); + Result.push_back(std::make_pair(RootPath / "root_blob_2.bin", CreateRandomBlob(2122))); + + std::filesystem::path EmptyFolder(RootPath / "empty_folder"); + + std::filesystem::path FirstFolder(RootPath / "first_folder"); + std::filesystem::create_directory(FirstFolder); + Result.push_back(std::make_pair(FirstFolder / "first_folder_blob1.bin", CreateRandomBlob(22))); + Result.push_back(std::make_pair(FirstFolder / "first_folder_blob2.bin", CreateRandomBlob(122))); + + std::filesystem::path SecondFolder(RootPath / "second_folder"); + std::filesystem::create_directory(SecondFolder); + Result.push_back(std::make_pair(SecondFolder / "second_folder_blob1.bin", CreateRandomBlob(522))); + Result.push_back(std::make_pair(SecondFolder / "second_folder_blob2.bin", CreateRandomBlob(122))); + Result.push_back(std::make_pair(SecondFolder / "second_folder_blob3.bin", CreateRandomBlob(225))); + + std::filesystem::path SecondFolderChild(SecondFolder / "child_in_second"); + std::filesystem::create_directory(SecondFolderChild); + Result.push_back(std::make_pair(SecondFolderChild / "second_child_folder_blob1.bin", CreateRandomBlob(622))); + + for (const auto& It : Result) + { + WriteFile(It.first, It.second); + } + + return Result; + } + + std::vector<std::pair<std::filesystem::path, IoBuffer>> GenerateFolderContent2(const std::filesystem::path& RootPath) + { + std::vector<std::pair<std::filesystem::path, IoBuffer>> Result; + Result.push_back(std::make_pair(RootPath / "root_blob_3.bin", CreateRandomBlob(312))); + std::filesystem::path FirstFolder(RootPath / "first_folder"); + Result.push_back(std::make_pair(FirstFolder / "first_folder_blob3.bin", CreateRandomBlob(722))); + std::filesystem::path SecondFolder(RootPath / "second_folder"); + std::filesystem::path SecondFolderChild(SecondFolder / "child_in_second"); + Result.push_back(std::make_pair(SecondFolderChild / "second_child_folder_blob2.bin", CreateRandomBlob(962))); + Result.push_back(std::make_pair(SecondFolderChild / "second_child_folder_blob3.bin", CreateRandomBlob(561))); + + for (const auto& It : Result) + { + WriteFile(It.first, It.second); + } + + return Result; + } + +} // namespace + +TEST_CASE("workspaces.scanfolder") +{ + using namespace std::literals; + + WorkerThreadPool WorkerPool(std::thread::hardware_concurrency()); + + ScopedTemporaryDirectory TempDir; + std::filesystem::path RootPath = TempDir.Path(); + (void)GenerateFolderContent(RootPath); + + std::unique_ptr<FolderStructure> Structure = ScanFolder( + logging::Default(), + RootPath, + [](const std::filesystem::path& Path) { return PathToId(Path); }, + WorkerPool); + CHECK(Structure); + + Structure->IterateEntries([&](const Oid& Id, const FolderStructure::FileEntry& Entry) { + std::filesystem::path AbsPath = RootPath / Entry.RelativePath; + CHECK(std::filesystem::is_regular_file(AbsPath)); + CHECK(std::filesystem::file_size(AbsPath) == Entry.Size); + const FolderStructure::FileEntry* FindEntry = Structure->FindEntry(Id); + CHECK(FindEntry); + std::filesystem::path Path = RootPath / FindEntry->RelativePath; + CHECK(AbsPath == Path); + CHECK(std::filesystem::file_size(AbsPath) == FindEntry->Size); + }); +} + +TEST_CASE("workspace.share.basic") +{ + using namespace std::literals; + + WorkerThreadPool WorkerPool(std::thread::hardware_concurrency()); + + ScopedTemporaryDirectory TempDir; + std::filesystem::path RootPath = TempDir.Path(); + std::vector<std::pair<std::filesystem::path, IoBuffer>> Content = GenerateFolderContent(RootPath); + + Workspaces WS; + CHECK(WS.AddWorkspace({PathToId(RootPath), RootPath})); + CHECK(WS.AddWorkspaceShare(PathToId(RootPath), {PathToId("second_folder"), "second_folder"}, [](const std::filesystem::path& Path) { + return PathToId(Path); + })); + std::filesystem::path SharePath = RootPath / "second_folder"; + std::vector<std::filesystem::path> Paths = {{std::filesystem::relative(Content[4].first, SharePath)}, + {std::filesystem::relative(Content[6].first, SharePath)}, + {std::filesystem::relative(Content[7].first, SharePath)}, + {"the_file_that_is_not_there.txt"}}; + std::vector<IoBuffer> Chunks = WS.GetWorkspaceShareChunks(PathToId(RootPath), + PathToId("second_folder"), + std::vector<Workspaces::ChunkRequest>{{.ChunkId = PathToId(Paths[0])}, + {.ChunkId = PathToId(Paths[1])}, + {.ChunkId = PathToId(Paths[2])}, + {.ChunkId = PathToId(Paths[3])}}, + WorkerPool); + CHECK(Chunks.size() == 4); + CHECK(Chunks[0].GetView().EqualBytes(Content[4].second.GetView())); + CHECK(Chunks[1].GetView().EqualBytes(Content[6].second.GetView())); + CHECK(Chunks[2].GetView().EqualBytes(Content[7].second.GetView())); + CHECK(Chunks[3].GetSize() == 0); + + std::vector<std::pair<std::filesystem::path, IoBuffer>> Content2 = GenerateFolderContent2(RootPath); + std::vector<std::filesystem::path> Paths2 = {{std::filesystem::relative(Content2[2].first, SharePath)}, + {std::filesystem::relative(Content2[3].first, SharePath)}}; + + std::vector<IoBuffer> Chunks2 = WS.GetWorkspaceShareChunks( + PathToId(RootPath), + PathToId("second_folder"), + std::vector<Workspaces::ChunkRequest>{{.ChunkId = PathToId(Paths2[0])}, {.ChunkId = PathToId(Paths2[1])}}, + WorkerPool); + CHECK(Chunks2.size() == 2); + CHECK(Chunks2[0].GetSize() == 0); + CHECK(Chunks2[1].GetSize() == 0); + + std::optional<std::vector<Workspaces::ShareFile>> Files = + WS.GetWorkspaceShareFiles(PathToId(RootPath), PathToId("second_folder"), true, WorkerPool); + CHECK(Files.has_value()); + CHECK(Files.value().size() == 6); + + Chunks2 = WS.GetWorkspaceShareChunks( + PathToId(RootPath), + PathToId("second_folder"), + std::vector<Workspaces::ChunkRequest>{{.ChunkId = PathToId(Paths2[0])}, {.ChunkId = PathToId(Paths2[1])}}, + WorkerPool); + CHECK(Chunks2.size() == 2); + CHECK(Chunks2[0].GetView().EqualBytes(Content2[2].second.GetView())); + CHECK(Chunks2[1].GetView().EqualBytes(Content2[3].second.GetView())); + + Workspaces::ShareFile Entry = + WS.GetWorkspaceShareChunkInfo(PathToId(RootPath), PathToId("second_folder"), PathToId(Paths2[1]), WorkerPool); + CHECK(Entry.Id == PathToId(Paths2[1])); + CHECK(!Entry.RelativePath.empty()); + CHECK(Entry.Size == Content2[3].second.GetSize()); + + Files = WS.GetWorkspaceShareFiles(PathToId(RootPath), PathToId("second_folder"), false, WorkerPool); + CHECK(Files.has_value()); + CHECK(Files.value().size() == 6); + + CHECK(WS.RemoveWorkspaceShare(PathToId(RootPath), PathToId("second_folder"))); + CHECK(!WS.RemoveWorkspaceShare(PathToId(RootPath), PathToId("second_folder"))); + + Files = WS.GetWorkspaceShareFiles(PathToId(RootPath), PathToId("second_folder"), false, WorkerPool); + CHECK(!Files.has_value()); + + Chunks2 = WS.GetWorkspaceShareChunks( + PathToId(RootPath), + PathToId("second_folder"), + std::vector<Workspaces::ChunkRequest>{{.ChunkId = PathToId(Paths2[0])}, {.ChunkId = PathToId(Paths2[1])}}, + WorkerPool); + CHECK(Chunks2.empty()); + + CHECK(WS.RemoveWorkspace(PathToId(RootPath))); + CHECK(!WS.RemoveWorkspace(PathToId(RootPath))); +} + +#endif + +void +workspaces_forcelink() +{ +} + +} // namespace zen diff --git a/src/zenstore/zenstore.cpp b/src/zenstore/zenstore.cpp index 038c6bdc7..c697647d2 100644 --- a/src/zenstore/zenstore.cpp +++ b/src/zenstore/zenstore.cpp @@ -6,6 +6,7 @@ # include <zenstore/blockstore.h> # include <zenstore/cache/structuredcachestore.h> +# include <zenstore/workspaces.h> # include <zenstore/gc.h> # include <zenstore/hashkeyset.h> @@ -22,6 +23,7 @@ zenstore_forcelinktests() filecas_forcelink(); blockstore_forcelink(); compactcas_forcelink(); + workspaces_forcelink(); gc_forcelink(); hashkeyset_forcelink(); structured_cachestore_forcelink(); diff --git a/src/zenutil/chunkrequests.cpp b/src/zenutil/chunkrequests.cpp new file mode 100644 index 000000000..745363668 --- /dev/null +++ b/src/zenutil/chunkrequests.cpp @@ -0,0 +1,147 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/chunkrequests.h> + +#include <zencore/blake3.h> +#include <zencore/iobuffer.h> +#include <zencore/sharedbuffer.h> +#include <zencore/stream.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { +namespace { + struct RequestHeader + { + enum + { + kMagic = 0xAAAA'77AC + }; + uint32_t Magic; + uint32_t ChunkCount; + uint32_t Reserved1; + uint32_t Reserved2; + }; + + struct ResponseHeader + { + uint32_t Magic = 0xbada'b00f; + uint32_t ChunkCount; + uint32_t Reserved1 = 0; + uint32_t Reserved2 = 0; + }; + + struct ResponseChunkEntry + { + uint32_t CorrelationId; + uint32_t Flags = 0; + uint64_t ChunkSize; + }; +} // namespace + +IoBuffer +BuildChunkBatchRequest(const std::vector<RequestChunkEntry>& Entries) +{ + RequestHeader RequestHdr; + RequestHdr.Magic = (uint32_t)RequestHeader::kMagic; + RequestHdr.ChunkCount = gsl::narrow<uint32_t>(Entries.size()); + UniqueBuffer Buffer = UniqueBuffer::Alloc(sizeof(RequestHeader) + sizeof(RequestChunkEntry) * RequestHdr.ChunkCount); + MutableMemoryView WriteBuffer = Buffer.GetMutableView(); + WriteBuffer = WriteBuffer.CopyFrom(MemoryView(&RequestHdr, sizeof(RequestHeader))); + WriteBuffer.CopyFrom(MemoryView(Entries.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount)); + return Buffer.MoveToShared().AsIoBuffer(); +} + +std::optional<std::vector<RequestChunkEntry>> +ParseChunkBatchRequest(const IoBuffer& Payload) +{ + if (Payload.Size() <= sizeof(RequestHeader)) + { + return {}; + } + + BinaryReader Reader(Payload); + + RequestHeader RequestHdr; + Reader.Read(&RequestHdr, sizeof RequestHdr); + + if (RequestHdr.Magic != RequestHeader::kMagic) + { + return {}; + } + + std::vector<RequestChunkEntry> RequestedChunks; + RequestedChunks.resize(RequestHdr.ChunkCount); + Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount); + return RequestedChunks; +} + +std::vector<IoBuffer> +BuildChunkBatchResponse(const std::vector<RequestChunkEntry>& Requests, std::span<IoBuffer> Chunks) +{ + ZEN_ASSERT(Requests.size() == Chunks.size()); + size_t ChunkCount = Requests.size(); + + std::vector<IoBuffer> OutBlobs; + OutBlobs.reserve(1 + ChunkCount); + OutBlobs.emplace_back(sizeof(ResponseHeader) + ChunkCount * sizeof(ResponseChunkEntry)); + + uint8_t* ResponsePtr = reinterpret_cast<uint8_t*>(OutBlobs[0].MutableData()); + ResponseHeader ResponseHdr; + ResponseHdr.ChunkCount = gsl::narrow<uint32_t>(Requests.size()); + memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr)); + ResponsePtr += sizeof(ResponseHdr); + for (uint32_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex) + { + const IoBuffer& FoundChunk(Chunks[ChunkIndex]); + ResponseChunkEntry ResponseChunk; + ResponseChunk.CorrelationId = Requests[ChunkIndex].CorrelationId; + if (FoundChunk) + { + ResponseChunk.ChunkSize = FoundChunk.Size(); + } + else + { + ResponseChunk.ChunkSize = uint64_t(-1); + } + memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk)); + ResponsePtr += sizeof(ResponseChunk); + } + OutBlobs.insert(OutBlobs.end(), Chunks.begin(), Chunks.end()); + auto It = std::remove_if(OutBlobs.begin() + 1, OutBlobs.end(), [](const IoBuffer& B) { return B.GetSize() == 0; }); + OutBlobs.erase(It, OutBlobs.end()); + return OutBlobs; +} + +std::vector<IoBuffer> +ParseChunkBatchResponse(const IoBuffer& Buffer) +{ + MemoryView View = Buffer.GetView(); + const ResponseHeader* Header = (const ResponseHeader*)View.GetData(); + if (Header->Magic != 0xbada'b00f) + { + return {}; + } + View.MidInline(sizeof(ResponseHeader)); + const ResponseChunkEntry* Entries = (const ResponseChunkEntry*)View.GetData(); + View.MidInline(sizeof(ResponseChunkEntry) * Header->ChunkCount); + std::vector<IoBuffer> Result(Header->ChunkCount); + for (uint32_t Index = 0; Index < Header->ChunkCount; Index++) + { + const ResponseChunkEntry& Entry = Entries[Index]; + if (Result.size() < Entry.CorrelationId + 1) + { + Result.resize(Entry.CorrelationId + 1); + } + if (Entry.ChunkSize != uint64_t(-1)) + { + Result[Entry.CorrelationId] = IoBuffer(IoBuffer::Wrap, View.GetData(), Entry.ChunkSize); + View.MidInline(Entry.ChunkSize); + } + } + return Result; +} + +} // namespace zen diff --git a/src/zenutil/include/zenutil/chunkrequests.h b/src/zenutil/include/zenutil/chunkrequests.h new file mode 100644 index 000000000..2d6c222bb --- /dev/null +++ b/src/zenutil/include/zenutil/chunkrequests.h @@ -0,0 +1,27 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/uid.h> + +#include <optional> +#include <span> +#include <vector> + +namespace zen { +class IoBuffer; + +struct RequestChunkEntry +{ + Oid ChunkId; + uint32_t CorrelationId; + uint64_t Offset; + uint64_t RequestBytes; +}; + +std::vector<IoBuffer> ParseChunkBatchResponse(const IoBuffer& Buffer); +IoBuffer BuildChunkBatchRequest(const std::vector<RequestChunkEntry>& Entries); +std::optional<std::vector<RequestChunkEntry>> ParseChunkBatchRequest(const IoBuffer& Payload); +std::vector<IoBuffer> BuildChunkBatchResponse(const std::vector<RequestChunkEntry>& Requests, std::span<IoBuffer> Chunks); + +} // namespace zen diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp index a44ea4954..84544bac8 100644 --- a/src/zenutil/zenserverprocess.cpp +++ b/src/zenutil/zenserverprocess.cpp @@ -649,6 +649,12 @@ ZenServerInstance::SpawnServer(int BasePort, std::string_view AdditionalServerAr CommandLine << " --test --log-id " << m_Name; CommandLine << " --no-sentry"; + + if (AdditionalServerArgs.find("--system-dir") == std::string_view::npos) + { + CommandLine << " --system-dir "; + PathToUtf8((m_Env.CreateNewTestDir() / "system-dir").c_str(), CommandLine); + } } if (m_OwnerPid.has_value()) |