diff options
| author | Dan Engelbrecht <[email protected]> | 2023-02-02 13:59:49 -0800 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-02-02 13:59:49 -0800 |
| commit | 43eca4b121b76988c1fcda2db42145fe6a22c62e (patch) | |
| tree | 4ffb3eff40e071c47a614d7ec8225f20f6484af5 /zenserver/projectstore/projectstore.cpp | |
| parent | Add `project-create` and `oplog-create` to zen command line tool (#221) (diff) | |
| download | zen-43eca4b121b76988c1fcda2db42145fe6a22c62e.tar.xz zen-43eca4b121b76988c1fcda2db42145fe6a22c62e.zip | |
remove legacy `export-project` and `import-project` (#222)
Diffstat (limited to 'zenserver/projectstore/projectstore.cpp')
| -rw-r--r-- | zenserver/projectstore/projectstore.cpp | 203 |
1 files changed, 0 insertions, 203 deletions
diff --git a/zenserver/projectstore/projectstore.cpp b/zenserver/projectstore/projectstore.cpp index 9a17443a8..188865edb 100644 --- a/zenserver/projectstore/projectstore.cpp +++ b/zenserver/projectstore/projectstore.cpp @@ -2311,209 +2311,6 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) HttpVerb::kGet); m_Router.RegisterRoute( - "{project}/oplog/{log}/archive", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto& ProjectId = Req.GetCapture(1); - const auto& OplogId = Req.GetCapture(2); - - Ref<ProjectStore::Project> Project = m_ProjectStore->OpenProject(ProjectId); - if (!Project) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); - - if (!FoundLog) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - switch (Req.ServerRequest().RequestVerb()) - { - case HttpVerb::kGet: - { - CbObjectWriter Response; - Response.BeginArray("entries"sv); - std::unordered_set<IoHash> AttachementHashes; - size_t OpCount = 0; - IoHashStream Hasher; - - FoundLog->IterateOplog([this, &Hasher, &Response, &AttachementHashes, &OpCount](CbObject Op) { - SharedBuffer Buffer = Op.GetBuffer(); - Hasher.Append(Buffer.GetView()); - Response << Op; - Op.IterateAttachments([this, &AttachementHashes, &OpCount](CbFieldView FieldView) { - const IoHash AttachmentHash = FieldView.AsAttachment(); - AttachementHashes.emplace(AttachmentHash); - }); - OpCount++; - }); - Response.EndArray(); - - IoHash Checksum = Hasher.GetHash(); - Response.AddHash("checksum"sv, Checksum); - - ZEN_INFO("Exporting {} ops and {} chunks from '{}/{}' with checksum '{}'", - OpCount, - AttachementHashes.size(), - ProjectId, - OplogId, - Checksum); - - CbPackage ResponsePackage; - ResponsePackage.SetObject(Response.Save()); - - std::vector<CbAttachment> Attachments; - Attachments.reserve(AttachementHashes.size()); - for (const IoHash& AttachmentHash : AttachementHashes) - { - IoBuffer Payload = m_CidStore.FindChunkByCid(AttachmentHash); - if (Payload) - { - CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(std::move(Payload)); - ZEN_ASSERT(Compressed); - Attachments.emplace_back(CbAttachment(Compressed, AttachmentHash)); - } - } - ResponsePackage.AddAttachments(Attachments); - - std::vector<IoBuffer> ResponsePayload = FormatPackageMessage(ResponsePackage, FormatFlags::kAllowLocalReferences); - const ZenContentType AcceptType = HttpReq.AcceptContentType(); - if (AcceptType == ZenContentType::kCompressedBinary) - { - std::vector<SharedBuffer> Parts; - Parts.reserve(ResponsePayload.size()); - for (const auto& I : ResponsePayload) - { - Parts.emplace_back(SharedBuffer(I)); - } - CompositeBuffer Cmp(std::move(Parts)); - CompressedBuffer CompressedResponse = CompressedBuffer::Compress(Cmp); - HttpReq.WriteResponse(HttpResponseCode::OK, - HttpContentType::kCompressedBinary, - CompressedResponse.GetCompressed().Flatten().AsIoBuffer()); - } - else if (AcceptType == ZenContentType::kCbPackage) - { - HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, ResponsePayload); - } - else - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest); - } - } - break; - case HttpVerb::kPost: - { - ZEN_INFO("Importing oplog '{}/{}'", ProjectId, OplogId); - IoBuffer CompressedPayload = HttpReq.ReadPayload(); - IoBuffer Payload = - CompressedBuffer::FromCompressedNoValidate(std::move(CompressedPayload)).Decompress().AsIoBuffer(); - - CbPackage RequestPackage = ParsePackageMessage(Payload); - CbObject Request = RequestPackage.GetObject(); - IoHash Checksum = Request["checksum"sv].AsHash(); - - std::span<const CbAttachment> Attachments = RequestPackage.GetAttachments(); - zen ::CbArrayView Entries = Request["entries"sv].AsArrayView(); - - ZEN_INFO("Importing oplog with {} ops and {} attachments with checksum '{}' to '{}/{}'", - Entries.Num(), - Attachments.size(), - Checksum, - ProjectId, - OplogId); - std::vector<CbObject> Ops; - Ops.reserve(Entries.Num()); - IoHashStream Hasher; - for (auto& OpEntry : Entries) - { - CbObjectView Core = OpEntry.AsObjectView(); - - if (!Core["key"sv]) - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, - HttpContentType::kText, - "No oplog entry key specified"); - } - - BinaryWriter Writer; - Core.CopyTo(Writer); - MemoryView OpView = Writer.GetView(); - Hasher.Append(OpView); - IoBuffer OpBuffer(IoBuffer::Clone, OpView.GetData(), OpView.GetSize()); - CbObject Op(SharedBuffer(OpBuffer), CbFieldType::HasFieldType); - Ops.emplace_back(Op); - } - IoHash CalculatedChecksum = Hasher.GetHash(); - if (CalculatedChecksum != Checksum) - { - ZEN_WARN("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum); - return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); - } - - ZEN_INFO("Writing {} ops for '{}/{}'", Ops.size(), ProjectId, OplogId); - for (const CbObject& Op : Ops) - { - const uint32_t OpLsn = FoundLog->AppendNewOplogEntry(Op); - ZEN_DEBUG("oplog entry #{}", OpLsn); - - if (OpLsn == ProjectStore::Oplog::kInvalidOp) - { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest); - } - - ZEN_DEBUG("'{}/{}' op #{} ({}) - '{}'", - ProjectId, - OplogId, - OpLsn, - NiceBytes(Op.GetSize()), - Op["key"sv].AsString()); - } - - // Persist attachments after oplog entry so GC won't find attachments without references - ZEN_INFO("Writing {} attachments for '{}/{}'", Attachments.size(), ProjectId, OplogId); - - // We are creating a worker thread pool here since we are storing a lot of attachments in one go - // Doing import is a rare and transient occation so we don't want to keep a WorkerThreadPool alive. - WorkerThreadPool WorkerPool(Min(std::thread::hardware_concurrency(), 16u)); - Latch JobCount{gsl::narrow_cast<std::ptrdiff_t>(Attachments.size())}; - for (const CbAttachment& Attachment : Attachments) - { - WorkerPool.ScheduleWork([this, &Attachment, &JobCount, ProjectId, OplogId]() { - try - { - CompressedBuffer AttachmentBody = Attachment.AsCompressedBinary(); - m_CidStore.AddChunk(AttachmentBody.GetCompressed().Flatten().AsIoBuffer(), - Attachment.GetHash(), - CidStore::InsertMode::kCopyOnly); - } - catch (std::exception& e) - { - ZEN_ERROR("Failed to store attachment {} for '{}/{}', reason: '{}'", - Attachment.GetHash(), - ProjectId, - OplogId, - e.what()); - } - JobCount.CountDown(); - }); - } - JobCount.Wait(); - - ZEN_INFO("Imported {} ops and {} attachments to '{}/{}'", Entries.Num(), Attachments.size(), ProjectId, OplogId); - return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); - } - break; - default: - break; - } - }, - HttpVerb::kPost | HttpVerb::kGet); - m_Router.RegisterRoute( "{project}/oplog/{log}", [this](HttpRouterRequest& Req) { const auto& ProjectId = Req.GetCapture(1); |