aboutsummaryrefslogtreecommitdiff
path: root/zenserver/projectstore/projectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-02-02 13:59:49 -0800
committerGitHub <[email protected]>2023-02-02 13:59:49 -0800
commit43eca4b121b76988c1fcda2db42145fe6a22c62e (patch)
tree4ffb3eff40e071c47a614d7ec8225f20f6484af5 /zenserver/projectstore/projectstore.cpp
parentAdd `project-create` and `oplog-create` to zen command line tool (#221) (diff)
downloadzen-43eca4b121b76988c1fcda2db42145fe6a22c62e.tar.xz
zen-43eca4b121b76988c1fcda2db42145fe6a22c62e.zip
remove legacy `export-project` and `import-project` (#222)
Diffstat (limited to 'zenserver/projectstore/projectstore.cpp')
-rw-r--r--zenserver/projectstore/projectstore.cpp203
1 files changed, 0 insertions, 203 deletions
diff --git a/zenserver/projectstore/projectstore.cpp b/zenserver/projectstore/projectstore.cpp
index 9a17443a8..188865edb 100644
--- a/zenserver/projectstore/projectstore.cpp
+++ b/zenserver/projectstore/projectstore.cpp
@@ -2311,209 +2311,6 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
HttpVerb::kGet);
m_Router.RegisterRoute(
- "{project}/oplog/{log}/archive",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- const auto& ProjectId = Req.GetCapture(1);
- const auto& OplogId = Req.GetCapture(2);
-
- Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId);
- if (!Project)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId);
-
- if (!FoundLog)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- switch (Req.ServerRequest().RequestVerb())
- {
- case HttpVerb::kGet:
- {
- CbObjectWriter Response;
- Response.BeginArray("entries"sv);
- std::unordered_set<IoHash> AttachementHashes;
- size_t OpCount = 0;
- IoHashStream Hasher;
-
- FoundLog->IterateOplog([this, &Hasher, &Response, &AttachementHashes, &OpCount](CbObject Op) {
- SharedBuffer Buffer = Op.GetBuffer();
- Hasher.Append(Buffer.GetView());
- Response << Op;
- Op.IterateAttachments([this, &AttachementHashes, &OpCount](CbFieldView FieldView) {
- const IoHash AttachmentHash = FieldView.AsAttachment();
- AttachementHashes.emplace(AttachmentHash);
- });
- OpCount++;
- });
- Response.EndArray();
-
- IoHash Checksum = Hasher.GetHash();
- Response.AddHash("checksum"sv, Checksum);
-
- ZEN_INFO("Exporting {} ops and {} chunks from '{}/{}' with checksum '{}'",
- OpCount,
- AttachementHashes.size(),
- ProjectId,
- OplogId,
- Checksum);
-
- CbPackage ResponsePackage;
- ResponsePackage.SetObject(Response.Save());
-
- std::vector<CbAttachment> Attachments;
- Attachments.reserve(AttachementHashes.size());
- for (const IoHash& AttachmentHash : AttachementHashes)
- {
- IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash);
- if (Payload)
- {
- CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload));
- ZEN_ASSERT(Compressed);
- Attachments.emplace_back(CbAttachment(Compressed, AttachmentHash));
- }
- }
- ResponsePackage.AddAttachments(Attachments);
-
- std::vector<IoBuffer> ResponsePayload = FormatPackageMessage(ResponsePackage, FormatFlags::kAllowLocalReferences);
- const ZenContentType AcceptType = HttpReq.AcceptContentType();
- if (AcceptType == ZenContentType::kCompressedBinary)
- {
- std::vector<SharedBuffer> Parts;
- Parts.reserve(ResponsePayload.size());
- for (const auto& I : ResponsePayload)
- {
- Parts.emplace_back(SharedBuffer(I));
- }
- CompositeBuffer Cmp(std::move(Parts));
- CompressedBuffer CompressedResponse = CompressedBuffer::Compress(Cmp);
- HttpReq.WriteResponse(HttpResponseCode::OK,
- HttpContentType::kCompressedBinary,
- CompressedResponse.GetCompressed().Flatten().AsIoBuffer());
- }
- else if (AcceptType == ZenContentType::kCbPackage)
- {
- HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponsePayload);
- }
- else
- {
- return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
- }
- }
- break;
- case HttpVerb::kPost:
- {
- ZEN_INFO("Importing oplog '{}/{}'", ProjectId, OplogId);
- IoBuffer CompressedPayload = HttpReq.ReadPayload();
- IoBuffer Payload =
- CompressedBuffer::FromCompressedNoValidate(std::move(CompressedPayload)).Decompress().AsIoBuffer();
-
- CbPackage RequestPackage = ParsePackageMessage(Payload);
- CbObject Request = RequestPackage.GetObject();
- IoHash Checksum = Request["checksum"sv].AsHash();
-
- std::span<const CbAttachment> Attachments = RequestPackage.GetAttachments();
- zen ::CbArrayView Entries = Request["entries"sv].AsArrayView();
-
- ZEN_INFO("Importing oplog with {} ops and {} attachments with checksum '{}' to '{}/{}'",
- Entries.Num(),
- Attachments.size(),
- Checksum,
- ProjectId,
- OplogId);
- std::vector<CbObject> Ops;
- Ops.reserve(Entries.Num());
- IoHashStream Hasher;
- for (auto& OpEntry : Entries)
- {
- CbObjectView Core = OpEntry.AsObjectView();
-
- if (!Core["key"sv])
- {
- return HttpReq.WriteResponse(HttpResponseCode::BadRequest,
- HttpContentType::kText,
- "No oplog entry key specified");
- }
-
- BinaryWriter Writer;
- Core.CopyTo(Writer);
- MemoryView OpView = Writer.GetView();
- Hasher.Append(OpView);
- IoBuffer OpBuffer(IoBuffer::Clone, OpView.GetData(), OpView.GetSize());
- CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType);
- Ops.emplace_back(Op);
- }
- IoHash CalculatedChecksum = Hasher.GetHash();
- if (CalculatedChecksum != Checksum)
- {
- ZEN_WARN("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum);
- return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest);
- }
-
- ZEN_INFO("Writing {} ops for '{}/{}'", Ops.size(), ProjectId, OplogId);
- for (const CbObject& Op : Ops)
- {
- const uint32_t OpLsn = FoundLog->AppendNewOplogEntry(Op);
- ZEN_DEBUG("oplog entry #{}", OpLsn);
-
- if (OpLsn == ProjectStore::Oplog::kInvalidOp)
- {
- return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
- }
-
- ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'",
- ProjectId,
- OplogId,
- OpLsn,
- NiceBytes(Op.GetSize()),
- Op["key"sv].AsString());
- }
-
- // Persist attachments after oplog entry so GC won't find attachments without references
- ZEN_INFO("Writing {} attachments for '{}/{}'", Attachments.size(), ProjectId, OplogId);
-
- // We are creating a worker thread pool here since we are storing a lot of attachments in one go
- // Doing import is a rare and transient occation so we don't want to keep a WorkerThreadPool alive.
- WorkerThreadPool WorkerPool(Min(std::thread::hardware_concurrency(), 16u));
- Latch JobCount{gsl::narrow_cast<std::ptrdiff_t>(Attachments.size())};
- for (const CbAttachment& Attachment : Attachments)
- {
- WorkerPool.ScheduleWork([this, &Attachment, &JobCount, ProjectId, OplogId]() {
- try
- {
- CompressedBuffer AttachmentBody = Attachment.AsCompressedBinary();
- m_CidStore.AddChunk(AttachmentBody.GetCompressed().Flatten().AsIoBuffer(),
- Attachment.GetHash(),
- CidStore::InsertMode::kCopyOnly);
- }
- catch (std::exception& e)
- {
- ZEN_ERROR("Failed to store attachment {} for '{}/{}', reason: '{}'",
- Attachment.GetHash(),
- ProjectId,
- OplogId,
- e.what());
- }
- JobCount.CountDown();
- });
- }
- JobCount.Wait();
-
- ZEN_INFO("Imported {} ops and {} attachments to '{}/{}'", Entries.Num(), Attachments.size(), ProjectId, OplogId);
- return Req.ServerRequest().WriteResponse(HttpResponseCode::Created);
- }
- break;
- default:
- break;
- }
- },
- HttpVerb::kPost | HttpVerb::kGet);
- m_Router.RegisterRoute(
"{project}/oplog/{log}",
[this](HttpRouterRequest& Req) {
const auto& ProjectId = Req.GetCapture(1);