aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2022-12-01 21:55:17 +0100
committerGitHub <[email protected]>2022-12-01 12:55:17 -0800
commit7d448505cf8a63e9e3f4ed6d606693daa1cf584b (patch)
treee3d108e835b6cd5b8d814d0e1f077c813acf3bae
parent0.1.9 (diff)
downloadzen-7d448505cf8a63e9e3f4ed6d606693daa1cf584b.tar.xz
zen-7d448505cf8a63e9e3f4ed6d606693daa1cf584b.zip
Make sure we always store record/op before attachments (#195)
* Make sure we always store record/op before attachments We don't want to store attachments first - a GC operation could then remove attachments if triggered before storing record/op * zen::Latch * Use latch to wait for attachments to be stored * use zen::latch when adding attachments from project oplog import * changelog
-rw-r--r--CHANGELOG.md3
-rw-r--r--zencore/include/zencore/thread.h35
-rw-r--r--zenserver/cache/structuredcache.cpp231
-rw-r--r--zenserver/projectstore.cpp57
4 files changed, 208 insertions, 118 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7ea28f80a..792e3a4c9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,7 @@
##
+- Bugfix: Always store records or oplog entries before storing attachments to avoid GC finding unreferenced chunks i CidStore
+
+## 0.1.9
- Feature: Adds two command to Zen command tool to export/import project store oplogs with attachments
- `export-project <target-folder> <project> [oplogs...]`
- `import-project <target-folder> <project> [oplogs...]`
diff --git a/zencore/include/zencore/thread.h b/zencore/include/zencore/thread.h
index 16d0e9dee..3c1821a62 100644
--- a/zencore/include/zencore/thread.h
+++ b/zencore/include/zencore/thread.h
@@ -4,9 +4,9 @@
#include "zencore.h"
-#include <shared_mutex>
-
+#include <atomic>
#include <filesystem>
+#include <shared_mutex>
#include <string_view>
#include <vector>
@@ -144,6 +144,37 @@ private:
void* m_MutexHandle = nullptr;
};
+/**
+ * Downward counter of type std::ptrdiff_t which can be used to synchronize threads
+ */
+class Latch
+{
+public:
+ Latch(std::ptrdiff_t Count) : Counter(Count) {}
+
+ void CountDown()
+ {
+ std::ptrdiff_t Old = Counter.fetch_sub(1);
+ if (Old == 1)
+ {
+ Complete.Set();
+ }
+ }
+
+ void Wait()
+ {
+ std::ptrdiff_t Old = Counter.load();
+ if (Old != 0)
+ {
+ Complete.Wait();
+ }
+ }
+
+private:
+ std::atomic_ptrdiff_t Counter;
+ Event Complete;
+};
+
/** Basic process abstraction
*/
class ProcessHandle
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index dfb69c0fe..0df7472ac 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -651,44 +651,63 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
CbPackage Package;
if (Package.TryLoad(ClientResultValue.Value))
{
- CbObject CacheRecord = Package.GetObject();
- AttachmentCount Count;
-
- CacheRecord.IterateAttachments([this, &Package, &Ref, &Count, QueryLocal, StoreLocal](CbFieldView HashView) {
- if (const CbAttachment* Attachment = Package.FindAttachment(HashView.AsHash()))
- {
- if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
+ CbObject CacheRecord = Package.GetObject();
+ AttachmentCount Count;
+ size_t NumAttachments = Package.GetAttachments().size();
+ std::vector<const CbAttachment*> AttachmentsToStoreLocally;
+ AttachmentsToStoreLocally.reserve(NumAttachments);
+
+ CacheRecord.IterateAttachments(
+ [this, &Package, &Ref, &AttachmentsToStoreLocally, &Count, QueryLocal, StoreLocal, SkipData](CbFieldView HashView) {
+ IoHash Hash = HashView.AsHash();
+ if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
{
- if (StoreLocal)
+ if (Attachment->IsCompressedBinary())
{
- auto InsertResult = m_CidStore.AddChunk(Compressed);
- if (InsertResult.New)
+ if (StoreLocal)
{
- Count.New++;
+ AttachmentsToStoreLocally.emplace_back(Attachment);
}
+ Count.Valid++;
+ }
+ else
+ {
+ ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'",
+ Hash,
+ Ref.BucketSegment,
+ Ref.HashKey);
+ Count.Invalid++;
}
- Count.Valid++;
- }
- else
- {
- ZEN_WARN("Uncompressed value '{}' from upstream cache record '{}/{}'",
- HashView.AsHash(),
- Ref.BucketSegment,
- Ref.HashKey);
- Count.Invalid++;
}
- }
- else if (QueryLocal)
- {
- if (IoBuffer Chunk = m_CidStore.FindChunkByCid(HashView.AsHash()))
+ else if (QueryLocal)
{
- Package.AddAttachment(
- CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)), HashView.AsHash()));
- Count.Valid++;
+ if (SkipData)
+ {
+ if (m_CidStore.ContainsChunk(Hash))
+ {
+ Count.Valid++;
+ }
+ }
+ else if (IoBuffer Chunk = m_CidStore.FindChunkByCid(Hash))
+ {
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Chunk));
+ if (Compressed)
+ {
+ Package.AddAttachment(CbAttachment(Compressed, Hash));
+ Count.Valid++;
+ }
+ else
+ {
+ ZEN_WARN("Uncompressed value '{}' stored in local cache '{}/{}'",
+ Hash,
+ Ref.BucketSegment,
+ Ref.HashKey);
+ Count.Invalid++;
+ }
+ }
}
- }
- Count.Total++;
- });
+ Count.Total++;
+ });
if ((Count.Valid == Count.Total) || PartialRecord)
{
@@ -701,6 +720,16 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue);
}
+ for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
+ }
+
BinaryWriter MemStream;
if (SkipData)
{
@@ -822,7 +851,9 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Compact binary validation failed"sv);
}
- CachePolicy Policy = PolicyFromUrl;
+ Body.SetContentType(ZenContentType::kCbObject);
+ m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body});
+
CbObjectView CacheRecord(Body.Data());
std::vector<IoHash> ValidAttachments;
int32_t TotalCount = 0;
@@ -846,11 +877,9 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
ValidAttachments.size(),
NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- Body.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, {.Value = Body});
-
const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size());
+ CachePolicy Policy = PolicyFromUrl;
if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
{
m_UpstreamCache.EnqueueUpstream({.Type = ZenContentType::kCbObject,
@@ -876,27 +905,23 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
}
CachePolicy Policy = PolicyFromUrl;
- CbObject CacheRecord = Package.GetObject();
- AttachmentCount Count;
- std::vector<IoHash> ValidAttachments;
+ CbObject CacheRecord = Package.GetObject();
- ValidAttachments.reserve(Package.GetAttachments().size());
+ AttachmentCount Count;
+ size_t NumAttachments = Package.GetAttachments().size();
+ std::vector<IoHash> ValidAttachments;
+ std::vector<const CbAttachment*> AttachmentsToStoreLocally;
+ ValidAttachments.reserve(NumAttachments);
+ AttachmentsToStoreLocally.reserve(NumAttachments);
- CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &Count](CbFieldView HashView) {
+ CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentsToStoreLocally, &ValidAttachments, &Count](CbFieldView HashView) {
const IoHash Hash = HashView.AsHash();
if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
{
if (Attachment->IsCompressedBinary())
{
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
-
+ AttachmentsToStoreLocally.emplace_back(Attachment);
ValidAttachments.emplace_back(Hash);
-
- if (InsertResult.New)
- {
- Count.New++;
- }
Count.Valid++;
}
else
@@ -923,6 +948,21 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv);
}
+ ZenCacheValue CacheValue;
+ CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
+ CacheValue.Value.SetContentType(ZenContentType::kCbObject);
+ m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue);
+
+ for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
+ }
+
ZEN_DEBUG("PUTCACHERECORD - '{}/{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total) in {}",
Ref.Namespace,
Ref.BucketSegment,
@@ -934,12 +974,6 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
Count.Total,
NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- ZenCacheValue CacheValue;
- CacheValue.Value = CacheRecord.GetBuffer().AsIoBuffer();
- CacheValue.Value.SetContentType(ZenContentType::kCbObject);
-
- m_CacheStore.Put(Ref.Namespace, Ref.BucketSegment, Ref.HashKey, CacheValue);
-
const bool IsPartialRecord = Count.Valid != Count.Total;
if (EnumHasAllFlags(Policy, CachePolicy::StoreRemote) && !IsPartialRecord)
@@ -1233,56 +1267,71 @@ HttpStructuredCacheService::HandleRpcPutCacheRecords(zen::HttpServerRequest& Req
HttpStructuredCacheService::PutResult
HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPackage* Package)
{
- std::vector<IoHash> ValidAttachments;
- AttachmentCount Count;
- CbObjectView Record = Request.RecordObject;
- uint64_t RecordObjectSize = Record.GetSize();
- uint64_t TransferredSize = RecordObjectSize;
+ CbObjectView Record = Request.RecordObject;
+ uint64_t RecordObjectSize = Record.GetSize();
+ uint64_t TransferredSize = RecordObjectSize;
+
+ AttachmentCount Count;
+ size_t NumAttachments = Package->GetAttachments().size();
+ std::vector<IoHash> ValidAttachments;
+ std::vector<const CbAttachment*> AttachmentsToStoreLocally;
+ ValidAttachments.reserve(NumAttachments);
+ AttachmentsToStoreLocally.reserve(NumAttachments);
Stopwatch Timer;
- Request.RecordObject.IterateAttachments([this, &Request, Package, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) {
- const IoHash ValueHash = HashView.AsHash();
- if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
- {
- if (Attachment->IsCompressedBinary())
+ Request.RecordObject.IterateAttachments(
+ [this, &Request, Package, &AttachmentsToStoreLocally, &ValidAttachments, &Count, &TransferredSize](CbFieldView HashView) {
+ const IoHash ValueHash = HashView.AsHash();
+ if (const CbAttachment* Attachment = Package ? Package->FindAttachment(ValueHash) : nullptr)
{
- CompressedBuffer Chunk = Attachment->AsCompressedBinary();
- CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
-
- ValidAttachments.emplace_back(ValueHash);
-
- if (InsertResult.New)
+ if (Attachment->IsCompressedBinary())
{
- Count.New++;
+ AttachmentsToStoreLocally.emplace_back(Attachment);
+ ValidAttachments.emplace_back(ValueHash);
+ Count.Valid++;
+ }
+ else
+ {
+ ZEN_WARN("PUTCACEHRECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
+ Request.Namespace,
+ Request.Key.Bucket,
+ Request.Key.Hash,
+ ToString(HttpContentType::kCbPackage),
+ ValueHash);
+ Count.Invalid++;
}
- Count.Valid++;
- TransferredSize += Chunk.GetCompressedSize();
}
- else
+ else if (m_CidStore.ContainsChunk(ValueHash))
{
- ZEN_WARN("PUTCACEHRECORD - '{}/{}/{}' '{}' FAILED, attachment '{}' is not compressed",
- Request.Namespace,
- Request.Key.Bucket,
- Request.Key.Hash,
- ToString(HttpContentType::kCbPackage),
- ValueHash);
- Count.Invalid++;
+ ValidAttachments.emplace_back(ValueHash);
+ Count.Valid++;
}
- }
- else if (m_CidStore.ContainsChunk(ValueHash))
- {
- ValidAttachments.emplace_back(ValueHash);
- Count.Valid++;
- }
- Count.Total++;
- });
+ Count.Total++;
+ });
if (Count.Invalid > 0)
{
return PutResult::Invalid;
}
+ ZenCacheValue CacheValue;
+ CacheValue.Value = IoBuffer(Record.GetSize());
+ Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
+ CacheValue.Value.SetContentType(ZenContentType::kCbObject);
+ m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue);
+
+ for (const CbAttachment* Attachment : AttachmentsToStoreLocally)
+ {
+ CompressedBuffer Chunk = Attachment->AsCompressedBinary();
+ CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Chunk);
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
+ TransferredSize += Chunk.GetCompressedSize();
+ }
+
ZEN_DEBUG("PUTCACEHRECORD - '{}/{}/{}' {}, attachments '{}/{}/{}' (new/valid/total) in {}",
Request.Namespace,
Request.Key.Bucket,
@@ -1293,12 +1342,6 @@ HttpStructuredCacheService::PutCacheRecord(PutRequestData& Request, const CbPack
Count.Total,
NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000));
- ZenCacheValue CacheValue;
- CacheValue.Value = IoBuffer(Record.GetSize());
- Record.CopyTo(MutableMemoryView(CacheValue.Value.MutableData(), CacheValue.Value.GetSize()));
- CacheValue.Value.SetContentType(ZenContentType::kCbObject);
- m_CacheStore.Put(Request.Namespace, Request.Key.Bucket, Request.Key.Hash, CacheValue);
-
const bool IsPartialRecord = Count.Valid != Count.Total;
if (EnumHasAllFlags(Request.Policy.GetRecordPolicy(), CachePolicy::StoreRemote) && !IsPartialRecord)
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index 87118991e..c60d5b405 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -24,6 +24,8 @@
#include "config.h"
+#include <latch>
+
ZEN_THIRD_PARTY_INCLUDES_START
#include <xxh3.h>
ZEN_THIRD_PARTY_INCLUDES_END
@@ -626,7 +628,10 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
using namespace std::literals;
- // Persist attachments
+ const CbObject& Core = OpPackage.GetObject();
+ const uint32_t EntryId = AppendNewOplogEntry(Core);
+
+ // Persist attachments after oplog entry so GC won't find attachments without references
uint64_t AttachmentBytes = 0;
uint64_t NewAttachmentBytes = 0;
@@ -648,9 +653,6 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
AttachmentBytes += AttachmentSize;
}
- const CbObject& Core = OpPackage.GetObject();
- const uint32_t EntryId = AppendNewOplogEntry(Core);
-
ZEN_DEBUG("oplog entry #{} attachments: {} new, {} total", EntryId, NiceBytes(NewAttachmentBytes), NiceBytes(AttachmentBytes));
return EntryId;
@@ -2243,24 +2245,6 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest);
}
- ZEN_INFO("Writing {} attachments for '{}/{}'", Attachments.size(), ProjectId, OplogId);
-
- WorkerThreadPool WorkerPool(Min(std::thread::hardware_concurrency(), 16u));
- std::atomic_int64_t JobCount = 0;
- for (const CbAttachment& Attachment : Attachments)
- {
- JobCount.fetch_add(1);
- WorkerPool.ScheduleWork([this, &Attachment, &JobCount]() {
- CompressedBuffer AttachmentBody = Attachment.AsCompressedBinary();
- m_CidStore.AddChunk(AttachmentBody, CidStore::InsertMode::kCopyOnly);
- JobCount.fetch_add(-1);
- });
- }
- while (JobCount.load())
- {
- Sleep(1);
- }
-
ZEN_INFO("Writing {} ops for '{}/{}'", Ops.size(), ProjectId, OplogId);
for (const CbObject& Op : Ops)
{
@@ -2279,6 +2263,35 @@ HttpProjectService::HttpProjectService(CidStore& Store, ProjectStore* Projects)
NiceBytes(Op.GetSize()),
Op["key"sv].AsString());
}
+
+ // Persist attachments after oplog entry so GC won't find attachments without references
+ ZEN_INFO("Writing {} attachments for '{}/{}'", Attachments.size(), ProjectId, OplogId);
+
+ // We are creating a worker thread pool here since we are storing a lot of attachments in one go
+ // Doing import is a rare and transient occation so we don't want to keep a WorkerThreadPool alive.
+ WorkerThreadPool WorkerPool(Min(std::thread::hardware_concurrency(), 16u));
+ Latch JobCount{gsl::narrow_cast<std::ptrdiff_t>(Attachments.size())};
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ WorkerPool.ScheduleWork([this, &Attachment, &JobCount, ProjectId, OplogId]() {
+ try
+ {
+ CompressedBuffer AttachmentBody = Attachment.AsCompressedBinary();
+ m_CidStore.AddChunk(AttachmentBody, CidStore::InsertMode::kCopyOnly);
+ }
+ catch (std::exception& e)
+ {
+ ZEN_ERROR("Failed to store attachment {} for '{}/{}', reason: '{}'",
+ Attachment.GetHash(),
+ ProjectId,
+ OplogId,
+ e.what());
+ }
+ JobCount.CountDown();
+ });
+ }
+ JobCount.Wait();
+
ZEN_INFO("Imported {} ops and {} attachments to '{}/{}'", Entries.Num(), Attachments.size(), ProjectId, OplogId);
return Req.ServerRequest().WriteResponse(HttpResponseCode::Created);
}