diff options
| author | Dan Engelbrecht <[email protected]> | 2023-02-02 09:56:22 -0800 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-02-02 18:56:22 +0100 |
| commit | 231d69721625358cf7c31db2575f2cc9117f8482 (patch) | |
| tree | 7e0deb0785a586d305cc8384ae6763484d99b558 | |
| parent | Updated README.md (diff) | |
| download | zen-231d69721625358cf7c31db2575f2cc9117f8482.tar.xz zen-231d69721625358cf7c31db2575f2cc9117f8482.zip | |
Reduce lock scopes in oplog (#220)
| -rw-r--r-- | zenserver/projectstore/projectstore.cpp | 190 | ||||
| -rw-r--r-- | zenserver/projectstore/projectstore.h | 23 |
2 files changed, 123 insertions, 90 deletions
diff --git a/zenserver/projectstore/projectstore.cpp b/zenserver/projectstore/projectstore.cpp index 216a8cbf6..9a17443a8 100644 --- a/zenserver/projectstore/projectstore.cpp +++ b/zenserver/projectstore/projectstore.cpp @@ -220,27 +220,19 @@ struct ProjectStore::OplogStorage : public RefCounted return CbObject(SharedBuffer(std::move(OpBuffer))); } - OplogEntry AppendOp(CbObject Op) + OplogEntry AppendOp(SharedBuffer Buffer, uint32_t OpCoreHash, XXH3_128 KeyHash) { ZEN_TRACE_CPU("ProjectStore::OplogStorage::AppendOp"); using namespace std::literals; - SharedBuffer Buffer = Op.GetBuffer(); - const uint64_t WriteSize = Buffer.GetSize(); - const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF); + uint64_t WriteSize = Buffer.GetSize(); - ZEN_ASSERT(WriteSize != 0); - - XXH3_128Stream KeyHasher; - Op["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); - XXH3_128 KeyHash = KeyHasher.GetHash(); - - RwLock::ExclusiveLockScope _(m_RwLock); + RwLock::ExclusiveLockScope Lock(m_RwLock); const uint64_t WriteOffset = m_NextOpsOffset; const uint32_t OpLsn = ++m_MaxLsn; - - m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign); + m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign); + Lock.ReleaseNow(); ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign)); @@ -251,7 +243,6 @@ struct ProjectStore::OplogStorage : public RefCounted .OpKeyHash = KeyHash}; m_Oplog.Append(Entry); - m_OpBlobs.Write(Buffer.GetData(), WriteSize, WriteOffset); return Entry; @@ -465,7 +456,8 @@ ProjectStore::Oplog::ReplayLog() { return; } - m_Storage->ReplayLog([&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, Op, OpEntry, kUpdateReplay); }); + m_Storage->ReplayLog( + [&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(OplogLock, GetMapping(Op), OpEntry, kUpdateReplay); }); } IoBuffer @@ -594,18 +586,13 @@ ProjectStore::Oplog::GetOpByIndex(int Index) return {}; } -bool +void ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, Oid FileId, IoHash Hash, std::string_view ServerPath, std::string_view ClientPath) { - if (ServerPath.empty() || ClientPath.empty()) - { - return false; - } - if (Hash != IoHash::Zero) { m_ChunkMap.insert_or_assign(FileId, Hash); @@ -621,8 +608,6 @@ ProjectStore::Oplog::AddFileMapping(const RwLock::ExclusiveLockScope&, { m_ChunkMap.insert_or_assign(FileId, Hash); } - - return true; } void @@ -637,87 +622,100 @@ ProjectStore::Oplog::AddMetaMapping(const RwLock::ExclusiveLockScope&, Oid Chunk m_MetaMap.insert_or_assign(ChunkId, Hash); } -uint32_t -ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, - CbObject Core, - const OplogEntry& OpEntry, - UpdateType TypeOfUpdate) +ProjectStore::Oplog::OplogEntryMapping +ProjectStore::Oplog::GetMapping(CbObject Core) { - ZEN_TRACE_CPU("ProjectStore::Oplog::RegisterOplogEntry"); - - ZEN_UNUSED(TypeOfUpdate); - - // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing - // too many problems. Longer term we'll probably want to ensure we can do concurrent updates however - using namespace std::literals; + OplogEntryMapping Result; + // Update chunk id maps + CbObjectView PackageObj = Core["package"sv].AsObjectView(); + CbArrayView BulkDataArray = Core["bulkdata"sv].AsArrayView(); + Result.Chunks.reserve(PackageObj ? 1 : 0 + BulkDataArray.Num()); - if (Core["package"sv]) + if (PackageObj) { - CbObjectView PkgObj = Core["package"sv].AsObjectView(); - Oid PackageId = PkgObj["id"sv].AsObjectId(); - IoHash PackageHash = PkgObj["data"sv].AsBinaryAttachment(); - - AddChunkMapping(OplogLock, PackageId, PackageHash); - - ZEN_DEBUG("package data {} -> {}", PackageId, PackageHash); + Oid Id = PackageObj["id"sv].AsObjectId(); + IoHash Hash = PackageObj["data"sv].AsBinaryAttachment(); + Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); + ZEN_DEBUG("package data {} -> {}", Id, Hash); } - for (CbFieldView& Entry : Core["bulkdata"sv]) + for (CbFieldView& Entry : BulkDataArray) { CbObjectView BulkObj = Entry.AsObjectView(); + Oid Id = BulkObj["id"sv].AsObjectId(); + IoHash Hash = BulkObj["data"sv].AsBinaryAttachment(); + Result.Chunks.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); + ZEN_DEBUG("bulkdata {} -> {}", Id, Hash); + } - Oid BulkDataId = BulkObj["id"sv].AsObjectId(); - IoHash BulkDataHash = BulkObj["data"sv].AsBinaryAttachment(); + CbArrayView FilesArray = Core["files"sv].AsArrayView(); + Result.Files.reserve(FilesArray.Num()); + for (CbFieldView& Entry : FilesArray) + { + CbObjectView FileObj = Entry.AsObjectView(); - AddChunkMapping(OplogLock, BulkDataId, BulkDataHash); + std::string_view ServerPath = FileObj["serverpath"sv].AsString(); + std::string_view ClientPath = FileObj["clientpath"sv].AsString(); + if (ServerPath.empty() || ClientPath.empty()) + { + ZEN_WARN("invalid file"); + continue; + } - ZEN_DEBUG("bulkdata {} -> {}", BulkDataId, BulkDataHash); + Oid Id = FileObj["id"sv].AsObjectId(); + IoHash Hash = FileObj["data"sv].AsBinaryAttachment(); + Result.Files.emplace_back( + OplogEntryMapping::FileMapping{OplogEntryMapping::Mapping{Id, Hash}, std::string(ServerPath), std::string(ClientPath)}); + ZEN_DEBUG("file {} -> {}, ServerPath: {}, ClientPath: {}", Id, Hash, ServerPath, ClientPath); } - if (Core["files"sv]) + CbArrayView MetaArray = Core["meta"sv].AsArrayView(); + Result.Meta.reserve(MetaArray.Num()); + for (CbFieldView& Entry : MetaArray) { - Stopwatch Timer; - int32_t FileCount = 0; - int32_t ChunkCount = 0; + CbObjectView MetaObj = Entry.AsObjectView(); + Oid Id = MetaObj["id"sv].AsObjectId(); + IoHash Hash = MetaObj["data"sv].AsBinaryAttachment(); + Result.Meta.emplace_back(OplogEntryMapping::Mapping{Id, Hash}); + auto NameString = MetaObj["name"sv].AsString(); + ZEN_DEBUG("meta data ({}) {} -> {}", NameString, Id, Hash); + } - for (CbFieldView& Entry : Core["files"sv]) - { - CbObjectView FileObj = Entry.AsObjectView(); - const Oid FileId = FileObj["id"sv].AsObjectId(); - IoHash FileDataHash = FileObj["data"sv].AsBinaryAttachment(); - std::string_view ServerPath = FileObj["serverpath"sv].AsString(); - std::string_view ClientPath = FileObj["clientpath"sv].AsString(); + return Result; +} - if (AddFileMapping(OplogLock, FileId, FileDataHash, ServerPath, ClientPath)) - { - ++FileCount; - } - else - { - ZEN_WARN("invalid file"); - } - } +uint32_t +ProjectStore::Oplog::RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, + const OplogEntryMapping& OpMapping, + const OplogEntry& OpEntry, + UpdateType TypeOfUpdate) +{ + ZEN_TRACE_CPU("ProjectStore::Oplog::RegisterOplogEntry"); - ZEN_DEBUG("added {} file(s), {} as files and {} as chunks in {}", - FileCount + ChunkCount, - FileCount, - ChunkCount, - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); - } + ZEN_UNUSED(TypeOfUpdate); - for (CbFieldView& Entry : Core["meta"sv]) + // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing + // too many problems. Longer term we'll probably want to ensure we can do concurrent updates however + + using namespace std::literals; + + // Update chunk id maps + for (const OplogEntryMapping::Mapping& Chunk : OpMapping.Chunks) { - CbObjectView MetaObj = Entry.AsObjectView(); - const Oid MetaId = MetaObj["id"sv].AsObjectId(); - auto NameString = MetaObj["name"sv].AsString(); - IoHash MetaDataHash = MetaObj["data"sv].AsBinaryAttachment(); + AddChunkMapping(OplogLock, Chunk.Id, Chunk.Hash); + } - AddMetaMapping(OplogLock, MetaId, MetaDataHash); + for (const OplogEntryMapping::FileMapping& File : OpMapping.Files) + { + AddFileMapping(OplogLock, File.Id, File.Hash, File.ServerPath, File.ClientPath); + } - ZEN_DEBUG("meta data ({}) {} -> {}", NameString, MetaId, MetaDataHash); + for (const OplogEntryMapping::Mapping& Meta : OpMapping.Meta) + { + AddMetaMapping(OplogLock, Meta.Id, Meta.Hash); } m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize}); @@ -731,8 +729,6 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage) { ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); - using namespace std::literals; - const CbObject& Core = OpPackage.GetObject(); const uint32_t EntryId = AppendNewOplogEntry(Core); if (EntryId == 0xffffffffu) @@ -773,16 +769,34 @@ ProjectStore::Oplog::AppendNewOplogEntry(CbObject Core) { ZEN_TRACE_CPU("ProjectStore::Oplog::AppendNewOplogEntry"); - RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + using namespace std::literals; + + OplogEntryMapping Mapping = GetMapping(Core); + + SharedBuffer Buffer = Core.GetBuffer(); + const uint64_t WriteSize = Buffer.GetSize(); + const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF); + + ZEN_ASSERT(WriteSize != 0); + + XXH3_128Stream KeyHasher; + Core["key"sv].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + XXH3_128 KeyHash = KeyHasher.GetHash(); + + RefPtr<OplogStorage> Storage; + + { + RwLock::SharedLockScope _(m_OplogLock); + Storage = m_Storage; + } if (!m_Storage) { return 0xffffffffu; } + const OplogEntry OpEntry = m_Storage->AppendOp(Buffer, OpCoreHash, KeyHash); - using namespace std::literals; - - const OplogEntry OpEntry = m_Storage->AppendOp(Core); - const uint32_t EntryId = RegisterOplogEntry(OplogLock, Core, OpEntry, kUpdateNewEntry); + RwLock::ExclusiveLockScope OplogLock(m_OplogLock); + const uint32_t EntryId = RegisterOplogEntry(OplogLock, Mapping, OpEntry, kUpdateNewEntry); return EntryId; } diff --git a/zenserver/projectstore/projectstore.h b/zenserver/projectstore/projectstore.h index 8267bd9e0..6b214d5a2 100644 --- a/zenserver/projectstore/projectstore.h +++ b/zenserver/projectstore/projectstore.h @@ -156,6 +156,25 @@ public: */ void ReplayLog(); + struct OplogEntryMapping + { + struct Mapping + { + Oid Id; + IoHash Hash; + }; + struct FileMapping : public Mapping + { + std::string ServerPath; + std::string ClientPath; + }; + std::vector<Mapping> Chunks; + std::vector<Mapping> Meta; + std::vector<FileMapping> Files; + }; + + OplogEntryMapping GetMapping(CbObject Core); + /** Update tracking metadata for a new oplog entry * * This is used during replay (and gets called as part of new op append) @@ -163,11 +182,11 @@ public: * Returns the oplog LSN assigned to the new entry, or kInvalidOp if the entry is rejected */ uint32_t RegisterOplogEntry(RwLock::ExclusiveLockScope& OplogLock, - CbObject Core, + const OplogEntryMapping& OpMapping, const OplogEntry& OpEntry, UpdateType TypeOfUpdate); - bool AddFileMapping(const RwLock::ExclusiveLockScope& OplogLock, + void AddFileMapping(const RwLock::ExclusiveLockScope& OplogLock, Oid FileId, IoHash Hash, std::string_view ServerPath, |