aboutsummaryrefslogtreecommitdiff
path: root/zenserver/projectstore.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-12-01 21:55:17 +0100
committerGitHub <[email protected]>2022-12-01 12:55:17 -0800
commit7d448505cf8a63e9e3f4ed6d606693daa1cf584b (patch)
treee3d108e835b6cd5b8d814d0e1f077c813acf3bae /zenserver/projectstore.cpp
parent0.1.9 (diff)
downloadzen-7d448505cf8a63e9e3f4ed6d606693daa1cf584b.tar.xz
zen-7d448505cf8a63e9e3f4ed6d606693daa1cf584b.zip
Make sure we always store record/op before attachments (#195)
* Make sure we always store record/op before attachments We don't want to store attachments first - a GC operation could then remove attachments if triggered before storing record/op * zen::Latch * Use latch to wait for attachments to be stored * use zen::latch when adding attachments from project oplog import * changelog
Diffstat (limited to 'zenserver/projectstore.cpp')
-rw-r--r--zenserver/projectstore.cpp57
1 files changed, 35 insertions, 22 deletions
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index 87118991e..c60d5b405 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -24,6 +24,8 @@
#include "config.h"
+#include <latch>
+
ZEN_THIRD_PARTY_INCLUDES_START
#include <xxh3.h>
ZEN_THIRD_PARTY_INCLUDES_END
@@ -626,7 +628,10 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
using namespace std::literals;
- // Persist attachments
+ const CbObject& Core = OpPackage.GetObject();
+ const uint32_t EntryId = AppendNewOplogEntry(Core);
+
+ // Persist attachments after oplog entry so GC won't find attachments without references
uint64_t AttachmentBytes = 0;
uint64_t NewAttachmentBytes = 0;
@@ -648,9 +653,6 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
AttachmentBytes += AttachmentSize;
}
- const CbObject& Core = OpPackage.GetObject();
- const uint32_t EntryId = AppendNewOplogEntry(Core);
-
ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes));
return EntryId;
@@ -2243,24 +2245,6 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest);
}
- ZEN_INFO("Writing {} attachments for '{}/{}'", Attachments.size(), ProjectId, OplogId);
-
- WorkerThreadPool WorkerPool(Min(std::thread::hardware_concurrency(), 16u));
- std::atomic_int64_t JobCount = 0;
- for (const CbAttachment& Attachment : Attachments)
- {
- JobCount.fetch_add(1);
- WorkerPool.ScheduleWork([this, &Attachment, &JobCount]() {
- CompressedBuffer AttachmentBody = Attachment.AsCompressedBinary();
- m_CidStore.AddChunk(AttachmentBody, CidStore::InsertMode::kCopyOnly);
- JobCount.fetch_add(-1);
- });
- }
- while (JobCount.load())
- {
- Sleep(1);
- }
-
ZEN_INFO("Writing {} ops for '{}/{}'", Ops.size(), ProjectId, OplogId);
for (const CbObject& Op : Ops)
{
@@ -2279,6 +2263,35 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
NiceBytes(Op.GetSize()),
Op["key"sv].AsString());
}
+
+ // Persist attachments after oplog entry so GC won't find attachments without references
+ ZEN_INFO("Writing {} attachments for '{}/{}'", Attachments.size(), ProjectId, OplogId);
+
+ // We are creating a worker thread pool here since we are storing a lot of attachments in one go
+ // Doing import is a rare and transient occation so we don't want to keep a WorkerThreadPool alive.
+ WorkerThreadPool WorkerPool(Min(std::thread::hardware_concurrency(), 16u));
+ Latch JobCount{gsl::narrow_cast<std::ptrdiff_t>(Attachments.size())};
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ WorkerPool.ScheduleWork([this, &Attachment, &JobCount, ProjectId, OplogId]() {
+ try
+ {
+ CompressedBuffer AttachmentBody = Attachment.AsCompressedBinary();
+ m_CidStore.AddChunk(AttachmentBody, CidStore::InsertMode::kCopyOnly);
+ }
+ catch (std::exception& e)
+ {
+ ZEN_ERROR("Failed to store attachment {} for '{}/{}', reason: '{}'",
+ Attachment.GetHash(),
+ ProjectId,
+ OplogId,
+ e.what());
+ }
+ JobCount.CountDown();
+ });
+ }
+ JobCount.Wait();
+
ZEN_INFO("Imported {} ops and {} attachments to '{}/{}'", Entries.Num(), Attachments.size(), ProjectId, OplogId);
return Req.ServerRequest().WriteResponse(HttpResponseCode::Created);
}