aboutsummaryrefslogtreecommitdiff
path: root/zenserver/projectstore/projectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-02-09 16:49:51 +0100
committerGitHub <[email protected]>2023-02-09 07:49:51 -0800
commit2f872e432d4a77d1c2dd082cb97a0cbfddb3cc97 (patch)
treed631da0746b78cad7140784de4e637bcfb4e1cac /zenserver/projectstore/projectstore.cpp
parentUpdate README.md (diff)
downloadzen-2f872e432d4a77d1c2dd082cb97a0cbfddb3cc97.tar.xz
zen-2f872e432d4a77d1c2dd082cb97a0cbfddb3cc97.zip
oplog upload/download (#214)
- Feature: Zen server endpoint `prj/{project}/oplog/{log}/chunks` to post multiple attachments in one request. - Feature: Zen server endpoint `prj/{project}/oplog/{log}/save` to save an oplog container. Accepts `CbObject` containing a compressed oplog and attachment references organized in blocks. - Feature: Zen server endpoint `prj/{project}/oplog/{log}/load` to request an oplog container. Responds with an `CbObject` containing a compressed oplog and attachment references organized in blocks. - Feature: Zen server endpoint `{project}/oplog/{log}/rpc` to initiate an import to or export from an external location and other operations. Use either JSon or CbPackage as payload. - CbObject/JSon RPC format for `import` and `export` methods: - CbObject RPC format for `getchunks` method, returns CbPackage with the found chunks, if all chunks are found the number of attachments matches number of chunks requested. - Feature: Zen server `{project}/oplog/{log}/{hash}` now accepts `HttpVerb::kPost` as well as `HttpVerb::kGet`. - Feature: Zen command line tool `oplog-export` to export an oplog to an external target using the zenserver oplog export endpoint. - Feature: Zen command line tool `oplog-import` to import an oplog from an external source using the zenserver oplog import endpoint.
Diffstat (limited to 'zenserver/projectstore/projectstore.cpp')
-rw-r--r--zenserver/projectstore/projectstore.cpp721
1 files changed, 675 insertions, 46 deletions
diff --git a/zenserver/projectstore/projectstore.cpp b/zenserver/projectstore/projectstore.cpp
index a07698d50..82aac1605 100644
--- a/zenserver/projectstore/projectstore.cpp
+++ b/zenserver/projectstore/projectstore.cpp
@@ -5,32 +5,31 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryvalidation.h>
-#include <zencore/compactbinaryvalue.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/stream.h>
-#include <zencore/string.h>
-#include <zencore/testing.h>
-#include <zencore/testutils.h>
#include <zencore/timer.h>
#include <zencore/trace.h>
-#include <zencore/workthreadpool.h>
#include <zenhttp/httpshared.h>
#include <zenstore/caslog.h>
+#include <zenstore/cidstore.h>
#include <zenstore/scrubcontext.h>
-#include <zenutil/basicfile.h>
-#include "config.h"
-
-#include <latch>
+#include "fileremoteprojectstore.h"
+#include "jupiterremoteprojectstore.h"
+#include "remoteprojectstore.h"
+#include "zenremoteprojectstore.h"
ZEN_THIRD_PARTY_INCLUDES_START
+#include <cpr/cpr.h>
#include <xxh3.h>
ZEN_THIRD_PARTY_INCLUDES_END
#if ZEN_WITH_TESTS
+# include <zencore/testing.h>
+# include <zencore/testutils.h>
#endif // ZEN_WITH_TESTS
namespace zen {
@@ -69,6 +68,149 @@ namespace {
Sleep(100);
} while (true);
}
+
+ std::pair<std::unique_ptr<RemoteProjectStore>, std::string> CreateRemoteStore(CbObjectView Params,
+ AuthMgr& AuthManager,
+ size_t MaxBlockSize,
+ size_t MaxChunkEmbedSize)
+ {
+ using namespace std::literals;
+
+ std::unique_ptr<RemoteProjectStore> RemoteStore;
+
+ if (CbObjectView File = Params["file"sv].AsObjectView(); File)
+ {
+ std::filesystem::path FolderPath(File["path"sv].AsString());
+ if (FolderPath.empty())
+ {
+ return {nullptr, "Missing file path"};
+ }
+ std::string_view Name(File["name"sv].AsString());
+ if (Name.empty())
+ {
+ return {nullptr, "Missing file name"};
+ }
+ bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false);
+ bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false);
+
+ FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ FolderPath,
+ std::string(Name),
+ ForceDisableBlocks,
+ ForceEnableTempBlocks};
+ RemoteStore = CreateFileRemoteStore(Options);
+ }
+
+ if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud)
+ {
+ std::string_view CloudServiceUrl = Cloud["url"sv].AsString();
+ if (CloudServiceUrl.empty())
+ {
+ return {nullptr, "Missing service url"};
+ }
+
+ std::string Url = cpr::util::urlDecode(std::string(CloudServiceUrl));
+ std::string_view Namespace = Cloud["namespace"sv].AsString();
+ if (Namespace.empty())
+ {
+ return {nullptr, "Missing namespace"};
+ }
+ std::string_view Bucket = Cloud["bucket"sv].AsString();
+ if (Bucket.empty())
+ {
+ return {nullptr, "Missing bucket"};
+ }
+ std::string_view OpenIdProvider = Cloud["openid-provider"sv].AsString();
+ std::string AccessToken = std::string(Cloud["access-token"sv].AsString());
+ if (AccessToken.empty())
+ {
+#if PLATFORM_WINDOWS
+
+ CHAR EnvVariableBuffer[1023 + 1];
+ DWORD RESULT = GetEnvironmentVariableA("UE-CloudDataCacheAccessToken", EnvVariableBuffer, sizeof(EnvVariableBuffer));
+ if (RESULT > 0 && RESULT < sizeof(EnvVariableBuffer))
+ {
+ AccessToken = std::string(EnvVariableBuffer);
+ }
+#endif
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ char* EnvVariable = getenv("UE_CloudDataCacheAccessToken");
+ if (EnvVariable)
+ {
+ AccessToken = std::string(EnvVariable);
+ }
+#endif
+ }
+ std::string_view KeyParam = Cloud["key"sv].AsString();
+ if (KeyParam.empty())
+ {
+ return {nullptr, "Missing key"};
+ }
+ if (KeyParam.length() != IoHash::StringLength)
+ {
+ return {nullptr, "Invalid key"};
+ }
+ IoHash Key = IoHash::FromHexString(KeyParam);
+ if (Key == IoHash::Zero)
+ {
+ return {nullptr, "Invalid key string"};
+ }
+ bool ForceDisableBlocks = Cloud["disableblocks"sv].AsBool(false);
+ bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false);
+
+ JupiterRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ Url,
+ std::string(Namespace),
+ std::string(Bucket),
+ Key,
+ std::string(OpenIdProvider),
+ AccessToken,
+ AuthManager,
+ ForceDisableBlocks,
+ ForceDisableTempBlocks};
+ RemoteStore = CreateJupiterRemoteStore(Options);
+ }
+
+ if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen)
+ {
+ std::string_view Url = Zen["url"sv].AsString();
+ std::string_view Project = Zen["project"sv].AsString();
+ if (Project.empty())
+ {
+ return {nullptr, "Missing project"};
+ }
+ std::string_view Oplog = Zen["oplog"sv].AsString();
+ if (Oplog.empty())
+ {
+ return {nullptr, "Missing oplog"};
+ }
+ ZenRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ std::string(Url),
+ std::string(Project),
+ std::string(Oplog)};
+ RemoteStore = CreateZenRemoteStore(Options);
+ }
+
+ if (!RemoteStore)
+ {
+ return {nullptr, "Unknown remote store type"};
+ }
+
+ return {std::move(RemoteStore), ""};
+ }
+
+ std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result)
+ {
+ if (Result.ErrorCode == 0)
+ {
+ return {HttpResponseCode::OK, Result.Text};
+ }
+ return {static_cast<HttpResponseCode>(Result.ErrorCode),
+ Result.Reason.empty() ? Result.Text
+ : Result.Text.empty() ? Result.Reason
+ : fmt::format("{}. Reason: '{}'", Result.Text, Result.Reason)};
+ }
+
} // namespace
//////////////////////////////////////////////////////////////////////////
@@ -794,7 +936,6 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core)
XXH3_128 KeyHash = KeyHasher.GetHash();
RefPtr<OplogStorage> Storage;
-
{
RwLock::SharedLockScope _(m_OplogLock);
Storage = m_Storage;
@@ -1613,7 +1754,7 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId,
if (!FoundLog)
{
- return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)};
}
if (ChunkId.size() != 2 * sizeof(Oid::OidBits))
@@ -1636,7 +1777,7 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId,
{
IoHash RawHash;
uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize);
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunk)), RawHash, RawSize);
ZEN_ASSERT(!Compressed.IsNull());
if (IsOffset)
@@ -1679,7 +1820,7 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId,
{
Size = Chunk.GetSize() - Offset;
}
- OutChunk = IoBuffer(Chunk, Offset, Size);
+ OutChunk = IoBuffer(std::move(Chunk), Offset, Size);
OutChunk.SetContentType(ContentType);
}
@@ -1693,8 +1834,6 @@ ProjectStore::GetChunk(const std::string_view ProjectId,
ZenContentType AcceptType,
IoBuffer& OutChunk)
{
- using namespace std::literals;
-
Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
if (!Project)
{
@@ -1705,7 +1844,7 @@ ProjectStore::GetChunk(const std::string_view ProjectId,
if (!FoundLog)
{
- return {HttpResponseCode::NotFound, fmt::format("Chunk info request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ return {HttpResponseCode::NotFound, fmt::format("Chunk request for unknown oplog '{}/{}'", ProjectId, OplogId)};
}
if (Cid.length() != IoHash::StringLength)
@@ -1721,25 +1860,401 @@ ProjectStore::GetChunk(const std::string_view ProjectId,
return {HttpResponseCode::NotFound, fmt::format("chunk - '{}' MISSING", Cid)};
}
- if (AcceptType == HttpContentType::kBinary)
+ if (AcceptType == ZenContentType::kUnknownContentType || AcceptType == ZenContentType::kBinary)
{
CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(OutChunk));
OutChunk = Compressed.Decompress().AsIoBuffer();
- OutChunk.SetContentType(HttpContentType::kBinary);
+ OutChunk.SetContentType(ZenContentType::kBinary);
}
else
{
- OutChunk.SetContentType(HttpContentType::kCompressedBinary);
+ OutChunk.SetContentType(ZenContentType::kCompressedBinary);
}
return {HttpResponseCode::OK, {}};
}
+std::pair<HttpResponseCode, std::string>
+ProjectStore::PutChunk(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const std::string_view Cid,
+ ZenContentType ContentType,
+ IoBuffer&& Chunk)
+{
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown project '{}'", ProjectId)};
+ }
+
+ ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
+
+ if (!FoundLog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Chunk put request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+
+ if (Cid.length() != IoHash::StringLength)
+ {
+ return {HttpResponseCode::BadRequest, fmt::format("Chunk put request for invalid chunk hash '{}'", Cid)};
+ }
+
+ const IoHash Hash = IoHash::FromHexString(Cid);
+
+ if (ContentType != HttpContentType::kCompressedBinary)
+ {
+ return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid content type for chunk '{}'", Cid)};
+ }
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk), RawHash, RawSize);
+ if (RawHash != Hash)
+ {
+ return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid payload format for chunk '{}'", Cid)};
+ }
+
+ CidStore::InsertResult Result = m_CidStore.AddChunk(Chunk, Hash);
+ return {Result.New ? HttpResponseCode::Created : HttpResponseCode::OK, {}};
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::WriteOplog(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload, CbObject& OutResponse)
+{
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown project '{}'", ProjectId)};
+ }
+
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
+
+ if (!Oplog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Write oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+
+ CbObject ContainerObject = LoadCompactBinaryObject(Payload);
+ if (!ContainerObject)
+ {
+ return {HttpResponseCode::BadRequest, "Invalid payload format"};
+ }
+
+ CidStore& ChunkStore = m_CidStore;
+ RwLock AttachmentsLock;
+ std::unordered_set<IoHash, IoHash::Hasher> Attachments;
+
+ auto HasAttachment = [&ChunkStore](const IoHash& RawHash) { return ChunkStore.ContainsChunk(RawHash); };
+ auto OnNeedBlock = [&AttachmentsLock, &Attachments](const IoHash& BlockHash, const std::vector<IoHash>&& ChunkHashes) {
+ RwLock::ExclusiveLockScope _(AttachmentsLock);
+ if (BlockHash != IoHash::Zero)
+ {
+ Attachments.insert(BlockHash);
+ }
+ else
+ {
+ Attachments.insert(ChunkHashes.begin(), ChunkHashes.end());
+ }
+ };
+ auto OnNeedAttachment = [&AttachmentsLock, &Attachments](const IoHash& RawHash) {
+ RwLock::ExclusiveLockScope _(AttachmentsLock);
+ Attachments.insert(RawHash);
+ };
+
+ RemoteProjectStore::Result RemoteResult = SaveOplogContainer(*Oplog, ContainerObject, HasAttachment, OnNeedBlock, OnNeedAttachment);
+
+ if (RemoteResult.ErrorCode)
+ {
+ return ConvertResult(RemoteResult);
+ }
+
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("need");
+ {
+ for (const IoHash& Hash : Attachments)
+ {
+ ZEN_DEBUG("Need attachment {}", Hash);
+ Cbo << Hash;
+ }
+ }
+ Cbo.EndArray(); // "need"
+
+ OutResponse = Cbo.Save();
+ return {HttpResponseCode::OK, {}};
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::ReadOplog(const std::string_view ProjectId,
+ const std::string_view OplogId,
+ const HttpServerRequest::QueryParams& Params,
+ CbObject& OutResponse)
+{
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown project '{}'", ProjectId)};
+ }
+
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
+
+ if (!Oplog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Read oplog request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+
+ size_t MaxBlockSize = 128u * 1024u * 1024u;
+ if (auto Param = Params.GetValue("maxblocksize"); Param.empty() == false)
+ {
+ if (auto Value = ParseInt<size_t>(Param))
+ {
+ MaxBlockSize = Value.value();
+ }
+ }
+ size_t MaxChunkEmbedSize = 1024u * 1024u;
+ if (auto Param = Params.GetValue("maxchunkembedsize"); Param.empty() == false)
+ {
+ if (auto Value = ParseInt<size_t>(Param))
+ {
+ MaxChunkEmbedSize = Value.value();
+ }
+ }
+
+ CidStore& ChunkStore = m_CidStore;
+
+ RemoteProjectStore::LoadContainerResult ContainerResult = BuildContainer(
+ ChunkStore,
+ *Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ false,
+ [](CompressedBuffer&&, const IoHash) {},
+ [](const IoHash&) {},
+ [](const std::unordered_set<IoHash, IoHash::Hasher>) {});
+
+ OutResponse = std::move(ContainerResult.ContainerObject);
+ return ConvertResult(ContainerResult);
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::WriteBlock(const std::string_view ProjectId, const std::string_view OplogId, IoBuffer&& Payload)
+{
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown project '{}'", ProjectId)};
+ }
+
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
+
+ if (!Oplog)
+ {
+ return {HttpResponseCode::NotFound, fmt::format("Write block request for unknown oplog '{}/{}'", ProjectId, OplogId)};
+ }
+
+ if (!IterateBlock(std::move(Payload), [this](CompressedBuffer&& Chunk, const IoHash& AttachmentRawHash) {
+ IoBuffer Compressed = Chunk.GetCompressed().Flatten().AsIoBuffer();
+ m_CidStore.AddChunk(Compressed, AttachmentRawHash);
+ ZEN_DEBUG("Saved attachment {} from block, size {}", AttachmentRawHash, Compressed.GetSize());
+ }))
+ {
+ return {HttpResponseCode::BadRequest, "Invalid chunk in block"};
+ }
+
+ return {HttpResponseCode::OK, {}};
+}
+
+void
+ProjectStore::Rpc(HttpServerRequest& HttpReq,
+ const std::string_view ProjectId,
+ const std::string_view OplogId,
+ IoBuffer&& Payload,
+ AuthMgr& AuthManager)
+{
+ using namespace std::literals;
+ HttpContentType PayloadContentType = HttpReq.RequestContentType();
+ CbPackage Package;
+ CbObject Cb;
+ switch (PayloadContentType)
+ {
+ case HttpContentType::kJSON:
+ case HttpContentType::kUnknownContentType:
+ case HttpContentType::kText:
+ {
+ std::string JsonText(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize());
+ Cb = LoadCompactBinaryFromJson(JsonText).AsObject();
+ if (!Cb)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Content format not supported, expected JSON format");
+ }
+ }
+ break;
+ case HttpContentType::kCbObject:
+ Cb = LoadCompactBinaryObject(Payload);
+ if (!Cb)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Content format not supported, expected compact binary format");
+ }
+ break;
+ case HttpContentType::kCbPackage:
+ Package = ParsePackageMessage(Payload);
+ Cb = Package.GetObject();
+ if (!Cb)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ "Content format not supported, expected package message format");
+ }
+ break;
+ default:
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type");
+ }
+
+ Ref<ProjectStore::Project> Project = OpenProject(ProjectId);
+ if (!Project)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Rpc oplog request for unknown project '{}'", ProjectId));
+ }
+
+ ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId);
+
+ if (!Oplog)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound,
+ HttpContentType::kText,
+ fmt::format("Rpc oplog request for unknown oplog '{}/{}'", ProjectId, OplogId));
+ }
+
+ std::string_view Method = Cb["method"sv].AsString();
+
+ if (Method == "import")
+ {
+ std::pair<HttpResponseCode, std::string> Result = Import(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
+ if (Result.second.empty())
+ {
+ return HttpReq.WriteResponse(Result.first);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ }
+ else if (Method == "export")
+ {
+ std::pair<HttpResponseCode, std::string> Result = Export(*Project.Get(), *Oplog, Cb["params"sv].AsObjectView(), AuthManager);
+ if (Result.second.empty())
+ {
+ return HttpReq.WriteResponse(Result.first);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ }
+ else if (Method == "getchunks")
+ {
+ CbPackage ResponsePackage;
+ {
+ CbArrayView ChunksArray = Cb["chunks"sv].AsArrayView();
+ CbObjectWriter ResponseWriter;
+ ResponseWriter.BeginArray("chunks"sv);
+ for (CbFieldView FieldView : ChunksArray)
+ {
+ IoHash RawHash = FieldView.AsHash();
+ IoBuffer ChunkBuffer = m_CidStore.FindChunkByCid(RawHash);
+ if (ChunkBuffer)
+ {
+ ResponseWriter.AddHash(RawHash);
+ ResponsePackage.AddAttachment(
+ CbAttachment(CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)), RawHash));
+ }
+ }
+ ResponseWriter.EndArray();
+ ResponsePackage.SetObject(ResponseWriter.Save());
+ }
+ CompositeBuffer RpcResponseBuffer = FormatPackageMessageBuffer(ResponsePackage, FormatFlags::kDefault);
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, RpcResponseBuffer);
+ }
+ else if (Method == "putchunks")
+ {
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ IoHash RawHash = Attachment.GetHash();
+ CompressedBuffer Compressed = Attachment.AsCompressedBinary();
+ m_CidStore.AddChunk(Compressed.GetCompressed().Flatten().AsIoBuffer(), RawHash, CidStore::InsertMode::kCopyOnly);
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::OK);
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, fmt::format("Unknown rpc method '{}'", Method));
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::Export(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager)
+{
+ using namespace std::literals;
+
+ size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u);
+ size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u);
+ bool Force = Params["force"sv].AsBool(false);
+
+ std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult =
+ CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize);
+
+ if (RemoteStoreResult.first == nullptr)
+ {
+ return {HttpResponseCode::BadRequest, RemoteStoreResult.second};
+ }
+ std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first);
+ RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+
+ ZEN_INFO("Saving oplog '{}/{}' to {}, maxblocksize {}, maxchunkembedsize {}",
+ Project.Identifier,
+ Oplog.OplogId(),
+ StoreInfo.Description,
+ NiceBytes(MaxBlockSize),
+ NiceBytes(MaxChunkEmbedSize));
+
+ RemoteProjectStore::Result Result = SaveOplog(m_CidStore,
+ *RemoteStore,
+ Oplog,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ StoreInfo.CreateBlocks,
+ StoreInfo.UseTempBlockFiles,
+ Force);
+
+ return ConvertResult(Result);
+}
+
+std::pair<HttpResponseCode, std::string>
+ProjectStore::Import(ProjectStore::Project& Project, ProjectStore::Oplog& Oplog, CbObjectView&& Params, AuthMgr& AuthManager)
+{
+ using namespace std::literals;
+
+ size_t MaxBlockSize = Params["maxblocksize"sv].AsUInt64(128u * 1024u * 1024u);
+ size_t MaxChunkEmbedSize = Params["maxchunkembedsize"sv].AsUInt64(1024u * 1024u);
+ bool Force = Params["force"sv].AsBool(false);
+
+ std::pair<std::unique_ptr<RemoteProjectStore>, std::string> RemoteStoreResult =
+ CreateRemoteStore(Params, AuthManager, MaxBlockSize, MaxChunkEmbedSize);
+
+ if (RemoteStoreResult.first == nullptr)
+ {
+ return {HttpResponseCode::BadRequest, RemoteStoreResult.second};
+ }
+ std::unique_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.first);
+ RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
+
+ ZEN_INFO("Loading oplog '{}/{}' from {}", Project.Identifier, Oplog.OplogId(), StoreInfo.Description);
+ RemoteProjectStore::Result Result = LoadOplog(m_CidStore, *RemoteStore, Oplog, Force);
+ return ConvertResult(Result);
+}
+
//////////////////////////////////////////////////////////////////////////
-HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
+HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects, AuthMgr& AuthMgr)
: m_Log(logging::Get("project"))
, m_CidStore(Store)
, m_ProjectStore(Projects)
+, m_AuthMgr(AuthMgr)
{
using namespace std::literals;
@@ -1918,7 +2433,6 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
static_cast<int>(Result.first),
Result.second);
}
-
return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
},
HttpVerb::kGet);
@@ -2023,33 +2537,60 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
[this](HttpRouterRequest& Req) {
HttpServerRequest& HttpReq = Req.ServerRequest();
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
- const auto& Cid = Req.GetCapture(3);
- HttpContentType AcceptType = HttpReq.AcceptContentType();
-
- IoBuffer Value;
- std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value);
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ const auto& Cid = Req.GetCapture(3);
+ HttpContentType AcceptType = HttpReq.AcceptContentType();
+ HttpContentType RequestType = HttpReq.RequestContentType();
- if (Result.first == HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value);
- }
- else if (Result.first == HttpResponseCode::NotFound)
- {
- ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, Cid);
- }
- else
+ switch (Req.ServerRequest().RequestVerb())
{
- ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
- ToString(HttpReq.RequestVerb()),
- HttpReq.QueryString(),
- static_cast<int>(Result.first),
- Result.second);
+ case HttpVerb::kGet:
+ {
+ IoBuffer Value;
+ std::pair<HttpResponseCode, std::string> Result =
+ m_ProjectStore->GetChunk(ProjectId, OplogId, Cid, AcceptType, Value);
+
+ if (Result.first == HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Value.GetContentType(), Value);
+ }
+ else if (Result.first == HttpResponseCode::NotFound)
+ {
+ ZEN_DEBUG("chunk - '{}/{}/{}' MISSING", ProjectId, OplogId, Cid);
+ }
+ else
+ {
+ ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
+ ToString(HttpReq.RequestVerb()),
+ HttpReq.QueryString(),
+ static_cast<int>(Result.first),
+ Result.second);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ }
+ case HttpVerb::kPost:
+ {
+ std::pair<HttpResponseCode, std::string> Result =
+ m_ProjectStore->PutChunk(ProjectId, OplogId, Cid, RequestType, HttpReq.ReadPayload());
+ if (Result.first == HttpResponseCode::OK || Result.first == HttpResponseCode::Created)
+ {
+ return HttpReq.WriteResponse(Result.first);
+ }
+ else
+ {
+ ZEN_DEBUG("Request {}: '{}' failed with {}. Reason: `{}`",
+ ToString(HttpReq.RequestVerb()),
+ HttpReq.QueryString(),
+ static_cast<int>(Result.first),
+ Result.second);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ }
+ break;
}
- return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
},
- HttpVerb::kGet);
+ HttpVerb::kGet | HttpVerb::kPost);
m_Router.RegisterRoute(
"{project}/oplog/{log}/prep",
@@ -2556,6 +3097,67 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
}
},
HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kDelete);
+
+ // Push a oplog container
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/save",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ if (HttpReq.RequestContentType() != HttpContentType::kCbObject)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid content type");
+ }
+ IoBuffer Payload = Req.ServerRequest().ReadPayload();
+
+ CbObject Response;
+ std::pair<HttpResponseCode, std::string> Result = m_ProjectStore->WriteOplog(ProjectId, OplogId, std::move(Payload), Response);
+ if (Result.first == HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Response);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ },
+ HttpVerb::kPost);
+
+ // Pull a oplog container
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/load",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ if (HttpReq.AcceptContentType() != HttpContentType::kCbObject)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid accept content type");
+ }
+ IoBuffer Payload = Req.ServerRequest().ReadPayload();
+
+ CbObject Response;
+ std::pair<HttpResponseCode, std::string> Result =
+ m_ProjectStore->ReadOplog(ProjectId, OplogId, Req.ServerRequest().GetQueryParams(), Response);
+ if (Result.first == HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Response);
+ }
+ return HttpReq.WriteResponse(Result.first, HttpContentType::kText, Result.second);
+ },
+ HttpVerb::kGet);
+
+ // Do an rpc style operation on project/oplog
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/rpc",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ IoBuffer Payload = Req.ServerRequest().ReadPayload();
+
+ m_ProjectStore->Rpc(HttpReq, ProjectId, OplogId, std::move(Payload), m_AuthMgr);
+ },
+ HttpVerb::kPost);
}
HttpProjectService::~HttpProjectService()
@@ -2625,9 +3227,14 @@ namespace testutils {
{
std::vector<uint8_t> Data;
Data.resize(Size);
- for (size_t Idx = 0; Idx < Size; ++Idx)
+ uint16_t* DataPtr = reinterpret_cast<uint16_t*>(Data.data());
+ for (size_t Idx = 0; Idx < Size / 2; ++Idx)
{
- Data[Idx] = Idx % 255;
+ DataPtr[Idx] = static_cast<uint16_t>(Idx % 0xffffu);
+ }
+ if (Size & 1)
+ {
+ Data[Size - 1] = static_cast<uint8_t>((Size - 1) & 0xff);
}
CompressedBuffer Compressed = CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()));
Result.emplace_back(std::pair<Oid, CompressedBuffer>(Oid::NewOid(), Compressed));
@@ -2908,6 +3515,28 @@ TEST_CASE("project.store.partial.read")
const uint8_t* PartialDataPtr = reinterpret_cast<const uint8_t*>(PartialDecompressed.GetView().GetData());
CHECK(FullDataPtr[0] == PartialDataPtr[0]);
}
+
+TEST_CASE("project.store.block")
+{
+ using namespace std::literals;
+ using namespace testutils;
+
+ std::vector<std::size_t> AttachmentSizes({7633, 6825, 5738, 8031, 7225, 566, 3656, 6006, 24, 3466, 1093, 4269, 2257, 3685, 3489,
+ 7194, 6151, 5482, 6217, 3511, 6738, 5061, 7537, 2759, 1916, 8210, 2235, 4024, 1582, 5251,
+ 491, 5464, 4607, 8135, 3767, 4045, 4415, 5007, 8876, 6761, 3359, 8526, 4097, 4855, 8225});
+
+ std::vector<std::pair<Oid, CompressedBuffer>> AttachmentsWithId = CreateAttachments(AttachmentSizes);
+ std::vector<SharedBuffer> Chunks;
+ Chunks.reserve(AttachmentSizes.size());
+ for (const auto& It : AttachmentsWithId)
+ {
+ Chunks.push_back(It.second.GetCompressed().Flatten());
+ }
+ CompressedBuffer Block = GenerateBlock(std::move(Chunks));
+ IoBuffer BlockBuffer = Block.GetCompressed().Flatten().AsIoBuffer();
+ CHECK(IterateBlock(std::move(BlockBuffer), [](CompressedBuffer&&, const IoHash&) {}));
+}
+
#endif
void