aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2023-02-16 13:58:49 +0100
committerGitHub <[email protected]>2023-02-16 13:58:49 +0100
commitf30de2640d99451a85ff2820fdd0b0b5816d2e61 (patch)
tree70bacc76c1eb8738e72387dfdf131230744e7ac8
parentchangelog (diff)
downloadzen-f30de2640d99451a85ff2820fdd0b0b5816d2e61.tar.xz
zen-f30de2640d99451a85ff2820fdd0b0b5816d2e61.zip
Experimental ObjectStore/CDN like endpoint
-rw-r--r--zenserver/config.cpp45
-rw-r--r--zenserver/config.h13
-rw-r--r--zenserver/objectstore/objectstore.cpp195
-rw-r--r--zenserver/objectstore/objectstore.h48
-rw-r--r--zenserver/zenserver.cpp21
5 files changed, 321 insertions, 1 deletions
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index fa4ee7f5a..cff93d67b 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -105,6 +105,34 @@ ParseUpstreamCachePolicy(std::string_view Options)
}
}
+ZenObjectStoreConfig
+ParseBucketConfigs(std::span<std::string> Buckets)
+{
+ using namespace std::literals;
+
+ ZenObjectStoreConfig Cfg;
+
+ // split bucket args in the form of "{BucketName};{LocalPath}"
+ for (std::string_view Bucket : Buckets)
+ {
+ ZenObjectStoreConfig::BucketConfig NewBucket;
+
+ if (auto Idx = Bucket.find_first_of(";"); Idx != std::string_view::npos)
+ {
+ NewBucket.Name = Bucket.substr(0, Idx);
+ NewBucket.Directory = Bucket.substr(Idx + 1);
+ }
+ else
+ {
+ NewBucket.Name = Bucket;
+ }
+
+ Cfg.Buckets.push_back(std::move(NewBucket));
+ }
+
+ return Cfg;
+}
+
void ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptions);
void
@@ -492,6 +520,21 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
cxxopts::value<uint64_t>(ServerOptions.GcConfig.Cache.DiskSizeSoftLimit)->default_value("0"),
"");
+ options.add_option("objectstore",
+ "",
+ "objectstore-enabled",
+ "Whether the object store is enabled or not.",
+ cxxopts::value<bool>(ServerOptions.ObjectStoreEnabled)->default_value("false"),
+ "");
+
+ std::vector<std::string> BucketConfigs;
+ options.add_option("objectstore",
+ "",
+ "objectstore-bucket",
+ "Object store bucket mappings.",
+ cxxopts::value<std::vector<std::string>>(BucketConfigs),
+ "");
+
try
{
auto result = options.parse(argc, argv);
@@ -545,6 +588,8 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
{.Name = OpenIdProviderName, .Url = OpenIdProviderUrl, .ClientId = OpenIdClientId});
}
+ ServerOptions.ObjectStoreConfig = ParseBucketConfigs(BucketConfigs);
+
if (!ServerOptions.ConfigFile.empty())
{
ParseConfigFile(ServerOptions.ConfigFile, ServerOptions);
diff --git a/zenserver/config.h b/zenserver/config.h
index 096a800b4..8a5c6de4e 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -107,11 +107,23 @@ struct ZenAuthConfig
std::vector<ZenOpenIdProviderConfig> OpenIdProviders;
};
+struct ZenObjectStoreConfig
+{
+ struct BucketConfig
+ {
+ std::string Name;
+ std::filesystem::path Directory;
+ };
+
+ std::vector<BucketConfig> Buckets;
+};
+
struct ZenServerOptions
{
ZenUpstreamCacheConfig UpstreamCacheConfig;
ZenGcConfig GcConfig;
ZenAuthConfig AuthConfig;
+ ZenObjectStoreConfig ObjectStoreConfig;
std::filesystem::path DataDir; // Root directory for state (used for testing)
std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental)
std::filesystem::path AbsLogFile; // Absolute path to main log file
@@ -136,6 +148,7 @@ struct ZenServerOptions
bool ShouldCrash = false; // Option for testing crash handling
bool IsFirstRun = false;
bool NoSentry = false;
+ bool ObjectStoreEnabled = false;
#if ZEN_WITH_TRACE
std::string TraceHost; // Host name or IP address to send trace data to
std::string TraceFile; // Path of a file to write a trace
diff --git a/zenserver/objectstore/objectstore.cpp b/zenserver/objectstore/objectstore.cpp
new file mode 100644
index 000000000..950505bcb
--- /dev/null
+++ b/zenserver/objectstore/objectstore.cpp
@@ -0,0 +1,195 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <objectstore/objectstore.h>
+
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zencore/string.h>
+#include "zencore/compactbinarybuilder.h"
+#include "zenhttp/httpcommon.h"
+
+#include <thread>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <fmt/format.h>
+#include <json11.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+using namespace std::literals;
+
+ZEN_DEFINE_LOG_CATEGORY_STATIC(LogObj, "obj"sv);
+
+HttpObjectStoreService::HttpObjectStoreService(ObjectStoreConfig Cfg) : m_Cfg(std::move(Cfg))
+{
+ Inititalize();
+}
+
+HttpObjectStoreService::~HttpObjectStoreService()
+{
+}
+
+const char*
+HttpObjectStoreService::BaseUri() const
+{
+ return "/obj/";
+}
+
+void
+HttpObjectStoreService::HandleRequest(zen::HttpServerRequest& Request)
+{
+ if (m_Router.HandleRequest(Request) == false)
+ {
+ ZEN_LOG_WARN(LogObj, "No route found for {0}", Request.RelativeUri());
+ return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Not found"sv);
+ }
+}
+
+void
+HttpObjectStoreService::Inititalize()
+{
+ ZEN_LOG_INFO(LogObj, "Initialzing Object Store in '{}'", m_Cfg.RootDirectory);
+ for (const auto& Bucket : m_Cfg.Buckets)
+ {
+ ZEN_LOG_INFO(LogObj, " - bucket '{}' -> '{}'", Bucket.Name, Bucket.Directory);
+ }
+
+ m_Router.RegisterRoute(
+ "distributionpoints/{bucket}",
+ [this](zen::HttpRouterRequest& Request) {
+ const std::string BucketName = Request.GetCapture(1);
+
+ StringBuilder<1024> Json;
+ {
+ CbObjectWriter Writer;
+ Writer.BeginArray("distributions");
+ Writer << fmt::format("http://localhost:{}/obj/{}", m_Cfg.ServerPort, BucketName);
+ Writer.EndArray();
+ Writer.Save().ToJson(Json);
+ }
+
+ Request.ServerRequest().WriteResponse(HttpResponseCode::OK, HttpContentType::kJSON, Json.ToString());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{bucket}/{path}",
+ [this](zen::HttpRouterRequest& Request) { GetBlob(Request); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{bucket}/{path}",
+ [this](zen::HttpRouterRequest& Request) { PutBlob(Request); },
+ HttpVerb::kPost | HttpVerb::kPut);
+}
+
+std::filesystem::path
+HttpObjectStoreService::GetBucketDirectory(std::string_view BucketName)
+{
+ std::lock_guard _(BucketsMutex);
+
+ if (const auto It = std::find_if(std::begin(m_Cfg.Buckets),
+ std::end(m_Cfg.Buckets),
+ [&BucketName](const auto& Bucket) -> bool { return Bucket.Name == BucketName; });
+ It != std::end(m_Cfg.Buckets))
+ {
+ return It->Directory;
+ }
+
+ return std::filesystem::path();
+}
+
+void
+HttpObjectStoreService::GetBlob(zen::HttpRouterRequest& Request)
+{
+ namespace fs = std::filesystem;
+
+ const std::string& BucketName = Request.GetCapture(1);
+ const fs::path BucketDir = GetBucketDirectory(BucketName);
+
+ if (BucketDir.empty())
+ {
+ ZEN_LOG_DEBUG(LogObj, "GET - [FAILED], unknown bucket '{}'", BucketName);
+ return Request.ServerRequest().WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ const fs::path RelativeBucketPath = Request.GetCapture(2);
+
+ if (RelativeBucketPath.is_absolute() || RelativeBucketPath.string().starts_with(".."))
+ {
+ ZEN_LOG_DEBUG(LogObj, "GET - from bucket '{}' [FAILED], invalid file path", BucketName);
+ return Request.ServerRequest().WriteResponse(HttpResponseCode::Forbidden);
+ }
+
+ fs::path FilePath = BucketDir / RelativeBucketPath;
+ if (fs::exists(FilePath) == false)
+ {
+ ZEN_LOG_DEBUG(LogObj, "GET - '{}/{}' [FAILED], doesn't exist", BucketName, FilePath);
+ return Request.ServerRequest().WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ FileContents File = ReadFile(FilePath);
+ if (File.ErrorCode)
+ {
+ ZEN_LOG_WARN(LogObj,
+ "GET - '{}/{}' [FAILED] ('{}': {})",
+ BucketName,
+ FilePath,
+ File.ErrorCode.category().name(),
+ File.ErrorCode.value());
+
+ return Request.ServerRequest().WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ const IoBuffer& FileBuf = File.Data[0];
+ const uint64_t Total = TotalBytesServed.fetch_add(FileBuf.Size()) + FileBuf.Size();
+
+ ZEN_LOG_DEBUG(LogObj,
+ "GET - '{}/{}' ({}) [OK] (Total: {})",
+ BucketName,
+ RelativeBucketPath,
+ NiceBytes(FileBuf.Size()),
+ NiceBytes(Total));
+
+ Request.ServerRequest().WriteResponse(HttpResponseCode::OK, HttpContentType::kUnknownContentType, FileBuf);
+}
+
+void
+HttpObjectStoreService::PutBlob(zen::HttpRouterRequest& Request)
+{
+ namespace fs = std::filesystem;
+
+ const std::string& BucketName = Request.GetCapture(1);
+ const fs::path BucketDir = GetBucketDirectory(BucketName);
+
+ if (BucketDir.empty())
+ {
+ ZEN_LOG_DEBUG(LogObj, "PUT - [FAILED], unknown bucket '{}'", BucketName);
+ return Request.ServerRequest().WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ const fs::path RelativeBucketPath = Request.GetCapture(2);
+
+ if (RelativeBucketPath.is_absolute() || RelativeBucketPath.string().starts_with(".."))
+ {
+ ZEN_LOG_DEBUG(LogObj, "PUT - bucket '{}' [FAILED], invalid file path", BucketName);
+ return Request.ServerRequest().WriteResponse(HttpResponseCode::Forbidden);
+ }
+
+ fs::path FilePath = BucketDir / RelativeBucketPath;
+ const IoBuffer FileBuf = Request.ServerRequest().ReadPayload();
+
+ if (FileBuf.Size() == 0)
+ {
+ ZEN_LOG_DEBUG(LogObj, "PUT - '{}/{}' [FAILED], empty file", BucketName, FilePath);
+ return Request.ServerRequest().WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ WriteFile(FilePath, FileBuf);
+ ZEN_LOG_DEBUG(LogObj, "PUT - '{}/{}' [OK] ({})", BucketName, RelativeBucketPath, NiceBytes(FileBuf.Size()));
+ Request.ServerRequest().WriteResponse(HttpResponseCode::OK);
+}
+
+} // namespace zen
diff --git a/zenserver/objectstore/objectstore.h b/zenserver/objectstore/objectstore.h
new file mode 100644
index 000000000..eaab57794
--- /dev/null
+++ b/zenserver/objectstore/objectstore.h
@@ -0,0 +1,48 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenhttp/httpserver.h>
+#include <atomic>
+#include <filesystem>
+#include <mutex>
+
+namespace zen {
+
+class HttpRouterRequest;
+
+struct ObjectStoreConfig
+{
+ struct BucketConfig
+ {
+ std::string Name;
+ std::filesystem::path Directory;
+ };
+
+ std::filesystem::path RootDirectory;
+ std::vector<BucketConfig> Buckets;
+ uint16_t ServerPort{1337};
+};
+
+class HttpObjectStoreService final : public zen::HttpService
+{
+public:
+ HttpObjectStoreService(ObjectStoreConfig Cfg);
+ virtual ~HttpObjectStoreService();
+
+ virtual const char* BaseUri() const override;
+ virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+
+private:
+ void Inititalize();
+ std::filesystem::path GetBucketDirectory(std::string_view BucketName);
+ void GetBlob(zen::HttpRouterRequest& Request);
+ void PutBlob(zen::HttpRouterRequest& Request);
+
+ ObjectStoreConfig m_Cfg;
+ std::mutex BucketsMutex;
+ HttpRequestRouter m_Router;
+ std::atomic_uint64_t TotalBytesServed{0};
+};
+
+} // namespace zen
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index 355df4523..6a9e19e50 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -108,6 +108,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include "frontend/frontend.h"
#include "monitoring/httpstats.h"
#include "monitoring/httpstatus.h"
+#include "objectstore/objectstore.h"
#include "projectstore/projectstore.h"
#include "testing/httptest.h"
#include "upstream/upstream.h"
@@ -317,6 +318,23 @@ public:
m_Http->RegisterService(*m_FrontendService);
}
+ if (ServerOptions.ObjectStoreEnabled)
+ {
+ ObjectStoreConfig ObjCfg;
+ ObjCfg.RootDirectory = m_DataRoot / "obj";
+ ObjCfg.ServerPort = static_cast<uint16_t>(EffectiveBasePort);
+
+ for (const auto& Bucket : ServerOptions.ObjectStoreConfig.Buckets)
+ {
+ ObjectStoreConfig::BucketConfig NewBucket{.Name = Bucket.Name};
+ NewBucket.Directory = Bucket.Directory.empty() ? (ObjCfg.RootDirectory / Bucket.Name) : Bucket.Directory;
+ ObjCfg.Buckets.push_back(std::move(NewBucket));
+ }
+
+ m_ObjStoreService = std::make_unique<HttpObjectStoreService>(std::move(ObjCfg));
+ m_Http->RegisterService(*m_ObjStoreService);
+ }
+
ZEN_INFO("initializing GC, enabled '{}', interval {}s", ServerOptions.GcConfig.Enabled, ServerOptions.GcConfig.IntervalSeconds);
zen::GcSchedulerConfig GcConfig{.RootDirectory = m_DataRoot / "gc",
.MonitorInterval = std::chrono::seconds(ServerOptions.GcConfig.MonitorIntervalSeconds),
@@ -567,7 +585,8 @@ private:
#if ZEN_WITH_COMPUTE_SERVICES
std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService;
#endif // ZEN_WITH_COMPUTE_SERVICES
- std::unique_ptr<zen::HttpFrontendService> m_FrontendService;
+ std::unique_ptr<zen::HttpFrontendService> m_FrontendService;
+ std::unique_ptr<zen::HttpObjectStoreService> m_ObjStoreService;
bool m_DebugOptionForcedCrash = false;
bool m_UseSentry = false;