diff options
| author | Per Larsson <[email protected]> | 2021-06-23 14:34:28 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2021-06-23 14:34:28 +0200 |
| commit | af7ff3f1c66628c8005fb7d5d8e86e985d2d7b05 (patch) | |
| tree | 5f9b72f6a72011639a81cf8ef32c92981504fd81 /zenserver/projectstore.cpp | |
| parent | Made some changes to how mesh config works (diff) | |
| download | zen-af7ff3f1c66628c8005fb7d5d8e86e985d2d7b05.tar.xz zen-af7ff3f1c66628c8005fb7d5d8e86e985d2d7b05.zip | |
Support iterative cooks (#3)
* Added new route to get all chunk IDs and chunk hashes. Changed to always update chunk mapping to support iterative cooks.
* Replay latest oplog entries.
* Include server path when fetching file(s) and support for fetching single oplog entry.
* Removed get chunks route.
* Removed iterate chunk map.
* Take read lock when iterating oplog.
* Take read lock when reading oplog entry.
* Take ownership of buffer reading oplog entry.
* Fixed incorrect oplog key when fetching single entry.
* Changed map updates to use insert_or_assign for efficiency
Co-authored-by: Stefan Boberg <[email protected]>
Diffstat (limited to 'zenserver/projectstore.cpp')
| -rw-r--r-- | zenserver/projectstore.cpp | 154 |
1 files changed, 144 insertions, 10 deletions
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index b331f262a..e745972d3 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -40,6 +40,24 @@ namespace rocksdb = ROCKSDB_NAMESPACE; ////////////////////////////////////////////////////////////////////////// +Oid +OpKeyStringAsOId(std::string_view OpKey) +{ + CbObjectWriter Writer; + Writer << "key" << OpKey; + + XXH3_128Stream KeyHasher; + Writer.Save()["key"].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); }); + XXH3_128 KeyHash = KeyHasher.GetHash(); + + Oid OpId; + memcpy(OpId.OidBits, &KeyHash, sizeof(OpId.OidBits)); + + return OpId; +} + +////////////////////////////////////////////////////////////////////////// + struct ProjectStore::OplogStorage : public RefCounted { OplogStorage(ProjectStore::Oplog* OwnerOplog, std::filesystem::path BasePath) : m_OwnerOplog(OwnerOplog), m_OplogStoragePath(BasePath) @@ -180,6 +198,25 @@ struct ProjectStore::OplogStorage : public RefCounted m_NextOpsOffset); } + void ReplayLog(const std::vector<OplogEntryAddress>& Entries, std::function<void(CbObject)>&& Handler) + { + for (const OplogEntryAddress& Entry : Entries) + { + CbObject Op = GetOp(Entry); + Handler(Op); + } + } + + CbObject GetOp(const OplogEntryAddress& Entry) + { + IoBuffer OpBuffer(Entry.Size); + + const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign; + m_OpBlobs.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset); + + return CbObject(SharedBuffer(std::move(OpBuffer))); + } + OplogEntry AppendOp(CbObject Op) { SharedBuffer Buffer = Op.GetBuffer(); @@ -298,14 +335,54 @@ ProjectStore::Oplog::FindChunk(Oid ChunkId) } void -ProjectStore::Oplog::IterateFileMap(std::function<void(const Oid&, const std::string_view&)>&& Fn) +ProjectStore::Oplog::IterateFileMap( + std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn) { RwLock::SharedLockScope _(m_OplogLock); for (const auto& Kv : m_FileMap) { - Fn(Kv.first, Kv.second.ClientPath); + Fn(Kv.first, Kv.second.ServerPath, Kv.second.ClientPath); + } +} + +void +ProjectStore::Oplog::IterateOplog(std::function<void(CbObject)>&& Handler) +{ + RwLock::SharedLockScope _(m_OplogLock); + + std::vector<OplogEntryAddress> Entries; + Entries.reserve(m_LatestOpMap.size()); + + for (const auto& Kv : m_LatestOpMap) + { + const auto AddressEntry = m_OpAddressMap.find(Kv.second); + ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); + + Entries.push_back(AddressEntry->second); } + + std::sort(Entries.begin(), Entries.end(), [](const OplogEntryAddress& Lhs, const OplogEntryAddress& Rhs) { + return Lhs.Offset < Rhs.Offset; + }); + + m_Storage->ReplayLog(Entries, [&](CbObject Op) { Handler(Op); }); +} + +std::optional<CbObject> +ProjectStore::Oplog::GetOplog(const Oid& Key) +{ + RwLock::SharedLockScope _(m_OplogLock); + + if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end()) + { + const auto AddressEntry = m_OpAddressMap.find(LatestOp->second); + ZEN_ASSERT(AddressEntry != m_OpAddressMap.end()); + + return m_Storage->GetOp(AddressEntry->second); + } + + return {}; } bool @@ -318,20 +395,20 @@ ProjectStore::Oplog::AddFileMapping(Oid FileId, IoHash Hash, std::string_view Se return false; } - if (ServerPath[0] == '/') + if (ServerPath[0] == '/' || ClientPath[0] != '/') { - ServerPath = ServerPath.substr(1); + return false; } FileMapEntry Entry; Entry.ServerPath = ServerPath; Entry.ClientPath = ClientPath; - m_FileMap.emplace(FileId, std::move(Entry)); + m_FileMap[FileId] = std::move(Entry); if (Hash != IoHash::Zero) { - m_ChunkMap.emplace(FileId, Hash); + m_ChunkMap.insert_or_assign(FileId, Hash); } return true; @@ -342,7 +419,7 @@ ProjectStore::Oplog::AddChunkMapping(Oid ChunkId, IoHash Hash) { // NOTE: Caller must hold an exclusive lock on m_OplogLock - m_ChunkMap.emplace(ChunkId, Hash); + m_ChunkMap.insert_or_assign(ChunkId, Hash); } void @@ -350,7 +427,7 @@ ProjectStore::Oplog::AddMetaMapping(Oid ChunkId, IoHash Hash) { // NOTE: Caller must hold an exclusive lock on m_OplogLock - m_MetaMap.emplace(ChunkId, Hash); + m_MetaMap.insert_or_assign(ChunkId, Hash); } uint32_t @@ -915,13 +992,21 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) ProjectStore::Oplog& Log = *FoundLog; + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + + const bool FilterClient = Params.GetValue("filter") == "client"; + CbObjectWriter Response; Response.BeginArray("files"); - Log.IterateFileMap([&](const Oid& Id, const std::string_view& Path) { + Log.IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) { Response.BeginObject(); Response << "id" << Id; - Response << "path" << Path; + Response << "clientpath" << ClientPath; + if (!FilterClient) + { + Response << "serverpath" << ServerPath; + } Response.EndObject(); }); @@ -1302,6 +1387,55 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) HttpVerb::kPost | HttpVerb::kGet | HttpVerb::kDelete); m_Router.RegisterRoute( + "{project}/oplog/{log}/entries", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const auto& ProjectId = Req.GetCapture(1); + const auto& OplogId = Req.GetCapture(2); + + ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId); + + if (FoundLog == nullptr) + { + return HttpReq.WriteResponse(HttpResponse::NotFound); + } + + CbObjectWriter Response; + + if (FoundLog->OplogCount() > 0) + { + HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams(); + + if (auto OpKey = Params.GetValue("opkey"); !OpKey.empty()) + { + Oid OpKeyId = OpKeyStringAsOId(OpKey); + std::optional<CbObject> Op = FoundLog->GetOplog(OpKeyId); + + if (Op.has_value()) + { + Response << "entry" << Op.value(); + } + else + { + return HttpReq.WriteResponse(HttpResponse::NotFound); + } + } + else + { + Response.BeginArray("entries"sv); + + FoundLog->IterateOplog([&Response](CbObject Op) { Response << Op; }); + + Response.EndArray(); + } + } + + return HttpReq.WriteResponse(HttpResponse::OK, Response.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( "{project}", [this](HttpRouterRequest& Req) { const std::string ProjectId = Req.GetCapture(1); |