about summary refs log tree commit diff
path: root/zenserver/projectstore.cpp
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-06-23 14:34:28 +0200
committerGitHub <[email protected]>2021-06-23 14:34:28 +0200
commitaf7ff3f1c66628c8005fb7d5d8e86e985d2d7b05 (patch)
tree5f9b72f6a72011639a81cf8ef32c92981504fd81 /zenserver/projectstore.cpp
parentMade some changes to how mesh config works (diff)
downloadzen-af7ff3f1c66628c8005fb7d5d8e86e985d2d7b05.tar.xz
zen-af7ff3f1c66628c8005fb7d5d8e86e985d2d7b05.zip
Support iterative cooks (#3)
* Added new route to get all chunk IDs and chunk hashes. Changed to always update chunk mapping to support iterative cooks.
* Replay latest oplog entries.
* Include server path when fetching file(s) and support for fetching single oplog entry.
* Removed get chunks route.
* Removed iterate chunk map.
* Take read lock when iterating oplog.
* Take read lock when reading oplog entry.
* Take ownership of buffer reading oplog entry.
* Fixed incorrect oplog key when fetching single entry.
* Changed map updates to use insert_or_assign for efficiency

Co-authored-by: Stefan Boberg <[email protected]>
Diffstat (limited to 'zenserver/projectstore.cpp')
-rw-r--r-- zenserver/projectstore.cpp | 154
1 file changed, 144 insertions(+), 10 deletions(-)
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index b331f262a..e745972d3 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -40,6 +40,24 @@ namespace rocksdb = ROCKSDB_NAMESPACE;
//////////////////////////////////////////////////////////////////////////
+Oid
+OpKeyStringAsOId(std::string_view OpKey)
+{
+ CbObjectWriter Writer;
+ Writer << "key" << OpKey;
+
+ XXH3_128Stream KeyHasher;
+ Writer.Save()["key"].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ XXH3_128 KeyHash = KeyHasher.GetHash();
+
+ Oid OpId;
+ memcpy(OpId.OidBits, &KeyHash, sizeof(OpId.OidBits));
+
+ return OpId;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
struct ProjectStore::OplogStorage : public RefCounted
{
OplogStorage(ProjectStore::Oplog* OwnerOplog, std::filesystem::path BasePath) : m_OwnerOplog(OwnerOplog), m_OplogStoragePath(BasePath)
@@ -180,6 +198,25 @@ struct ProjectStore::OplogStorage : public RefCounted
m_NextOpsOffset);
}
+ void ReplayLog(const std::vector<OplogEntryAddress>& Entries, std::function<void(CbObject)>&& Handler)
+ {
+ for (const OplogEntryAddress& Entry : Entries)
+ {
+ CbObject Op = GetOp(Entry);
+ Handler(Op);
+ }
+ }
+
+ CbObject GetOp(const OplogEntryAddress& Entry)
+ {
+ IoBuffer OpBuffer(Entry.Size);
+
+ const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign;
+ m_OpBlobs.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset);
+
+ return CbObject(SharedBuffer(std::move(OpBuffer)));
+ }
+
OplogEntry AppendOp(CbObject Op)
{
SharedBuffer Buffer = Op.GetBuffer();
@@ -298,14 +335,54 @@ ProjectStore::Oplog::FindChunk(Oid ChunkId)
}
void
-ProjectStore::Oplog::IterateFileMap(std::function<void(const Oid&, const std::string_view&)>&& Fn)
+ProjectStore::Oplog::IterateFileMap(
+ std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn)
{
RwLock::SharedLockScope _(m_OplogLock);
for (const auto& Kv : m_FileMap)
{
- Fn(Kv.first, Kv.second.ClientPath);
+ Fn(Kv.first, Kv.second.ServerPath, Kv.second.ClientPath);
+ }
+}
+
+void
+ProjectStore::Oplog::IterateOplog(std::function<void(CbObject)>&& Handler)
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+
+ std::vector<OplogEntryAddress> Entries;
+ Entries.reserve(m_LatestOpMap.size());
+
+ for (const auto& Kv : m_LatestOpMap)
+ {
+ const auto AddressEntry = m_OpAddressMap.find(Kv.second);
+ ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
+
+ Entries.push_back(AddressEntry->second);
}
+
+ std::sort(Entries.begin(), Entries.end(), [](const OplogEntryAddress& Lhs, const OplogEntryAddress& Rhs) {
+ return Lhs.Offset < Rhs.Offset;
+ });
+
+ m_Storage->ReplayLog(Entries, [&](CbObject Op) { Handler(Op); });
+}
+
+std::optional<CbObject>
+ProjectStore::Oplog::GetOplog(const Oid& Key)
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+
+ if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end())
+ {
+ const auto AddressEntry = m_OpAddressMap.find(LatestOp->second);
+ ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
+
+ return m_Storage->GetOp(AddressEntry->second);
+ }
+
+ return {};
}
bool
@@ -318,20 +395,20 @@ ProjectStore::Oplog::AddFileMapping(Oid FileId, IoHash Hash, std::string_view Se
return false;
}
- if (ServerPath[0] == '/')
+ if (ServerPath[0] == '/' || ClientPath[0] != '/')
{
- ServerPath = ServerPath.substr(1);
+ return false;
}
FileMapEntry Entry;
Entry.ServerPath = ServerPath;
Entry.ClientPath = ClientPath;
- m_FileMap.emplace(FileId, std::move(Entry));
+ m_FileMap[FileId] = std::move(Entry);
if (Hash != IoHash::Zero)
{
- m_ChunkMap.emplace(FileId, Hash);
+ m_ChunkMap.insert_or_assign(FileId, Hash);
}
return true;
@@ -342,7 +419,7 @@ ProjectStore::Oplog::AddChunkMapping(Oid ChunkId, IoHash Hash)
{
// NOTE: Caller must hold an exclusive lock on m_OplogLock
- m_ChunkMap.emplace(ChunkId, Hash);
+ m_ChunkMap.insert_or_assign(ChunkId, Hash);
}
void
@@ -350,7 +427,7 @@ ProjectStore::Oplog::AddMetaMapping(Oid ChunkId, IoHash Hash)
{
// NOTE: Caller must hold an exclusive lock on m_OplogLock
- m_MetaMap.emplace(ChunkId, Hash);
+ m_MetaMap.insert_or_assign(ChunkId, Hash);
}
uint32_t
@@ -915,13 +992,21 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
ProjectStore::Oplog& Log = *FoundLog;
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+
+ const bool FilterClient = Params.GetValue("filter") == "client";
+
CbObjectWriter Response;
Response.BeginArray("files");
- Log.IterateFileMap([&](const Oid& Id, const std::string_view& Path) {
+ Log.IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) {
Response.BeginObject();
Response << "id" << Id;
- Response << "path" << Path;
+ Response << "clientpath" << ClientPath;
+ if (!FilterClient)
+ {
+ Response << "serverpath" << ServerPath;
+ }
Response.EndObject();
});
@@ -1302,6 +1387,55 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
HttpVerb::kPost | HttpVerb::kGet | HttpVerb::kDelete);
m_Router.RegisterRoute(
+ "{project}/oplog/{log}/entries",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId);
+
+ if (FoundLog == nullptr)
+ {
+ return HttpReq.WriteResponse(HttpResponse::NotFound);
+ }
+
+ CbObjectWriter Response;
+
+ if (FoundLog->OplogCount() > 0)
+ {
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+
+ if (auto OpKey = Params.GetValue("opkey"); !OpKey.empty())
+ {
+ Oid OpKeyId = OpKeyStringAsOId(OpKey);
+ std::optional<CbObject> Op = FoundLog->GetOplog(OpKeyId);
+
+ if (Op.has_value())
+ {
+ Response << "entry" << Op.value();
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponse::NotFound);
+ }
+ }
+ else
+ {
+ Response.BeginArray("entries"sv);
+
+ FoundLog->IterateOplog([&Response](CbObject Op) { Response << Op; });
+
+ Response.EndArray();
+ }
+ }
+
+ return HttpReq.WriteResponse(HttpResponse::OK, Response.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
"{project}",
[this](HttpRouterRequest& Req) {
const std::string ProjectId = Req.GetCapture(1);