aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPer Larsson <[email protected]>2021-06-23 14:34:28 +0200
committerGitHub <[email protected]>2021-06-23 14:34:28 +0200
commitaf7ff3f1c66628c8005fb7d5d8e86e985d2d7b05 (patch)
tree5f9b72f6a72011639a81cf8ef32c92981504fd81
parentMade some changes to how mesh config works (diff)
downloadzen-af7ff3f1c66628c8005fb7d5d8e86e985d2d7b05.tar.xz
zen-af7ff3f1c66628c8005fb7d5d8e86e985d2d7b05.zip
Support iterative cooks (#3)
* Added new route to get all chunk IDs and chunk hashes. Changed to always update chunk mapping to support iterative cooks. * Replay latest oplog entries. * Include server path when fetching file(s) and support for fetching single oplog entry. * Removed get chunks route. * Removed iterate chunk map. * Take read lock when iterating oplog. * Take read lock when reading oplog entry. * Take ownership of buffer reading oplog entry. * Fixed incorrect oplog key when fetching single entry. * Changed map updates to use insert_or_assign for efficiency Co-authored-by: Stefan Boberg <[email protected]>
-rw-r--r--zenserver/projectstore.cpp154
-rw-r--r--zenserver/projectstore.cpp-bdcc95661842
-rw-r--r--zenserver/projectstore.h19
3 files changed, 1998 insertions, 17 deletions
diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp
index b331f262a..e745972d3 100644
--- a/zenserver/projectstore.cpp
+++ b/zenserver/projectstore.cpp
@@ -40,6 +40,24 @@ namespace rocksdb = ROCKSDB_NAMESPACE;
//////////////////////////////////////////////////////////////////////////
+Oid
+OpKeyStringAsOId(std::string_view OpKey)
+{
+ CbObjectWriter Writer;
+ Writer << "key" << OpKey;
+
+ XXH3_128Stream KeyHasher;
+ Writer.Save()["key"].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ XXH3_128 KeyHash = KeyHasher.GetHash();
+
+ Oid OpId;
+ memcpy(OpId.OidBits, &KeyHash, sizeof(OpId.OidBits));
+
+ return OpId;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
struct ProjectStore::OplogStorage : public RefCounted
{
OplogStorage(ProjectStore::Oplog* OwnerOplog, std::filesystem::path BasePath) : m_OwnerOplog(OwnerOplog), m_OplogStoragePath(BasePath)
@@ -180,6 +198,25 @@ struct ProjectStore::OplogStorage : public RefCounted
m_NextOpsOffset);
}
+ void ReplayLog(const std::vector<OplogEntryAddress>& Entries, std::function<void(CbObject)>&& Handler)
+ {
+ for (const OplogEntryAddress& Entry : Entries)
+ {
+ CbObject Op = GetOp(Entry);
+ Handler(Op);
+ }
+ }
+
+ CbObject GetOp(const OplogEntryAddress& Entry)
+ {
+ IoBuffer OpBuffer(Entry.Size);
+
+ const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign;
+ m_OpBlobs.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset);
+
+ return CbObject(SharedBuffer(std::move(OpBuffer)));
+ }
+
OplogEntry AppendOp(CbObject Op)
{
SharedBuffer Buffer = Op.GetBuffer();
@@ -298,14 +335,54 @@ ProjectStore::Oplog::FindChunk(Oid ChunkId)
}
void
-ProjectStore::Oplog::IterateFileMap(std::function<void(const Oid&, const std::string_view&)>&& Fn)
+ProjectStore::Oplog::IterateFileMap(
+ std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn)
{
RwLock::SharedLockScope _(m_OplogLock);
for (const auto& Kv : m_FileMap)
{
- Fn(Kv.first, Kv.second.ClientPath);
+ Fn(Kv.first, Kv.second.ServerPath, Kv.second.ClientPath);
+ }
+}
+
+void
+ProjectStore::Oplog::IterateOplog(std::function<void(CbObject)>&& Handler)
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+
+ std::vector<OplogEntryAddress> Entries;
+ Entries.reserve(m_LatestOpMap.size());
+
+ for (const auto& Kv : m_LatestOpMap)
+ {
+ const auto AddressEntry = m_OpAddressMap.find(Kv.second);
+ ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
+
+ Entries.push_back(AddressEntry->second);
}
+
+ std::sort(Entries.begin(), Entries.end(), [](const OplogEntryAddress& Lhs, const OplogEntryAddress& Rhs) {
+ return Lhs.Offset < Rhs.Offset;
+ });
+
+ m_Storage->ReplayLog(Entries, [&](CbObject Op) { Handler(Op); });
+}
+
+std::optional<CbObject>
+ProjectStore::Oplog::GetOplog(const Oid& Key)
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+
+ if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end())
+ {
+ const auto AddressEntry = m_OpAddressMap.find(LatestOp->second);
+ ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
+
+ return m_Storage->GetOp(AddressEntry->second);
+ }
+
+ return {};
}
bool
@@ -318,20 +395,20 @@ ProjectStore::Oplog::AddFileMapping(Oid FileId, IoHash Hash, std::string_view Se
return false;
}
- if (ServerPath[0] == '/')
+ if (ServerPath[0] == '/' || ClientPath[0] != '/')
{
- ServerPath = ServerPath.substr(1);
+ return false;
}
FileMapEntry Entry;
Entry.ServerPath = ServerPath;
Entry.ClientPath = ClientPath;
- m_FileMap.emplace(FileId, std::move(Entry));
+ m_FileMap[FileId] = std::move(Entry);
if (Hash != IoHash::Zero)
{
- m_ChunkMap.emplace(FileId, Hash);
+ m_ChunkMap.insert_or_assign(FileId, Hash);
}
return true;
@@ -342,7 +419,7 @@ ProjectStore::Oplog::AddChunkMapping(Oid ChunkId, IoHash Hash)
{
// NOTE: Caller must hold an exclusive lock on m_OplogLock
- m_ChunkMap.emplace(ChunkId, Hash);
+ m_ChunkMap.insert_or_assign(ChunkId, Hash);
}
void
@@ -350,7 +427,7 @@ ProjectStore::Oplog::AddMetaMapping(Oid ChunkId, IoHash Hash)
{
// NOTE: Caller must hold an exclusive lock on m_OplogLock
- m_MetaMap.emplace(ChunkId, Hash);
+ m_MetaMap.insert_or_assign(ChunkId, Hash);
}
uint32_t
@@ -915,13 +992,21 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
ProjectStore::Oplog& Log = *FoundLog;
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+
+ const bool FilterClient = Params.GetValue("filter") == "client";
+
CbObjectWriter Response;
Response.BeginArray("files");
- Log.IterateFileMap([&](const Oid& Id, const std::string_view& Path) {
+ Log.IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) {
Response.BeginObject();
Response << "id" << Id;
- Response << "path" << Path;
+ Response << "clientpath" << ClientPath;
+ if (!FilterClient)
+ {
+ Response << "serverpath" << ServerPath;
+ }
Response.EndObject();
});
@@ -1302,6 +1387,55 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
HttpVerb::kPost | HttpVerb::kGet | HttpVerb::kDelete);
m_Router.RegisterRoute(
+ "{project}/oplog/{log}/entries",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId);
+
+ if (FoundLog == nullptr)
+ {
+ return HttpReq.WriteResponse(HttpResponse::NotFound);
+ }
+
+ CbObjectWriter Response;
+
+ if (FoundLog->OplogCount() > 0)
+ {
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+
+ if (auto OpKey = Params.GetValue("opkey"); !OpKey.empty())
+ {
+ Oid OpKeyId = OpKeyStringAsOId(OpKey);
+ std::optional<CbObject> Op = FoundLog->GetOplog(OpKeyId);
+
+ if (Op.has_value())
+ {
+ Response << "entry" << Op.value();
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponse::NotFound);
+ }
+ }
+ else
+ {
+ Response.BeginArray("entries"sv);
+
+ FoundLog->IterateOplog([&Response](CbObject Op) { Response << Op; });
+
+ Response.EndArray();
+ }
+ }
+
+ return HttpReq.WriteResponse(HttpResponse::OK, Response.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
"{project}",
[this](HttpRouterRequest& Req) {
const std::string ProjectId = Req.GetCapture(1);
diff --git a/zenserver/projectstore.cpp-bdcc9566 b/zenserver/projectstore.cpp-bdcc9566
new file mode 100644
index 000000000..0700fc35d
--- /dev/null
+++ b/zenserver/projectstore.cpp-bdcc9566
@@ -0,0 +1,1842 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "projectstore.h"
+
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/filesystem.h>
+#include <zencore/fmtutils.h>
+#include <zencore/stream.h>
+#include <zencore/string.h>
+#include <zencore/timer.h>
+#include <zencore/windows.h>
+#include <zenstore/basicfile.h>
+#include <zenstore/cas.h>
+#include <zenstore/caslog.h>
+
+#define USE_ROCKSDB 0
+
+#if USE_ROCKSDB
+# pragma comment(lib, "Rpcrt4.lib") // RocksDB made me do this
+# include <rocksdb/db.h>
+#endif
+
+#include <ppl.h>
+#include <spdlog/spdlog.h>
+#include <xxh3.h>
+#include <asio.hpp>
+#include <future>
+#include <latch>
+#include <string>
+
+namespace zen {
+
+using namespace fmt::literals;
+
+#if USE_ROCKSDB
+namespace rocksdb = ROCKSDB_NAMESPACE;
+#endif
+
+//////////////////////////////////////////////////////////////////////////
+
+struct ProjectStore::OplogStorage : public RefCounted
+{
+ OplogStorage(ProjectStore::Oplog* OwnerOplog, std::filesystem::path BasePath) : m_OwnerOplog(OwnerOplog), m_OplogStoragePath(BasePath)
+ {
+ }
+
+ ~OplogStorage()
+ {
+ Log().info("closing oplog storage at {}", m_OplogStoragePath);
+ Flush();
+
+#if USE_ROCKSDB
+ if (m_RocksDb)
+ {
+ // Column families must be torn down before database is closed
+ for (const auto& Handle : m_RocksDbColumnHandles)
+ {
+ m_RocksDb->DestroyColumnFamilyHandle(Handle);
+ }
+
+ rocksdb::Status Status = m_RocksDb->Close();
+
+ if (!Status.ok())
+ {
+ Log().warn("db close error reported for '{}' : '{}'", m_OplogStoragePath, Status.getState());
+ }
+ }
+#endif
+ }
+
+ [[nodiscard]] bool Exists() { return Exists(m_OplogStoragePath); }
+ [[nodiscard]] static bool Exists(std::filesystem::path BasePath)
+ {
+ return std::filesystem::exists(BasePath / "ops.zlog") && std::filesystem::exists(BasePath / "ops.zops");
+ }
+
+ static bool Delete(std::filesystem::path BasePath) { return DeleteDirectories(BasePath); }
+
+ void Open(bool IsCreate)
+ {
+ Log().info("initializing oplog storage at '{}'", m_OplogStoragePath);
+
+ if (IsCreate)
+ {
+ DeleteDirectories(m_OplogStoragePath);
+ CreateDirectories(m_OplogStoragePath);
+ }
+
+ m_Oplog.Open(m_OplogStoragePath / "ops.zlog", IsCreate);
+ m_Oplog.Initialize();
+
+ m_OpBlobs.Open(m_OplogStoragePath / "ops.zops", IsCreate);
+
+ ZEN_ASSERT(IsPow2(m_OpsAlign));
+ ZEN_ASSERT(!(m_NextOpsOffset & (m_OpsAlign - 1)));
+
+#if USE_ROCKSDB
+ {
+ std::string RocksdbPath = WideToUtf8((m_OplogStoragePath / "ops.rdb").native().c_str());
+
+ Log().debug("opening rocksdb db at '{}'", RocksdbPath);
+
+ rocksdb::DB* Db;
+ rocksdb::DBOptions Options;
+ Options.create_if_missing = true;
+
+ std::vector<std::string> ExistingColumnFamilies;
+ rocksdb::Status Status = rocksdb::DB::ListColumnFamilies(Options, RocksdbPath, &ExistingColumnFamilies);
+
+ std::vector<rocksdb::ColumnFamilyDescriptor> ColumnDescriptors;
+
+ if (Status.IsPathNotFound())
+ {
+ ColumnDescriptors.emplace_back(rocksdb::ColumnFamilyDescriptor{rocksdb::kDefaultColumnFamilyName, {}});
+ }
+ else if (Status.ok())
+ {
+ for (const std::string& Column : ExistingColumnFamilies)
+ {
+ rocksdb::ColumnFamilyDescriptor ColumnFamily;
+ ColumnFamily.name = Column;
+ ColumnDescriptors.push_back(ColumnFamily);
+ }
+ }
+ else
+ {
+ throw std::exception("column family iteration failed for '{}': '{}'"_format(RocksdbPath, Status.getState()).c_str());
+ }
+
+ Status = rocksdb::DB::Open(Options, RocksdbPath, ColumnDescriptors, &m_RocksDbColumnHandles, &Db);
+
+ if (!Status.ok())
+ {
+ throw std::exception("database open failed for '{}': '{}'"_format(RocksdbPath, Status.getState()).c_str());
+ }
+
+ m_RocksDb.reset(Db);
+ }
+#endif
+ }
+
+ void ReplayLog(std::function<void(CbObject, const OplogEntry&)>&& Handler)
+ {
+ // This could use memory mapping or do something clever but for now it just reads the file sequentially
+
+ spdlog::info("replaying log for '{}'", m_OplogStoragePath);
+
+ Stopwatch Timer;
+
+ m_Oplog.Replay([&](const zen::OplogEntry& LogEntry) {
+ IoBuffer OpBuffer(LogEntry.OpCoreSize);
+
+ const uint64_t OpFileOffset = LogEntry.OpCoreOffset * m_OpsAlign;
+
+ m_OpBlobs.Read((void*)OpBuffer.Data(), LogEntry.OpCoreSize, OpFileOffset);
+
+ // Verify checksum, ignore op data if incorrect
+ const auto OpCoreHash = uint32_t(XXH3_64bits(OpBuffer.Data(), OpBuffer.Size()) & 0xffffFFFF);
+
+ if (OpCoreHash != LogEntry.OpCoreHash)
+ {
+ Log().warn("skipping oplog entry with bad checksum!");
+ return;
+ }
+
+ CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), OpBuffer.Size()));
+
+ m_NextOpsOffset =
+ Max(m_NextOpsOffset.load(std::memory_order::memory_order_relaxed), RoundUp(OpFileOffset + LogEntry.OpCoreSize, m_OpsAlign));
+ m_MaxLsn = Max(m_MaxLsn.load(std::memory_order::memory_order_relaxed), LogEntry.OpLsn);
+
+ Handler(Op, LogEntry);
+ });
+
+ spdlog::info("Oplog replay completed in {} - Max LSN# {}, Next offset: {}",
+ NiceTimeSpanMs(Timer.getElapsedTimeMs()),
+ m_MaxLsn,
+ m_NextOpsOffset);
+ }
+
+ void ReplayLog(const std::vector<OplogEntryAddress>& Entries, std::function<void(CbObject)>&& Handler)
+ {
+ for (const OplogEntryAddress& Entry : Entries)
+ {
+ IoBuffer OpBuffer(Entry.Size);
+
+ const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign;
+ m_OpBlobs.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset);
+
+ CbObject Op(SharedBuffer::MakeView(OpBuffer.Data(), OpBuffer.Size()));
+
+ Handler(Op);
+ }
+ }
+
+ CbObject GetOp(const OplogEntryAddress& Entry)
+ {
+ IoBuffer OpBuffer(Entry.Size);
+
+ const uint64_t OpFileOffset = Entry.Offset * m_OpsAlign;
+ m_OpBlobs.Read((void*)OpBuffer.Data(), Entry.Size, OpFileOffset);
+
+ return CbObject(SharedBuffer::MakeView(OpBuffer.Data(), OpBuffer.Size()));
+ }
+
+ OplogEntry AppendOp(CbObject Op)
+ {
+ SharedBuffer Buffer = Op.GetBuffer();
+ const uint64_t WriteSize = Buffer.GetSize();
+ const auto OpCoreHash = uint32_t(XXH3_64bits(Buffer.GetData(), WriteSize) & 0xffffFFFF);
+
+ XXH3_128Stream KeyHasher;
+ Op["key"].WriteToStream([&](const void* Data, size_t Size) { KeyHasher.Append(Data, Size); });
+ XXH3_128 KeyHash = KeyHasher.GetHash();
+
+ RwLock::ExclusiveLockScope _(m_RwLock);
+ const uint64_t WriteOffset = m_NextOpsOffset;
+ const uint32_t OpLsn = ++m_MaxLsn;
+
+ m_NextOpsOffset = RoundUp(WriteOffset + WriteSize, m_OpsAlign);
+
+ ZEN_ASSERT(IsMultipleOf(WriteOffset, m_OpsAlign));
+
+ OplogEntry Entry = {.OpLsn = OpLsn,
+ .OpCoreOffset = gsl::narrow_cast<uint32_t>(WriteOffset / m_OpsAlign),
+ .OpCoreSize = uint32_t(Buffer.GetSize()),
+ .OpCoreHash = OpCoreHash,
+ .OpKeyHash = KeyHash};
+
+ m_Oplog.Append(Entry);
+
+ m_OpBlobs.Write(Buffer.GetData(), WriteSize, WriteOffset);
+
+ return Entry;
+ }
+
+ void Flush()
+ {
+ m_Oplog.Flush();
+ m_OpBlobs.Flush();
+ }
+
+ spdlog::logger& Log() { return m_OwnerOplog->Log(); }
+
+private:
+ ProjectStore::Oplog* m_OwnerOplog;
+ std::filesystem::path m_OplogStoragePath;
+ RwLock m_RwLock;
+ TCasLogFile<OplogEntry> m_Oplog;
+ BasicFile m_OpBlobs;
+ std::atomic<uint64_t> m_NextOpsOffset{0};
+ uint64_t m_OpsAlign = 32;
+ std::atomic<uint32_t> m_MaxLsn{0};
+
+#if USE_ROCKSDB
+ std::unique_ptr<rocksdb::DB> m_RocksDb;
+ std::vector<rocksdb::ColumnFamilyHandle*> m_RocksDbColumnHandles;
+#endif
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+ProjectStore::Oplog::Oplog(std::string_view Id, Project* Outer, CasStore& Store, std::filesystem::path BasePath)
+: m_OuterProject(Outer)
+, m_CasStore(Store)
+, m_OplogId(Id)
+, m_BasePath(BasePath)
+{
+ m_Storage = new OplogStorage(this, m_BasePath);
+ const bool StoreExists = m_Storage->Exists();
+ m_Storage->Open(/* IsCreate */ !StoreExists);
+
+ m_TempPath = m_BasePath / "temp";
+
+ zen::CleanDirectory(m_TempPath);
+}
+
+ProjectStore::Oplog::~Oplog() = default;
+
+bool
+ProjectStore::Oplog::ExistsAt(std::filesystem::path BasePath)
+{
+ return OplogStorage::Exists(BasePath);
+}
+
+void
+ProjectStore::Oplog::ReplayLog()
+{
+ m_Storage->ReplayLog([&](CbObject Op, const OplogEntry& OpEntry) { RegisterOplogEntry(Op, OpEntry, kUpdateReplay); });
+}
+
+IoBuffer
+ProjectStore::Oplog::FindChunk(Oid ChunkId)
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+
+ if (auto ChunkIt = m_ChunkMap.find(ChunkId); ChunkIt != m_ChunkMap.end())
+ {
+ _.ReleaseNow();
+
+ return m_CasStore.FindChunk(ChunkIt->second);
+ }
+
+ if (auto FileIt = m_FileMap.find(ChunkId); FileIt != m_FileMap.end())
+ {
+ _.ReleaseNow();
+
+ std::filesystem::path FilePath = m_OuterProject->RootDir / FileIt->second.ServerPath;
+
+ return IoBufferBuilder::MakeFromFile(FilePath.native().c_str());
+ }
+
+ if (auto MetaIt = m_MetaMap.find(ChunkId); MetaIt != m_MetaMap.end())
+ {
+ _.ReleaseNow();
+
+ return m_CasStore.FindChunk(MetaIt->second);
+ }
+
+ return {};
+}
+
+void
+ProjectStore::Oplog::IterateFileMap(
+ std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn)
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+
+ for (const auto& Kv : m_FileMap)
+ {
+ Fn(Kv.first, Kv.second.ServerPath, Kv.second.ClientPath);
+ }
+}
+
+void
+ProjectStore::Oplog::IterateChunkMap(std::function<void(const Oid&, const IoHash&)>&& Fn)
+{
+ RwLock::SharedLockScope _(m_OplogLock);
+
+ for (const auto& Kv : m_ChunkMap)
+ {
+ Fn(Kv.first, Kv.second);
+ }
+}
+
+void
+ProjectStore::Oplog::IterateOplog(std::function<void(CbObject)>&& Handler)
+{
+ std::vector<OplogEntryAddress> Entries;
+ Entries.reserve(m_LatestOpMap.size());
+
+ for (const auto& Kv : m_LatestOpMap)
+ {
+ const auto AddressEntry = m_OpAddressMap.find(Kv.second);
+ ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
+
+ Entries.push_back(AddressEntry->second);
+ }
+
+ std::sort(Entries.begin(), Entries.end(), [](const OplogEntryAddress& Lhs, const OplogEntryAddress& Rhs) {
+ return Lhs.Offset < Rhs.Offset;
+ });
+
+ m_Storage->ReplayLog(Entries, [&](CbObject Op) { Handler(Op); });
+}
+
+std::optional<CbObject>
+ProjectStore::Oplog::GetOplog(const Oid& Key)
+{
+ if (const auto LatestOp = m_LatestOpMap.find(Key); LatestOp != m_LatestOpMap.end())
+ {
+ const auto AddressEntry = m_OpAddressMap.find(LatestOp->second);
+ ZEN_ASSERT(AddressEntry != m_OpAddressMap.end());
+
+ return m_Storage->GetOp(AddressEntry->second);
+ }
+
+ return {};
+}
+
+bool
+ProjectStore::Oplog::AddFileMapping(Oid FileId, IoHash Hash, std::string_view ServerPath, std::string_view ClientPath)
+{
+ // NOTE: Caller must hold an exclusive lock on m_OplogLock
+
+ if (ServerPath.empty() || ClientPath.empty())
+ {
+ return false;
+ }
+
+ if (ServerPath[0] == '/' || ClientPath[0] != '/')
+ {
+ return false;
+ }
+
+ FileMapEntry Entry;
+ Entry.ServerPath = ServerPath;
+ Entry.ClientPath = ClientPath;
+
+ m_FileMap[FileId] = std::move(Entry);
+
+ if (Hash != IoHash::Zero)
+ {
+ m_ChunkMap[FileId] = Hash;
+ }
+
+ return true;
+}
+
+void
+ProjectStore::Oplog::AddChunkMapping(Oid ChunkId, IoHash Hash)
+{
+ // NOTE: Caller must hold an exclusive lock on m_OplogLock
+
+ m_ChunkMap[ChunkId] = Hash;
+}
+
+void
+ProjectStore::Oplog::AddMetaMapping(Oid ChunkId, IoHash Hash)
+{
+ // NOTE: Caller must hold an exclusive lock on m_OplogLock
+
+ m_MetaMap[ChunkId] = Hash;
+}
+
+uint32_t
+ProjectStore::Oplog::RegisterOplogEntry(CbObject Core, const OplogEntry& OpEntry, UpdateType TypeOfUpdate)
+{
+ ZEN_UNUSED(TypeOfUpdate);
+
+ // For now we're assuming the update is all in-memory so we can hold an exclusive lock without causing
+ // too many problems. Longer term we'll probably want to ensure we can do concurrent updates however
+
+ RwLock::ExclusiveLockScope _(m_OplogLock);
+
+ using namespace std::literals;
+
+ // Update chunk id maps
+
+ if (Core["package"sv])
+ {
+ CbObjectView PkgObj = Core["package"sv].AsObjectView();
+ Oid PackageId = PkgObj["id"sv].AsObjectId();
+ IoHash PackageHash = PkgObj["data"sv].AsBinaryAttachment();
+
+ AddChunkMapping(PackageId, PackageHash);
+
+ Log().debug("package data {} -> {}", PackageId, PackageHash);
+ }
+
+ for (CbFieldView& Entry : Core["bulkdata"sv])
+ {
+ CbObjectView BulkObj = Entry.AsObjectView();
+
+ Oid BulkDataId = BulkObj["id"sv].AsObjectId();
+ IoHash BulkDataHash = BulkObj["data"sv].AsBinaryAttachment();
+
+ AddChunkMapping(BulkDataId, BulkDataHash);
+
+ Log().debug("bulkdata {} -> {}", BulkDataId, BulkDataHash);
+ }
+
+ if (Core["files"sv])
+ {
+ Stopwatch Timer;
+ int32_t FileCount = 0;
+
+ for (CbFieldView& Entry : Core["files"sv])
+ {
+ CbObjectView FileObj = Entry.AsObjectView();
+ const Oid FileId = FileObj["id"sv].AsObjectId();
+ IoHash FileDataHash = FileObj["data"sv].AsBinaryAttachment();
+ std::string_view ServerPath = FileObj["serverpath"sv].AsString();
+ std::string_view ClientPath = FileObj["clientpath"sv].AsString();
+
+ if (AddFileMapping(FileId, FileDataHash, ServerPath, ClientPath))
+ {
+ ++FileCount;
+ }
+ else
+ {
+ Log().warn("invalid file");
+ }
+ }
+
+ Log().debug("added {} file(s) in {}", FileCount, NiceTimeSpanMs(Timer.getElapsedTimeMs()));
+ }
+
+ for (CbFieldView& Entry : Core["meta"sv])
+ {
+ CbObjectView MetaObj = Entry.AsObjectView();
+ const Oid MetaId = MetaObj["id"sv].AsObjectId();
+ auto NameString = MetaObj["name"sv].AsString();
+ IoHash MetaDataHash = MetaObj["data"sv].AsBinaryAttachment();
+
+ AddMetaMapping(MetaId, MetaDataHash);
+
+ Log().debug("meta data ({}) {} -> {}", NameString, MetaId, MetaDataHash);
+ }
+
+ m_OpAddressMap.emplace(OpEntry.OpLsn, OplogEntryAddress{.Offset = OpEntry.OpCoreOffset, .Size = OpEntry.OpCoreSize});
+ m_LatestOpMap[OpEntry.OpKeyAsOId()] = OpEntry.OpLsn;
+
+ return OpEntry.OpLsn;
+}
+
+uint32_t
+ProjectStore::Oplog::AppendNewOplogEntry(CbPackage OpPackage)
+{
+ using namespace std::literals;
+
+ const CbObject& Core = OpPackage.GetObject();
+ const OplogEntry OpEntry = m_Storage->AppendOp(Core);
+
+ // Persist attachments
+
+ auto Attachments = OpPackage.GetAttachments();
+
+ for (const auto& Attach : Attachments)
+ {
+ IoBuffer AttachmentData = Attach.AsBinaryView().AsIoBuffer();
+ m_CasStore.InsertChunk(AttachmentData, Attach.GetHash());
+ }
+
+ return RegisterOplogEntry(Core, OpEntry, kUpdateNewEntry);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ProjectStore::Project::Project(ProjectStore* PrjStore, CasStore& Store, std::filesystem::path BasePath)
+: m_ProjectStore(PrjStore)
+, m_CasStore(Store)
+, m_OplogStoragePath(BasePath)
+{
+}
+
+ProjectStore::Project::~Project()
+{
+}
+
+bool
+ProjectStore::Project::Exists(std::filesystem::path BasePath)
+{
+ return std::filesystem::exists(BasePath / "Project.zcb");
+}
+
+void
+ProjectStore::Project::Read()
+{
+ std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb";
+
+ spdlog::info("reading config for project '{}' from {}", Identifier, ProjectStateFilePath);
+
+ BasicFile Blob;
+ Blob.Open(ProjectStateFilePath, false);
+
+ IoBuffer Obj = Blob.ReadAll();
+ CbValidateError ValidationError = ValidateCompactBinary(MemoryView(Obj.Data(), Obj.Size()), CbValidateMode::All);
+
+ if (ValidationError == CbValidateError::None)
+ {
+ CbObject Cfg = LoadCompactBinaryObject(Obj);
+
+ Identifier = Cfg["id"].AsString();
+ RootDir = Cfg["root"].AsString();
+ ProjectRootDir = Cfg["project"].AsString();
+ EngineRootDir = Cfg["engine"].AsString();
+ }
+ else
+ {
+ spdlog::error("validation error {} hit for '{}'", int(ValidationError), ProjectStateFilePath);
+ }
+}
+
+void
+ProjectStore::Project::Write()
+{
+ MemoryOutStream Mem;
+ BinaryWriter Writer(Mem);
+
+ CbObjectWriter Cfg;
+ Cfg << "id" << Identifier;
+ Cfg << "root" << WideToUtf8(RootDir.c_str());
+ Cfg << "project" << ProjectRootDir;
+ Cfg << "engine" << EngineRootDir;
+
+ Cfg.Save(Writer);
+
+ CreateDirectories(m_OplogStoragePath);
+
+ std::filesystem::path ProjectStateFilePath = m_OplogStoragePath / "Project.zcb";
+
+ spdlog::info("persisting config for project '{}' to {}", Identifier, ProjectStateFilePath);
+
+ BasicFile Blob;
+ Blob.Open(ProjectStateFilePath, true);
+ Blob.Write(Mem.Data(), Mem.Size(), 0);
+ Blob.Flush();
+}
+
+spdlog::logger&
+ProjectStore::Project::Log()
+{
+ return m_ProjectStore->Log();
+}
+
+std::filesystem::path
+ProjectStore::Project::BasePathForOplog(std::string_view OplogId)
+{
+ return m_OplogStoragePath / OplogId;
+}
+
+ProjectStore::Oplog*
+ProjectStore::Project::NewOplog(std::string_view OplogId)
+{
+ RwLock::ExclusiveLockScope _(m_ProjectLock);
+
+ std::filesystem::path OplogBasePath = BasePathForOplog(OplogId);
+
+ try
+ {
+ Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CasStore, OplogBasePath).first->second;
+
+ return &Log;
+ }
+ catch (std::exception&)
+ {
+ // In case of failure we need to ensure there's no half constructed entry around
+ //
+ // (This is probably already ensured by the try_emplace implementation?)
+
+ m_Oplogs.erase(std::string{OplogId});
+
+ return nullptr;
+ }
+}
+
+ProjectStore::Oplog*
+ProjectStore::Project::OpenOplog(std::string_view OplogId)
+{
+ {
+ RwLock::SharedLockScope _(m_ProjectLock);
+
+ auto OplogIt = m_Oplogs.find(std::string(OplogId));
+
+ if (OplogIt != m_Oplogs.end())
+ {
+ return &OplogIt->second;
+ }
+ }
+
+ RwLock::ExclusiveLockScope _(m_ProjectLock);
+
+ std::filesystem::path OplogBasePath = BasePathForOplog(OplogId);
+
+ if (Oplog::ExistsAt(OplogBasePath))
+ {
+ // Do open of existing oplog
+
+ try
+ {
+ Oplog& Log = m_Oplogs.try_emplace(std::string{OplogId}, OplogId, this, m_CasStore, OplogBasePath).first->second;
+
+ Log.ReplayLog();
+
+ return &Log;
+ }
+ catch (std::exception& ex)
+ {
+ spdlog::error("failed to open oplog '{}' @ '{}': {}", OplogId, OplogBasePath, ex.what());
+
+ m_Oplogs.erase(std::string{OplogId});
+ }
+ }
+
+ return nullptr;
+}
+
+void
+ProjectStore::Project::DeleteOplog(std::string_view OplogId)
+{
+ bool Exists = false;
+
+ {
+ RwLock::ExclusiveLockScope _(m_ProjectLock);
+
+ auto OplogIt = m_Oplogs.find(std::string(OplogId));
+
+ if (OplogIt != m_Oplogs.end())
+ {
+ Exists = true;
+
+ m_Oplogs.erase(OplogIt);
+ }
+ }
+
+ // Actually erase
+
+ std::filesystem::path OplogBasePath = BasePathForOplog(OplogId);
+
+ OplogStorage::Delete(OplogBasePath);
+}
+
+void
+ProjectStore::Project::IterateOplogs(std::function<void(const Oplog&)>&& Fn) const
+{
+ // TODO: should iterate over oplogs which are present on disk but not yet loaded
+
+ RwLock::SharedLockScope _(m_ProjectLock);
+
+ for (auto& Kv : m_Oplogs)
+ {
+ Fn(Kv.second);
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+ProjectStore::ProjectStore(CasStore& Store, std::filesystem::path BasePath)
+: m_Log("project", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks()))
+, m_ProjectBasePath(BasePath)
+, m_CasStore(Store)
+{
+ m_Log.info("initializing project store at '{}'", BasePath);
+ m_Log.set_level(spdlog::level::debug);
+}
+
+ProjectStore::~ProjectStore()
+{
+ m_Log.info("closing project store ('{}')", m_ProjectBasePath);
+}
+
+std::filesystem::path
+ProjectStore::BasePathForProject(std::string_view ProjectId)
+{
+ return m_ProjectBasePath / ProjectId;
+}
+
+ProjectStore::Project*
+ProjectStore::OpenProject(std::string_view ProjectId)
+{
+ {
+ RwLock::SharedLockScope _(m_ProjectsLock);
+
+ auto ProjIt = m_Projects.find(std::string{ProjectId});
+
+ if (ProjIt != m_Projects.end())
+ {
+ return &(ProjIt->second);
+ }
+ }
+
+ RwLock::ExclusiveLockScope _(m_ProjectsLock);
+
+ std::filesystem::path ProjectBasePath = BasePathForProject(ProjectId);
+
+ if (Project::Exists(ProjectBasePath))
+ {
+ try
+ {
+ Log().info("opening project {} @ {}", ProjectId, ProjectBasePath);
+
+ ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CasStore, ProjectBasePath).first->second;
+ Prj.Identifier = ProjectId;
+ Prj.Read();
+ return &Prj;
+ }
+ catch (std::exception& e)
+ {
+ Log().warn("failed to open {} @ {} ({})", ProjectId, ProjectBasePath, e.what());
+ m_Projects.erase(std::string{ProjectId});
+ }
+ }
+
+ return nullptr;
+}
+
+ProjectStore::Project*
+ProjectStore::NewProject(std::filesystem::path BasePath,
+ std::string_view ProjectId,
+ std::string_view RootDir,
+ std::string_view EngineRootDir,
+ std::string_view ProjectRootDir)
+{
+ RwLock::ExclusiveLockScope _(m_ProjectsLock);
+
+ ProjectStore::Project& Prj = m_Projects.try_emplace(std::string{ProjectId}, this, m_CasStore, BasePath).first->second;
+ Prj.Identifier = ProjectId;
+ Prj.RootDir = RootDir;
+ Prj.EngineRootDir = EngineRootDir;
+ Prj.ProjectRootDir = ProjectRootDir;
+ Prj.Write();
+
+ return &Prj;
+}
+
+void
+ProjectStore::DeleteProject(std::string_view ProjectId)
+{
+ std::filesystem::path ProjectBasePath = BasePathForProject(ProjectId);
+
+ Log().info("deleting project {} @ {}", ProjectId, ProjectBasePath);
+
+ m_Projects.erase(std::string{ProjectId});
+
+ DeleteDirectories(ProjectBasePath);
+}
+
+bool
+ProjectStore::Exists(std::string_view ProjectId)
+{
+ return Project::Exists(BasePathForProject(ProjectId));
+}
+
+ProjectStore::Oplog*
+ProjectStore::OpenProjectOplog(std::string_view ProjectId, std::string_view OplogId)
+{
+ if (Project* ProjectIt = OpenProject(ProjectId))
+ {
+ return ProjectIt->OpenOplog(OplogId);
+ }
+
+ return nullptr;
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects)
+: m_CasStore(Store)
+, m_Log("project", begin(spdlog::default_logger()->sinks()), end(spdlog::default_logger()->sinks()))
+, m_ProjectStore(Projects)
+{
+ using namespace std::literals;
+
+ m_Router.AddPattern("project", "([[:alnum:]_.]+)");
+ m_Router.AddPattern("log", "([[:alnum:]_.]+)");
+ m_Router.AddPattern("op", "([[:digit:]]+?)");
+ m_Router.AddPattern("chunk", "([[:xdigit:]]{24})");
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/batch",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId);
+
+ if (FoundLog == nullptr)
+ {
+ return HttpReq.WriteResponse(HttpResponse::NotFound);
+ }
+
+ ProjectStore::Oplog& Log = *FoundLog;
+
+ // Parse Request
+
+ IoBuffer Payload = HttpReq.ReadPayload();
+ MemoryInStream MemIn(Payload.Data(), Payload.Size());
+ BinaryReader Reader(MemIn);
+
+ struct RequestHeader
+ {
+ enum
+ {
+ kMagic = 0xAAAA'77AC
+ };
+ uint32_t Magic;
+ uint32_t ChunkCount;
+ uint32_t Reserved1;
+ uint32_t Reserved2;
+ };
+
+ struct RequestChunkEntry
+ {
+ Oid ChunkId;
+ uint32_t CorrelationId;
+ uint64_t Offset;
+ uint64_t RequestBytes;
+ };
+
+ if (Payload.Size() <= sizeof(RequestHeader))
+ {
+ HttpReq.WriteResponse(HttpResponse::BadRequest);
+ }
+
+ RequestHeader RequestHdr;
+ Reader.Read(&RequestHdr, sizeof RequestHdr);
+
+ if (RequestHdr.Magic != RequestHeader::kMagic)
+ {
+ HttpReq.WriteResponse(HttpResponse::BadRequest);
+ }
+
+ std::vector<RequestChunkEntry> RequestedChunks;
+ RequestedChunks.resize(RequestHdr.ChunkCount);
+ Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount);
+
+ // Make Response
+
+ struct ResponseHeader
+ {
+ uint32_t Magic = 0xbada'b00f;
+ uint32_t ChunkCount;
+ uint32_t Reserved1 = 0;
+ uint32_t Reserved2 = 0;
+ };
+
+ struct ResponseChunkEntry
+ {
+ uint32_t CorrelationId;
+ uint32_t Flags = 0;
+ uint64_t ChunkSize;
+ };
+
+ std::vector<IoBuffer> OutBlobs;
+ OutBlobs.emplace_back(sizeof(ResponseHeader) + RequestHdr.ChunkCount * sizeof(ResponseChunkEntry));
+ for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex)
+ {
+ const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex];
+ IoBuffer FoundChunk = FoundLog->FindChunk(RequestedChunk.ChunkId);
+ if (FoundChunk)
+ {
+ if (RequestedChunk.Offset > 0 || RequestedChunk.RequestBytes < uint64_t(-1))
+ {
+ uint64_t Offset = RequestedChunk.Offset;
+ if (Offset > FoundChunk.Size())
+ {
+ Offset = FoundChunk.Size();
+ }
+ uint64_t Size = RequestedChunk.RequestBytes;
+ if ((Offset + Size) > FoundChunk.Size())
+ {
+ Size = FoundChunk.Size() - Offset;
+ }
+ FoundChunk = IoBuffer(FoundChunk, Offset, Size);
+ }
+ }
+ OutBlobs.emplace_back(std::move(FoundChunk));
+ }
+ uint8_t* ResponsePtr = reinterpret_cast<uint8_t*>(OutBlobs[0].MutableData());
+ ResponseHeader ResponseHdr;
+ ResponseHdr.ChunkCount = RequestHdr.ChunkCount;
+ memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr));
+ ResponsePtr += sizeof(ResponseHdr);
+ for (uint32_t ChunkIndex = 0; ChunkIndex < RequestHdr.ChunkCount; ++ChunkIndex)
+ {
+ const RequestChunkEntry& RequestedChunk = RequestedChunks[ChunkIndex];
+ const IoBuffer& FoundChunk(OutBlobs[ChunkIndex + 1]);
+ ResponseChunkEntry ResponseChunk;
+ ResponseChunk.CorrelationId = ChunkIndex;
+ if (FoundChunk)
+ {
+ ResponseChunk.ChunkSize = FoundChunk.Size();
+ }
+ else
+ {
+ ResponseChunk.ChunkSize = uint64_t(-1);
+ }
+ memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk));
+ ResponsePtr += sizeof(ResponseChunk);
+ }
+
+ return HttpReq.WriteResponse(HttpResponse::OK, HttpContentType::kBinary, OutBlobs);
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/files",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ // File manifest fetch, returns the client file list
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId);
+
+ if (FoundLog == nullptr)
+ {
+ return HttpReq.WriteResponse(HttpResponse::NotFound);
+ }
+
+ ProjectStore::Oplog& Log = *FoundLog;
+
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+
+ const bool FilterClient = Params.GetValue("filter") == "client";
+
+ CbObjectWriter Response;
+ Response.BeginArray("files");
+
+ Log.IterateFileMap([&](const Oid& Id, const std::string_view& ServerPath, const std::string_view& ClientPath) {
+ Response.BeginObject();
+ Response << "id" << Id;
+ Response << "clientpath" << ClientPath;
+ if (!FilterClient)
+ {
+ Response << "serverpath" << ServerPath;
+ }
+ Response.EndObject();
+ });
+
+ Response.EndArray();
+
+ return HttpReq.WriteResponse(HttpResponse::OK, Response.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/{chunk}/info",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ const auto& ChunkId = Req.GetCapture(3);
+
+ ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId);
+
+ if (FoundLog == nullptr)
+ {
+ return HttpReq.WriteResponse(HttpResponse::NotFound);
+ }
+
+ ProjectStore::Oplog& Log = *FoundLog;
+
+ Oid Obj = Oid::FromHexString(ChunkId);
+
+ IoBuffer Value = Log.FindChunk(Obj);
+
+ if (Value)
+ {
+ CbObjectWriter Response;
+ Response << "size" << Value.Size();
+ return HttpReq.WriteResponse(HttpResponse::OK, Response.Save());
+ }
+
+ return HttpReq.WriteResponse(HttpResponse::NotFound);
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/{chunk}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+ const auto& ChunkId = Req.GetCapture(3);
+
+ bool IsOffset = false;
+ uint64_t Offset = 0;
+ uint64_t Size = ~(0ull);
+
+ auto QueryParms = Req.ServerRequest().GetQueryParams();
+
+ if (auto OffsetParm = QueryParms.GetValue("offset"); OffsetParm.empty() == false)
+ {
+ if (auto OffsetVal = ParseInt<uint64_t>(OffsetParm))
+ {
+ Offset = OffsetVal.value();
+ IsOffset = true;
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponse::BadRequest);
+ }
+ }
+
+ if (auto SizeParm = QueryParms.GetValue("size"); SizeParm.empty() == false)
+ {
+ if (auto SizeVal = ParseInt<uint64_t>(SizeParm))
+ {
+ Size = SizeVal.value();
+ IsOffset = true;
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponse::BadRequest);
+ }
+ }
+
+ m_Log.debug("chunk - {} / {} / {}", ProjectId, OplogId, ChunkId);
+
+ ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId);
+
+ if (FoundLog == nullptr)
+ {
+ return HttpReq.WriteResponse(HttpResponse::NotFound);
+ }
+
+ ProjectStore::Oplog& Log = *FoundLog;
+
+ Oid Obj = Oid::FromHexString(ChunkId);
+
+ IoBuffer Value = Log.FindChunk(Obj);
+
+ switch (HttpVerb Verb = HttpReq.RequestVerb())
+ {
+ case HttpVerb::kHead:
+ case HttpVerb::kGet:
+ if (!Value)
+ {
+ return HttpReq.WriteResponse(HttpResponse::NotFound);
+ }
+
+ if (Verb == HttpVerb::kHead)
+ {
+ HttpReq.SetSuppressResponseBody();
+ }
+
+ if (IsOffset)
+ {
+ if (Offset > Value.Size())
+ {
+ Offset = Value.Size();
+ }
+
+ if ((Offset + Size) > Value.Size())
+ {
+ Size = Value.Size() - Offset;
+ }
+
+ // Send only a subset of data
+ IoBuffer InnerValue(Value, Offset, Size);
+
+ return HttpReq.WriteResponse(HttpResponse::OK, HttpContentType::kBinary, InnerValue);
+ }
+
+ return HttpReq.WriteResponse(HttpResponse::OK, HttpContentType::kBinary, Value);
+ }
+ },
+ HttpVerb::kGet | HttpVerb::kHead);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/prep",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId);
+
+ if (FoundLog == nullptr)
+ {
+ return HttpReq.WriteResponse(HttpResponse::NotFound);
+ }
+
+ ProjectStore::Oplog& Log = *FoundLog;
+
+ // This operation takes a list of referenced hashes and decides which
+ // chunks are not present on this server. This list is then returned in
+ // the "need" list in the response
+
+ IoBuffer Payload = HttpReq.ReadPayload();
+ CbObject RequestObject = LoadCompactBinaryObject(Payload);
+
+ std::vector<IoHash> NeedList;
+
+ for (auto Entry : RequestObject["have"sv])
+ {
+ const IoHash FileHash = Entry.AsHash();
+
+ if (!m_CasStore.FindChunk(FileHash))
+ {
+ spdlog::debug("NEED: {}", FileHash);
+
+ NeedList.push_back(FileHash);
+ }
+ }
+
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("need");
+
+ for (const IoHash& Hash : NeedList)
+ {
+ Cbo << Hash;
+ }
+
+ Cbo.EndArray();
+ CbObject Response = Cbo.Save();
+
+ return HttpReq.WriteResponse(HttpResponse::OK, Response);
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/new",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+
+ bool IsUsingSalt = false;
+ IoHash SaltHash = IoHash::Zero;
+
+ if (std::string_view SaltParam = Params.GetValue("salt"); SaltParam.empty() == false)
+ {
+ const uint32_t Salt = std::stoi(std::string(SaltParam));
+ SaltHash = IoHash::HashMemory(&Salt, sizeof Salt);
+ IsUsingSalt = true;
+ }
+
+ ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId);
+
+ if (FoundLog == nullptr)
+ {
+ return HttpReq.WriteResponse(HttpResponse::NotFound);
+ }
+
+ ProjectStore::Oplog& Log = *FoundLog;
+
+ IoBuffer Payload = HttpReq.ReadPayload();
+
+ // This will attempt to open files which may not exist for the case where
+ // the prep step rejected the chunk. This should be fixed since there's
+ // a performance cost associated with any file system activity
+
+ bool IsValid = true;
+ std::vector<IoHash> MissingChunks;
+
+ CbPackage::AttachmentResolver Resolver = [&](const IoHash& Hash) -> SharedBuffer {
+ IoHash AttachmentId;
+
+ if (IsUsingSalt)
+ {
+ IoHash AttachmentSpec[]{SaltHash, Hash};
+ AttachmentId = IoHash::HashMemory(MakeMemoryView(AttachmentSpec));
+ }
+ else
+ {
+ AttachmentId = Hash;
+ }
+
+ std::filesystem::path AttachmentPath = Log.TempPath() / AttachmentId.ToHexString();
+
+ if (IoBuffer Data = m_CasStore.FindChunk(Hash))
+ {
+ return SharedBuffer(std::move(Data));
+ }
+ else if (Data = IoBufferBuilder::MakeFromTemporaryFile(AttachmentPath.native().c_str()))
+ {
+ return SharedBuffer(std::move(Data));
+ }
+ else
+ {
+ IsValid = false;
+ MissingChunks.push_back(Hash);
+
+ return {};
+ }
+ };
+
+ CbPackage Package;
+ Package.Load(Payload, &UniqueBuffer::Alloc, &Resolver);
+
+ if (!IsValid)
+ {
+ // TODO: emit diagnostics identifying missing chunks
+
+ return HttpReq.WriteResponse(HttpResponse::NotFound, HttpContentType::kText, "Missing chunk reference");
+ }
+
+ CbObject Core = Package.GetObject();
+
+ if (!Core["key"sv])
+ {
+ return HttpReq.WriteResponse(HttpResponse::BadRequest, HttpContentType::kText, "No oplog entry key specified");
+ }
+
+ // Write core to oplog
+
+ const uint32_t OpLsn = Log.AppendNewOplogEntry(Package);
+
+ if (OpLsn == ProjectStore::Oplog::kInvalidOp)
+ {
+ return HttpReq.WriteResponse(HttpResponse::BadRequest);
+ }
+
+ m_Log.info("new op #{:4} - {}/{} ({:>6}) {}", OpLsn, ProjectId, OplogId, NiceBytes(Payload.Size()), Core["key"sv].AsString());
+
+ HttpReq.WriteResponse(HttpResponse::Created);
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/{op}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ // TODO: look up op and respond with the payload!
+
+ HttpReq.WriteResponse(HttpResponse::Accepted, HttpContentType::kText, u8"yeee"sv);
+ },
+ HttpVerb::kGet);
+
+ using namespace fmt::literals;
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}",
+ [this](HttpRouterRequest& Req) {
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ ProjectStore::Project* ProjectIt = m_ProjectStore->OpenProject(ProjectId);
+
+ if (!ProjectIt)
+ {
+ return Req.ServerRequest().WriteResponse(HttpResponse::NotFound,
+ HttpContentType::kText,
+ "project {} not found"_format(ProjectId));
+ }
+
+ ProjectStore::Project& Prj = *ProjectIt;
+
+ switch (Req.ServerRequest().RequestVerb())
+ {
+ case HttpVerb::kGet:
+ {
+ ProjectStore::Oplog* OplogIt = Prj.OpenOplog(OplogId);
+
+ if (!OplogIt)
+ {
+ return Req.ServerRequest().WriteResponse(HttpResponse::NotFound,
+ HttpContentType::kText,
+ "oplog {} not found in project {}"_format(OplogId, ProjectId));
+ }
+
+ ProjectStore::Oplog& Log = *OplogIt;
+
+ CbObjectWriter Cb;
+ Cb << "id"sv << Log.OplogId() << "project"sv << Prj.Identifier << "tempdir"sv << Log.TempDir();
+
+ Req.ServerRequest().WriteResponse(HttpResponse::OK, Cb.Save());
+ }
+ break;
+
+ case HttpVerb::kPost:
+ {
+ ProjectStore::Oplog* OplogIt = Prj.OpenOplog(OplogId);
+
+ if (!OplogIt)
+ {
+ if (!Prj.NewOplog(OplogId))
+ {
+ // TODO: indicate why the operation failed!
+ return Req.ServerRequest().WriteResponse(HttpResponse::InternalServerError);
+ }
+
+ m_Log.info("established oplog {} / {}", ProjectId, OplogId);
+
+ return Req.ServerRequest().WriteResponse(HttpResponse::Created);
+ }
+
+ // I guess this should ultimately be used to execute RPCs but for now, it
+ // does absolutely nothing
+
+ return Req.ServerRequest().WriteResponse(HttpResponse::BadRequest);
+ }
+ break;
+
+ case HttpVerb::kDelete:
+ {
+ spdlog::info("deleting oplog {}/{}", ProjectId, OplogId);
+
+ ProjectIt->DeleteOplog(OplogId);
+
+ return Req.ServerRequest().WriteResponse(HttpResponse::OK);
+ }
+ break;
+ }
+ },
+ HttpVerb::kPost | HttpVerb::kGet | HttpVerb::kDelete);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/entries",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId);
+
+ if (FoundLog == nullptr)
+ {
+ return HttpReq.WriteResponse(HttpResponse::NotFound);
+ }
+
+ CbObjectWriter Response;
+
+ if (FoundLog->OplogCount() > 0)
+ {
+ HttpServerRequest::QueryParams Params = HttpReq.GetQueryParams();
+
+ if (auto OpKey = Params.GetValue("opkey"); !OpKey.empty())
+ {
+ XXH3_128Stream KeyHasher;
+ KeyHasher.Append(OpKey.data(), OpKey.size());
+ XXH3_128 OpKeyHash = KeyHasher.GetHash();
+
+ Oid OpId;
+ memcpy(OpId.OidBits, &OpKeyHash, sizeof(OpId.OidBits));
+
+ std::optional<CbObject> Op = FoundLog->GetOplog(OpId);
+
+ if (Op.has_value())
+ {
+ Response << Op.value();
+ }
+ }
+ else
+ {
+ Response.BeginArray("entries"sv);
+
+ FoundLog->IterateOplog([&Response](CbObject Op) { Response << Op; });
+
+ Response.EndArray();
+ }
+ }
+
+ return HttpReq.WriteResponse(HttpResponse::OK, Response.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "{project}",
+ [this](HttpRouterRequest& Req) {
+ const std::string ProjectId = Req.GetCapture(1);
+
+ switch (Req.ServerRequest().RequestVerb())
+ {
+ case HttpVerb::kPost:
+ {
+ IoBuffer Payload = Req.ServerRequest().ReadPayload();
+ CbObject Params = LoadCompactBinaryObject(Payload);
+ std::string_view Id = Params["id"sv].AsString();
+ std::string_view Root = Params["root"sv].AsString();
+ std::string_view EngineRoot = Params["engine"sv].AsString();
+ std::string_view ProjectRoot = Params["project"sv].AsString();
+
+ const std::filesystem::path BasePath = m_ProjectStore->BasePath() / ProjectId;
+ m_ProjectStore->NewProject(BasePath, ProjectId, Root, EngineRoot, ProjectRoot);
+
+ m_Log.info("established project - {} (id: '{}', roots: '{}', '{}', '{}')",
+ ProjectId,
+ Id,
+ Root,
+ EngineRoot,
+ ProjectRoot);
+
+ Req.ServerRequest().WriteResponse(HttpResponse::Created);
+ }
+ break;
+
+ case HttpVerb::kGet:
+ {
+ ProjectStore::Project* ProjectIt = m_ProjectStore->OpenProject(ProjectId);
+
+ if (!ProjectIt)
+ {
+ return Req.ServerRequest().WriteResponse(HttpResponse::NotFound,
+ HttpContentType::kText,
+ "project {} not found"_format(ProjectId));
+ }
+
+ const ProjectStore::Project& Prj = *ProjectIt;
+
+ CbObjectWriter Response;
+ Response << "id" << Prj.Identifier << "root" << WideToUtf8(Prj.RootDir.c_str());
+
+ Response.BeginArray("oplogs"sv);
+ Prj.IterateOplogs([&](const ProjectStore::Oplog& I) { Response << "id"sv << I.OplogId(); });
+ Response.EndArray(); // oplogs
+
+ Req.ServerRequest().WriteResponse(HttpResponse::OK, Response.Save());
+ }
+ break;
+
+ case HttpVerb::kDelete:
+ {
+ ProjectStore::Project* ProjectIt = m_ProjectStore->OpenProject(ProjectId);
+
+ if (!ProjectIt)
+ {
+ return Req.ServerRequest().WriteResponse(HttpResponse::NotFound,
+ HttpContentType::kText,
+ "project {} not found"_format(ProjectId));
+ }
+
+ m_ProjectStore->DeleteProject(ProjectId);
+ }
+ break;
+ }
+ },
+ HttpVerb::kGet | HttpVerb::kPost | HttpVerb::kDelete);
+
+ m_Router.RegisterRoute(
+ "{project}/oplog/{log}/chunks",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto& ProjectId = Req.GetCapture(1);
+ const auto& OplogId = Req.GetCapture(2);
+
+ auto QueryParams = Req.ServerRequest().GetQueryParams();
+
+ ProjectStore::Oplog* FoundLog = m_ProjectStore->OpenProjectOplog(ProjectId, OplogId);
+
+ if (FoundLog == nullptr)
+ {
+ return HttpReq.WriteResponse(HttpResponse::NotFound);
+ }
+
+ CbObjectWriter Response;
+
+ Response.BeginArray("chunks"sv);
+
+ FoundLog->IterateChunkMap([&Response](const Oid& Id, const IoHash& Hash) {
+ Response.BeginObject();
+ Response << "id"sv << Id;
+ Response << "hash"sv << Hash;
+ Response.EndObject();
+ });
+
+ Response.EndArray();
+
+ return HttpReq.WriteResponse(HttpResponse::OK, Response.Save());
+ },
+ HttpVerb::kGet);
+}
+
+HttpProjectService::~HttpProjectService()
+{
+}
+
+const char*
+HttpProjectService::BaseUri() const
+{
+ return "/prj/";
+}
+
+void
+HttpProjectService::HandleRequest(HttpServerRequest& Request)
+{
+ if (m_Router.HandleRequest(Request) == false)
+ {
+ m_Log.warn("No route found for {0}", Request.RelativeUri());
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+class SecurityAttributes
+{
+public:
+ inline SECURITY_ATTRIBUTES* Attributes() { return &m_Attributes; }
+
+protected:
+ SECURITY_ATTRIBUTES m_Attributes{};
+ SECURITY_DESCRIPTOR m_Sd{};
+};
+
+// Security attributes which allows any user access
+
+class AnyUserSecurityAttributes : public SecurityAttributes
+{
+public:
+ AnyUserSecurityAttributes()
+ {
+ m_Attributes.nLength = sizeof m_Attributes;
+ m_Attributes.bInheritHandle = false; // Disable inheritance
+
+ const BOOL success = InitializeSecurityDescriptor(&m_Sd, SECURITY_DESCRIPTOR_REVISION);
+
+ if (success)
+ {
+ const BOOL bSetOk = SetSecurityDescriptorDacl(&m_Sd, TRUE, (PACL)NULL, FALSE);
+ if (bSetOk)
+ {
+ m_Attributes.lpSecurityDescriptor = &m_Sd;
+ }
+ }
+ }
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+struct LocalProjectService::LocalProjectImpl
+{
+ LocalProjectImpl() : m_WorkerThreadPool(ServiceThreadCount) {}
+ ~LocalProjectImpl() { Stop(); }
+
+ void Start()
+ {
+ ZEN_ASSERT(!m_IsStarted);
+
+ for (int i = 0; i < 32; ++i)
+ {
+ PipeConnection* NewPipe = new PipeConnection(this);
+ m_ServicePipes.push_back(NewPipe);
+ m_IoContext.post([NewPipe] { NewPipe->Accept(); });
+ }
+
+ for (int i = 0; i < ServiceThreadCount; ++i)
+ {
+ asio::post(m_WorkerThreadPool, [this] {
+ try
+ {
+ m_IoContext.run();
+ }
+ catch (std::exception& ex)
+ {
+ spdlog::error("exception caught in pipe project service loop: {}", ex.what());
+ }
+
+ m_ShutdownLatch.count_down();
+ });
+ }
+
+ m_IsStarted = true;
+ }
+
+ void Stop()
+ {
+ if (!m_IsStarted)
+ {
+ return;
+ }
+
+ for (PipeConnection* Pipe : m_ServicePipes)
+ {
+ Pipe->Disconnect();
+ }
+
+ m_IoContext.stop();
+ m_ShutdownLatch.wait();
+
+ for (PipeConnection* Pipe : m_ServicePipes)
+ {
+ delete Pipe;
+ }
+
+ m_ServicePipes.clear();
+ }
+
+private:
+ asio::io_context& IoContext() { return m_IoContext; }
+ auto PipeSecurityAttributes() { return m_AnyUserSecurityAttributes.Attributes(); }
+ static const int ServiceThreadCount = 4;
+
+ std::latch m_ShutdownLatch{ServiceThreadCount};
+ asio::thread_pool m_WorkerThreadPool;
+ asio::io_context m_IoContext;
+
+ class PipeConnection
+ {
+ enum PipeState
+ {
+ kUninitialized,
+ kConnecting,
+ kReading,
+ kWriting,
+ kDisconnected,
+ kInvalid
+ };
+
+ LocalProjectImpl* m_Outer;
+ asio::windows::stream_handle m_PipeHandle;
+ std::atomic<PipeState> m_PipeState{kUninitialized};
+
+ public:
+ PipeConnection(LocalProjectImpl* Outer) : m_Outer(Outer), m_PipeHandle{m_Outer->IoContext()} {}
+ ~PipeConnection() {}
+
+ void Disconnect()
+ {
+ m_PipeState = kDisconnected;
+ DisconnectNamedPipe(m_PipeHandle.native_handle());
+ }
+
+ void Accept()
+ {
+ StringBuilder<64> PipeName;
+ PipeName << "\\\\.\\pipe\\zenprj"; // TODO: this should use an instance-specific identifier!
+
+ HANDLE hPipe = CreateNamedPipeA(PipeName.c_str(),
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,
+ PIPE_UNLIMITED_INSTANCES, // Max instance count
+ 65536, // Output buffer size
+ 65536, // Input buffer size
+ 10'000, // Default timeout (ms)
+ m_Outer->PipeSecurityAttributes() // Security attributes
+ );
+
+ if (hPipe == INVALID_HANDLE_VALUE)
+ {
+ spdlog::warn("failed while creating named pipe {}", PipeName.c_str());
+
+ // TODO: error - how to best handle?
+ }
+
+ m_PipeHandle.assign(hPipe); // This now owns the handle and will close it
+
+ m_PipeState = kConnecting;
+
+ asio::windows::overlapped_ptr OverlappedPtr(
+ m_PipeHandle.get_executor(),
+ std::bind(&PipeConnection::OnClientConnect, this, std::placeholders::_1, std::placeholders::_2));
+
+ OVERLAPPED* Overlapped = OverlappedPtr.get();
+ BOOL Ok = ConnectNamedPipe(hPipe, Overlapped);
+ DWORD LastError = GetLastError();
+
+ if (!Ok && LastError != ERROR_IO_PENDING)
+ {
+ m_PipeState = kInvalid;
+
+ // The operation completed immediately, so a completion notification needs
+ // to be posted. When complete() is called, ownership of the OVERLAPPED-
+ // derived object passes to the io_service.
+ std::error_code Ec(LastError, asio::error::get_system_category());
+ OverlappedPtr.complete(Ec, 0);
+ }
+ else
+ {
+ // The operation was successfully initiated, so ownership of the
+ // OVERLAPPED-derived object has now passed to the io_service.
+ OverlappedPtr.release();
+ }
+ }
+
+ private:
+ void OnClientConnect(const std::error_code& Ec, size_t BytesTransferred)
+ {
+ ZEN_UNUSED(BytesTransferred);
+
+ if (Ec)
+ {
+ if (m_PipeState == kDisconnected)
+ {
+ return;
+ }
+
+ spdlog::warn("pipe connection error: {}", Ec.message());
+
+ // TODO: should disconnect and issue a new connect
+ return;
+ }
+
+ spdlog::debug("pipe connection established");
+
+ IssueRead();
+ }
+
+ void IssueRead()
+ {
+ m_PipeState = kReading;
+
+ m_PipeHandle.async_read_some(asio::mutable_buffer(m_MsgBuffer, sizeof m_MsgBuffer),
+ std::bind(&PipeConnection::OnClientRead, this, std::placeholders::_1, std::placeholders::_2));
+ }
+
+ void OnClientRead(const std::error_code& Ec, size_t Bytes)
+ {
+ if (Ec)
+ {
+ if (m_PipeState == kDisconnected)
+ {
+ return;
+ }
+
+ spdlog::warn("pipe read error: {}", Ec.message());
+
+ // TODO: should disconnect and issue a new connect
+ return;
+ }
+
+ spdlog::debug("received message: {} bytes", Bytes);
+
+ // TODO: Actually process request
+
+ m_PipeState = kWriting;
+
+ asio::async_write(m_PipeHandle,
+ asio::buffer(m_MsgBuffer, Bytes),
+ std::bind(&PipeConnection::OnWriteCompletion, this, std::placeholders::_1, std::placeholders::_2));
+ }
+
+ void OnWriteCompletion(const std::error_code& Ec, size_t Bytes)
+ {
+ ZEN_UNUSED(Bytes);
+
+ if (Ec)
+ {
+ if (m_PipeState == kDisconnected)
+ {
+ return;
+ }
+
+ spdlog::warn("pipe write error: {}", Ec.message());
+
+ // TODO: should disconnect and issue a new connect
+ return;
+ }
+
+ // Go back to reading
+ IssueRead();
+ }
+
+ uint8_t m_MsgBuffer[16384];
+ };
+
+ AnyUserSecurityAttributes m_AnyUserSecurityAttributes;
+ std::vector<PipeConnection*> m_ServicePipes;
+ bool m_IsStarted = false;
+};
+
+LocalProjectService::LocalProjectService(CasStore& Store, ProjectStore* Projects) : m_CasStore(Store), m_ProjectStore(Projects)
+{
+ m_Impl = std::make_unique<LocalProjectImpl>();
+ m_Impl->Start();
+}
+
+LocalProjectService::~LocalProjectService()
+{
+ m_Impl->Stop();
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+} // namespace zen
diff --git a/zenserver/projectstore.h b/zenserver/projectstore.h
index 317210469..621d7be05 100644
--- a/zenserver/projectstore.h
+++ b/zenserver/projectstore.h
@@ -12,6 +12,7 @@
#include <tsl/robin_map.h>
#include <filesystem>
#include <map>
+#include <optional>
#include <string>
namespace zen {
@@ -34,6 +35,12 @@ struct OplogEntry
}
};
+struct OplogEntryAddress
+{
+ uint64_t Offset;
+ uint64_t Size;
+};
+
static_assert(IsPow2(sizeof(OplogEntry)));
/** Project Store
@@ -55,7 +62,9 @@ public:
[[nodiscard]] static bool ExistsAt(std::filesystem::path BasePath);
- void IterateFileMap(std::function<void(const Oid&, const std::string_view&)>&& Fn);
+ void IterateFileMap(std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn);
+ void IterateOplog(std::function<void(CbObject)>&& Fn);
+ std::optional<CbObject> GetOplog(const Oid& Key);
IoBuffer FindChunk(Oid ChunkId);
@@ -92,13 +101,9 @@ public:
spdlog::logger& Log() { return m_OuterProject->Log(); }
- private:
- struct OplogEntryAddress
- {
- uint64_t Offset;
- uint64_t Size;
- };
+ std::size_t OplogCount() const { return m_LatestOpMap.size(); }
+ private:
struct FileMapEntry
{
std::string ServerPath;