diff options
| author | Dan Engelbrecht <[email protected]> | 2022-12-01 21:55:17 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-12-01 12:55:17 -0800 |
| commit | 7d448505cf8a63e9e3f4ed6d606693daa1cf584b (patch) | |
| tree | e3d108e835b6cd5b8d814d0e1f077c813acf3bae /zenserver/projectstore.cpp | |
| parent | 0.1.9 (diff) | |
| download | zen-7d448505cf8a63e9e3f4ed6d606693daa1cf584b.tar.xz zen-7d448505cf8a63e9e3f4ed6d606693daa1cf584b.zip | |
Make sure we always store record/op before attachments (#195)
* Make sure we always store record/op before attachments
We don't want to store attachments first - a GC operation could then remove attachments if triggered before storing record/op
* zen::Latch
* Use latch to wait for attachments to be stored
* use zen::latch when adding attachments from project oplog import
* changelog
Diffstat (limited to 'zenserver/projectstore.cpp')
| -rw-r--r-- | zenserver/projectstore.cpp | 57 |
1 files changed, 35 insertions, 22 deletions
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index 87118991e..c60d5b405 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -24,6 +24,8 @@ #include "config.h" +#include <latch> + ZEN_THIRD_PARTY_INCLUDES_START #include <xxh3.h> ZEN_THIRD_PARTY_INCLUDES_END @@ -626,7 +628,10 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) using namespace std::literals; - // Persist attachments + const CbObject& Core = OpPackage.GetObject(); + const uint32_t EntryId = AppendNewOplogEntry(Core); + + // Persist attachments after oplog entry so GC won't find attachments without references uint64_t AttachmentBytes = 0; uint64_t NewAttachmentBytes = 0; @@ -648,9 +653,6 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) AttachmentBytes += AttachmentSize; } - const CbObject& Core = OpPackage.GetObject(); - const uint32_t EntryId = AppendNewOplogEntry(Core); - ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes)); return EntryId; @@ -2243,24 +2245,6 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest); } - ZEN_INFO("Writing {} attachments for '{}/{}'", Attachments.size(), ProjectId, OplogId); - - WorkerThreadPool WorkerPool(Min(std::thread::hardware_concurrency(), 16u)); - std::atomic_int64_t JobCount = 0; - for (const CbAttachment& Attachment : Attachments) - { - JobCount.fetch_add(1); - WorkerPool.ScheduleWork([this, &Attachment, &JobCount]() { - CompressedBuffer AttachmentBody = Attachment.AsCompressedBinary(); - m_CidStore.AddChunk(AttachmentBody, CidStore::InsertMode::kCopyOnly); - JobCount.fetch_add(-1); - }); - } - while (JobCount.load()) - { - Sleep(1); - } - ZEN_INFO("Writing {} ops for '{}/{}'", Ops.size(), ProjectId, OplogId); for (const CbObject& Op : Ops) { @@ -2279,6 +2263,35 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects) NiceBytes(Op.GetSize()), Op["key"sv].AsString()); } + + // Persist attachments after oplog entry so GC won't find attachments without references + ZEN_INFO("Writing {} attachments for '{}/{}'", Attachments.size(), ProjectId, OplogId); + + // We are creating a worker thread pool here since we are storing a lot of attachments in one go + // Doing import is a rare and transient occation so we don't want to keep a WorkerThreadPool alive. + WorkerThreadPool WorkerPool(Min(std::thread::hardware_concurrency(), 16u)); + Latch JobCount{gsl::narrow_cast<std::ptrdiff_t>(Attachments.size())}; + for (const CbAttachment& Attachment : Attachments) + { + WorkerPool.ScheduleWork([this, &Attachment, &JobCount, ProjectId, OplogId]() { + try + { + CompressedBuffer AttachmentBody = Attachment.AsCompressedBinary(); + m_CidStore.AddChunk(AttachmentBody, CidStore::InsertMode::kCopyOnly); + } + catch (std::exception& e) + { + ZEN_ERROR("Failed to store attachment {} for '{}/{}', reason: '{}'", + Attachment.GetHash(), + ProjectId, + OplogId, + e.what()); + } + JobCount.CountDown(); + }); + } + JobCount.Wait(); + ZEN_INFO("Imported {} ops and {} attachments to '{}/{}'", Entries.Num(), Attachments.size(), ProjectId, OplogId); return Req.ServerRequest().WriteResponse(HttpResponseCode::Created); } |