aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-02-02 09:56:22 -0800
committerGitHub <[email protected]>2023-02-02 18:56:22 +0100
commit231d69721625358cf7c31db2575f2cc9117f8482 (patch)
tree7e0deb0785a586d305cc8384ae6763484d99b558
parentUpdated README.md (diff)
downloadzen-231d69721625358cf7c31db2575f2cc9117f8482.tar.xz
zen-231d69721625358cf7c31db2575f2cc9117f8482.zip
Reduce lock scopes in oplog (#220)
-rw-r--r--zenserver/projectstore/projectstore.cpp190
-rw-r--r--zenserver/projectstore/projectstore.h23
2 files changed, 123 insertions, 90 deletions
diff --git a/zenserver/projectstore/projectstore.cpp b/zenserver/projectstore/projectstore.cpp
index 216a8cbf6..9a17443a8 100644
--- a/zenserver/projectstore/projectstore.cpp
+++ b/zenserver/projectstore/projectstore.cpp
@@ -220,27 +220,19 @@ struct ProjectStore::OplogStorage : public RefCounted
return CbObject(SharedBuffer(std::move(OpBuffer)));
}
- OplogEntry AppendOp(CbObject Op)
+ OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, XXH3_128 KeyHash)
{
ZEN_TRACE_CPU("ProjectStore::OplogStorage::AppendOp");
using namespace std::literals;
- SharedBuffer Buffer = Op.GetBuffer();
- const uint64_t WriteSize = Buffer.GetSize();
- const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF);
+ uint64_t WriteSize = Buffer.GetSize();
- ZEN_ASSERT(WriteSize != 0);
-
- XXH3_128Stream KeyHasher;
- Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
- XXH3_128 KeyHash = KeyHasher.GetHash();
-
- RwLock::ExclusiveLockScope _(m_RwLock);
+ RwLock::ExclusiveLockScope Lock(m_RwLock);
const uint64_t WriteOffset = m_NextOpsOffset;
const uint32_t OpLsn = ++m_MaxLsn;
-
- m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign);
+ m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign);
+ Lock.ReleaseNow();
ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign));
@@ -251,7 +243,6 @@ struct ProjectStore::OplogStorage : public RefCounted
.OpKeyHash = KeyHash};
m_Oplog.Append(Entry);
-
m_OpBlobs.Write(Buffer.GetData(), WriteSize, WriteOffset);
return Entry;
@@ -465,7 +456,8 @@ ProjectStore::Oplog::ReplayLog()
{
return;
}
- m_Storage->ReplayLog([&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, Op, OpEntry, kUpdateReplay); });
+ m_Storage->ReplayLog(
+ [&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, GetMapping(Op), OpEntry, kUpdateReplay); });
}
IoBuffer
@@ -594,18 +586,13 @@ ProjectStore::Oplog::GetOpByIndex(int Index)
return {};
}
-bool
+void
ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&,
Oid FileId,
IoHash Hash,
std::string_view ServerPath,
std::string_view ClientPath)
{
- if (ServerPath.empty() || ClientPath.empty())
- {
- return false;
- }
-
if (Hash != IoHash::Zero)
{
m_ChunkMap.insert_or_assign(FileId, Hash);
@@ -621,8 +608,6 @@ ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&,
{
m_ChunkMap.insert_or_assign(FileId, Hash);
}
-
- return true;
}
void
@@ -637,87 +622,100 @@ ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, Oid Chunk
m_MetaMap.insert_or_assign(ChunkId, Hash);
}
-uint32_t
-ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock,
- CbObject Core,
- const OplogEntry& OpEntry,
- UpdateType TypeOfUpdate)
+ProjectStore::Oplog::OplogEntryMapping
+ProjectStore::Oplog::GetMapping(CbObject Core)
{
- ZEN_TRACE_CPU("ProjectStore::Oplog::RegisterOplogEntry");
-
- ZEN_UNUSED(TypeOfUpdate);
-
- // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing
- // too many problems. Longer term we'll probably want to ensure we can do concurrent updates however
-
using namespace std::literals;
+ OplogEntryMapping Result;
+
// Update chunk id maps
+ CbObjectView PackageObj = Core["package"sv].AsObjectView();
+ CbArrayView BulkDataArray = Core["bulkdata"sv].AsArrayView();
+ Result.Chunks.reserve(PackageObj ? 1 : 0 + BulkDataArray.Num());
- if (Core["package"sv])
+ if (PackageObj)
{
- CbObjectView PkgObj = Core["package"sv].AsObjectView();
- Oid PackageId = PkgObj["id"sv].AsObjectId();
- IoHash PackageHash = PkgObj["data"sv].AsBinaryAttachment();
-
- AddChunkMapping(OplogLock, PackageId, PackageHash);
-
- ZEN_DEBUG("package data {} -> {}", PackageId, PackageHash);
+ Oid Id = PackageObj["id"sv].AsObjectId();
+ IoHash Hash = PackageObj["data"sv].AsBinaryAttachment();
+ Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
+ ZEN_DEBUG("package data {} -> {}", Id, Hash);
}
- for (CbFieldView& Entry : Core["bulkdata"sv])
+ for (CbFieldView& Entry : BulkDataArray)
{
CbObjectView BulkObj = Entry.AsObjectView();
+ Oid Id = BulkObj["id"sv].AsObjectId();
+ IoHash Hash = BulkObj["data"sv].AsBinaryAttachment();
+ Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
+ ZEN_DEBUG("bulkdata {} -> {}", Id, Hash);
+ }
- Oid BulkDataId = BulkObj["id"sv].AsObjectId();
- IoHash BulkDataHash = BulkObj["data"sv].AsBinaryAttachment();
+ CbArrayView FilesArray = Core["files"sv].AsArrayView();
+ Result.Files.reserve(FilesArray.Num());
+ for (CbFieldView& Entry : FilesArray)
+ {
+ CbObjectView FileObj = Entry.AsObjectView();
- AddChunkMapping(OplogLock, BulkDataId, BulkDataHash);
+ std::string_view ServerPath = FileObj["serverpath"sv].AsString();
+ std::string_view ClientPath = FileObj["clientpath"sv].AsString();
+ if (ServerPath.empty() || ClientPath.empty())
+ {
+ ZEN_WARN("invalid file");
+ continue;
+ }
- ZEN_DEBUG("bulkdata {} -> {}", BulkDataId, BulkDataHash);
+ Oid Id = FileObj["id"sv].AsObjectId();
+ IoHash Hash = FileObj["data"sv].AsBinaryAttachment();
+ Result.Files.emplace_back(
+ OplogEntryMapping::FileMapping{OplogEntryMapping::Mapping{Id, Hash}, std::string(ServerPath), std::string(ClientPath)});
+ ZEN_DEBUG("file {} -> {}, ServerPath: {}, ClientPath: {}", Id, Hash, ServerPath, ClientPath);
}
- if (Core["files"sv])
+ CbArrayView MetaArray = Core["meta"sv].AsArrayView();
+ Result.Meta.reserve(MetaArray.Num());
+ for (CbFieldView& Entry : MetaArray)
{
- Stopwatch Timer;
- int32_t FileCount = 0;
- int32_t ChunkCount = 0;
+ CbObjectView MetaObj = Entry.AsObjectView();
+ Oid Id = MetaObj["id"sv].AsObjectId();
+ IoHash Hash = MetaObj["data"sv].AsBinaryAttachment();
+ Result.Meta.emplace_back(OplogEntryMapping::Mapping{Id, Hash});
+ auto NameString = MetaObj["name"sv].AsString();
+ ZEN_DEBUG("meta data ({}) {} -> {}", NameString, Id, Hash);
+ }
- for (CbFieldView& Entry : Core["files"sv])
- {
- CbObjectView FileObj = Entry.AsObjectView();
- const Oid FileId = FileObj["id"sv].AsObjectId();
- IoHash FileDataHash = FileObj["data"sv].AsBinaryAttachment();
- std::string_view ServerPath = FileObj["serverpath"sv].AsString();
- std::string_view ClientPath = FileObj["clientpath"sv].AsString();
+ return Result;
+}
- if (AddFileMapping(OplogLock, FileId, FileDataHash, ServerPath, ClientPath))
- {
- ++FileCount;
- }
- else
- {
- ZEN_WARN("invalid file");
- }
- }
+uint32_t
+ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock,
+ const OplogEntryMapping& OpMapping,
+ const OplogEntry& OpEntry,
+ UpdateType TypeOfUpdate)
+{
+ ZEN_TRACE_CPU("ProjectStore::Oplog::RegisterOplogEntry");
- ZEN_DEBUG("added {} file(s), {} as files and {} as chunks in {}",
- FileCount + ChunkCount,
- FileCount,
- ChunkCount,
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
- }
+ ZEN_UNUSED(TypeOfUpdate);
- for (CbFieldView& Entry : Core["meta"sv])
+ // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing
+ // too many problems. Longer term we'll probably want to ensure we can do concurrent updates however
+
+ using namespace std::literals;
+
+ // Update chunk id maps
+ for (const OplogEntryMapping::Mapping& Chunk : OpMapping.Chunks)
{
- CbObjectView MetaObj = Entry.AsObjectView();
- const Oid MetaId = MetaObj["id"sv].AsObjectId();
- auto NameString = MetaObj["name"sv].AsString();
- IoHash MetaDataHash = MetaObj["data"sv].AsBinaryAttachment();
+ AddChunkMapping(OplogLock, Chunk.Id, Chunk.Hash);
+ }
- AddMetaMapping(OplogLock, MetaId, MetaDataHash);
+ for (const OplogEntryMapping::FileMapping& File : OpMapping.Files)
+ {
+ AddFileMapping(OplogLock, File.Id, File.Hash, File.ServerPath, File.ClientPath);
+ }
- ZEN_DEBUG("meta data ({}) {} -> {}", NameString, MetaId, MetaDataHash);
+ for (const OplogEntryMapping::Mapping& Meta : OpMapping.Meta)
+ {
+ AddMetaMapping(OplogLock, Meta.Id, Meta.Hash);
}
m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize});
@@ -731,8 +729,6 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
{
ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry");
- using namespace std::literals;
-
const CbObject& Core = OpPackage.GetObject();
const uint32_t EntryId = AppendNewOplogEntry(Core);
if (EntryId == 0xffffffffu)
@@ -773,16 +769,34 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core)
{
ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry");
- RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
+ using namespace std::literals;
+
+ OplogEntryMapping Mapping = GetMapping(Core);
+
+ SharedBuffer Buffer = Core.GetBuffer();
+ const uint64_t WriteSize = Buffer.GetSize();
+ const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF);
+
+ ZEN_ASSERT(WriteSize != 0);
+
+ XXH3_128Stream KeyHasher;
+ Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ XXH3_128 KeyHash = KeyHasher.GetHash();
+
+ RefPtr<OplogStorage> Storage;
+
+ {
+ RwLock::SharedLockScope _(m_OplogLock);
+ Storage = m_Storage;
+ }
if (!m_Storage)
{
return 0xffffffffu;
}
+ const OplogEntry OpEntry = m_Storage->AppendOp(Buffer, OpCoreHash, KeyHash);
- using namespace std::literals;
-
- const OplogEntry OpEntry = m_Storage->AppendOp(Core);
- const uint32_t EntryId = RegisterOplogEntry(OplogLock, Core, OpEntry, kUpdateNewEntry);
+ RwLock::ExclusiveLockScope OplogLock(m_OplogLock);
+ const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry, kUpdateNewEntry);
return EntryId;
}
diff --git a/zenserver/projectstore/projectstore.h b/zenserver/projectstore/projectstore.h
index 8267bd9e0..6b214d5a2 100644
--- a/zenserver/projectstore/projectstore.h
+++ b/zenserver/projectstore/projectstore.h
@@ -156,6 +156,25 @@ public:
*/
void ReplayLog();
+ struct OplogEntryMapping
+ {
+ struct Mapping
+ {
+ Oid Id;
+ IoHash Hash;
+ };
+ struct FileMapping : public Mapping
+ {
+ std::string ServerPath;
+ std::string ClientPath;
+ };
+ std::vector<Mapping> Chunks;
+ std::vector<Mapping> Meta;
+ std::vector<FileMapping> Files;
+ };
+
+ OplogEntryMapping GetMapping(CbObject Core);
+
/** Update tracking metadata for a new oplog entry
*
* This is used during replay (and gets called as part of new op append)
@@ -163,11 +182,11 @@ public:
* Returns the oplog LSN assigned to the new entry, or kInvalidOp if the entry is rejected
*/
uint32_t RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock,
- CbObject Core,
+ const OplogEntryMapping& OpMapping,
const OplogEntry& OpEntry,
UpdateType TypeOfUpdate);
- bool AddFileMapping(const RwLock::ExclusiveLockScope& OplogLock,
+ void AddFileMapping(const RwLock::ExclusiveLockScope& OplogLock,
Oid FileId,
IoHash Hash,
std::string_view ServerPath,