diff options
| author | Stefan Boberg <[email protected]> | 2023-09-20 15:22:03 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-09-20 15:22:03 +0200 |
| commit | 14d7568f9c7d970b7bbf7b6463a0a8530f98bb6f (patch) | |
| tree | bf24ac15759385cea339f7e1cf5380f984f5699a /src/zenserver | |
| parent | changelog version bump (diff) | |
| download | zen-14d7568f9c7d970b7bbf7b6463a0a8530f98bb6f.tar.xz zen-14d7568f9c7d970b7bbf7b6463a0a8530f98bb6f.zip | |
VFS implementation for local storage service (#396)
currently, only Windows (using Projected File System) is supported
Diffstat (limited to 'src/zenserver')
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 25 | ||||
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.h | 3 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.cpp | 18 | ||||
| -rw-r--r-- | src/zenserver/cache/structuredcachestore.h | 14 | ||||
| -rw-r--r-- | src/zenserver/objectstore/objectstore.cpp | 2 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 110 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.h | 17 | ||||
| -rw-r--r-- | src/zenserver/vfs/vfsimpl.cpp | 457 | ||||
| -rw-r--r-- | src/zenserver/vfs/vfsimpl.h | 100 | ||||
| -rw-r--r-- | src/zenserver/vfs/vfsservice.cpp | 217 | ||||
| -rw-r--r-- | src/zenserver/vfs/vfsservice.h | 52 | ||||
| -rw-r--r-- | src/zenserver/xmake.lua | 3 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 7 |
13 files changed, 1013 insertions, 12 deletions
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index 7adf07350..9e6f86d79 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -1620,6 +1620,19 @@ ZenCacheDiskLayer::CacheBucket::GetValueDetails(const std::string_view ValueFilt } void +ZenCacheDiskLayer::CacheBucket::EnumerateBucketContents( + std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const +{ + RwLock::SharedLockScope _(m_IndexLock); + for (const auto& It : m_Index) + { + CacheValueDetails::ValueDetails Vd = GetValueDetails(It.first, It.second); + + Fn(It.first, Vd); + } +} + +void ZenCacheDiskLayer::CollectGarbage(GcContext& GcCtx) { ZEN_TRACE_CPU("Z$::Disk::CollectGarbage"); @@ -2130,6 +2143,18 @@ ZenCacheDiskLayer::GetBucketInfo(std::string_view Bucket) const return {}; } +void +ZenCacheDiskLayer::EnumerateBucketContents(std::string_view Bucket, + std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const +{ + RwLock::SharedLockScope _(m_Lock); + + if (auto It = m_Buckets.find(std::string(Bucket)); It != m_Buckets.end()) + { + It->second->EnumerateBucketContents(Fn); + } +} + CacheValueDetails::NamespaceDetails ZenCacheDiskLayer::GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const { diff --git a/src/zenserver/cache/cachedisklayer.h b/src/zenserver/cache/cachedisklayer.h index 127e194f1..fc4d8cd6f 100644 --- a/src/zenserver/cache/cachedisklayer.h +++ b/src/zenserver/cache/cachedisklayer.h @@ -124,6 +124,8 @@ public: Info GetInfo() const; std::optional<BucketInfo> GetBucketInfo(std::string_view Bucket) const; + void EnumerateBucketContents(std::string_view Bucket, + std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const; CacheValueDetails::NamespaceDetails GetValueDetails(const std::string_view BucketFilter, const std::string_view ValueFilter) const; @@ -150,6 +152,7 @@ private: uint64_t EntryCount() const; CacheValueDetails::BucketDetails GetValueDetails(const std::string_view ValueFilter) const; + void EnumerateBucketContents(std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const; private: const uint64_t MaxBlockSize = 1ull << 30; diff --git a/src/zenserver/cache/structuredcachestore.cpp b/src/zenserver/cache/structuredcachestore.cpp index c8384d330..4499b05f7 100644 --- a/src/zenserver/cache/structuredcachestore.cpp +++ b/src/zenserver/cache/structuredcachestore.cpp @@ -120,6 +120,13 @@ ZenCacheNamespace::DropBucket(std::string_view Bucket) return AnyDropped; } +void +ZenCacheNamespace::EnumerateBucketContents(std::string_view Bucket, + std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const +{ + m_DiskLayer.EnumerateBucketContents(Bucket, Fn); +} + bool ZenCacheNamespace::Drop() { @@ -502,6 +509,17 @@ ZenCacheStore::GetValueDetails(const std::string_view NamespaceFilter, return Details; } +void +ZenCacheStore::EnumerateBucketContents(std::string_view Namespace, + std::string_view Bucket, + std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>&& Fn) +{ + if (const ZenCacheNamespace* Ns = FindNamespace(Namespace)) + { + Ns->EnumerateBucketContents(Bucket, Fn); + } +} + ZenCacheNamespace* ZenCacheStore::GetNamespace(std::string_view Namespace) { diff --git a/src/zenserver/cache/structuredcachestore.h b/src/zenserver/cache/structuredcachestore.h index 8c1f995a4..239efe68f 100644 --- a/src/zenserver/cache/structuredcachestore.h +++ b/src/zenserver/cache/structuredcachestore.h @@ -69,10 +69,14 @@ public: ZenCacheNamespace(GcManager& Gc, const std::filesystem::path& RootDir); ~ZenCacheNamespace(); - bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); + + bool DropBucket(std::string_view Bucket); + void EnumerateBucketContents(std::string_view Bucket, + std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>& Fn) const; + bool Drop(); - bool DropBucket(std::string_view Bucket); void Flush(); uint64_t DiskLayerThreshold() const { return m_DiskLayerSizeThreshold; } @@ -160,6 +164,10 @@ public: std::optional<ZenCacheNamespace::BucketInfo> GetBucketInfo(std::string_view Namespace, std::string_view Bucket); std::vector<std::string> GetNamespaces(); + void EnumerateBucketContents(std::string_view Namespace, + std::string_view Bucket, + std::function<void(const IoHash& Key, const CacheValueDetails::ValueDetails& Details)>&& Fn); + private: const ZenCacheNamespace* FindNamespace(std::string_view Namespace) const; ZenCacheNamespace* GetNamespace(std::string_view Namespace); diff --git a/src/zenserver/objectstore/objectstore.cpp b/src/zenserver/objectstore/objectstore.cpp index e5739418e..3643e8011 100644 --- a/src/zenserver/objectstore/objectstore.cpp +++ b/src/zenserver/objectstore/objectstore.cpp @@ -62,7 +62,7 @@ HttpObjectStoreService::Inititalize() [this](zen::HttpRouterRequest& Request) { const std::string BucketName = Request.GetCapture(1); - StringBuilder<1024> Json; + ExtendableStringBuilder<1024> Json; { CbObjectWriter Writer; Writer.BeginArray("distributions"); diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 9be600e4e..1ad4403f4 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -691,6 +691,45 @@ ProjectStore::Oplog::FindChunk(Oid ChunkId) return {}; } +std::vector<ProjectStore::Oplog::ChunkInfo> +ProjectStore::Oplog::GetAllChunksInfo() +{ + // First just capture all the chunk ids + + std::vector<ChunkInfo> InfoArray; + + { + RwLock::SharedLockScope _(m_OplogLock); + + if (m_Storage) + { + const size_t NumEntries = m_FileMap.size() + m_ChunkMap.size(); + + InfoArray.reserve(NumEntries); + + for (const auto& Kv : m_FileMap) + { + InfoArray.push_back({.ChunkId = Kv.first}); + } + + for (const auto& Kv : m_ChunkMap) + { + InfoArray.push_back({.ChunkId = Kv.first}); + } + } + } + + for (ChunkInfo& Info : InfoArray) + { + if (IoBuffer Chunk = FindChunk(Info.ChunkId)) + { + Info.ChunkSize = Chunk.GetSize(); + } + } + + return InfoArray; +} + void ProjectStore::Oplog::IterateFileMap( std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn) @@ -2056,6 +2095,49 @@ ProjectStore::GetProjectFiles(const std::string_view ProjectId, const std::strin } std::pair<HttpResponseCode, std::string> +ProjectStore::GetProjectChunks(const std::string_view ProjectId, const std::string_view OplogId, CbObject& OutPayload) +{ + ZEN_TRACE_CPU("ProjectStore::GetProjectChunks"); + + using namespace std::literals; + + Ref<ProjectStore::Project> Project = OpenProject(ProjectId); + if (!Project) + { + return {HttpResponseCode::NotFound, fmt::format("unknown project '{}'", ProjectId)}; + } + Project->TouchProject(); + + ProjectStore::Oplog* FoundLog = Project->OpenOplog(OplogId); + if (!FoundLog) + { + return {HttpResponseCode::NotFound, fmt::format("unknown oplog '{}/{}'", ProjectId, OplogId)}; + } + Project->TouchOplog(OplogId); + + std::vector<ProjectStore::Oplog::ChunkInfo> ChunkInfo = FoundLog->GetAllChunksInfo(); + + CbObjectWriter Response; + + Response.BeginArray("chunks"sv); + for (ProjectStore::Oplog::ChunkInfo& Info : ChunkInfo) + { + Response << Info.ChunkId; + } + Response.EndArray(); + + Response.BeginArray("sizes"sv); + for (ProjectStore::Oplog::ChunkInfo& Info : ChunkInfo) + { + Response << Info.ChunkSize; + } + Response.EndArray(); + + OutPayload = Response.Save(); + return {HttpResponseCode::OK, {}}; +} + +std::pair<HttpResponseCode, std::string> ProjectStore::GetChunkInfo(const std::string_view ProjectId, const std::string_view OplogId, const std::string_view ChunkId, @@ -2120,6 +2202,25 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId, ZenContentType AcceptType, IoBuffer& OutChunk) { + if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) + { + return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)}; + } + + const Oid Obj = Oid::FromHexString(ChunkId); + + return GetChunkRange(ProjectId, OplogId, Obj, Offset, Size, AcceptType, OutChunk); +} + +std::pair<HttpResponseCode, std::string> +ProjectStore::GetChunkRange(const std::string_view ProjectId, + const std::string_view OplogId, + Oid ChunkId, + uint64_t Offset, + uint64_t Size, + ZenContentType AcceptType, + IoBuffer& OutChunk) +{ bool IsOffset = Offset != 0 || Size != ~(0ull); Ref<ProjectStore::Project> Project = OpenProject(ProjectId); @@ -2136,14 +2237,7 @@ ProjectStore::GetChunkRange(const std::string_view ProjectId, } Project->TouchOplog(OplogId); - if (ChunkId.size() != 2 * sizeof(Oid::OidBits)) - { - return {HttpResponseCode::BadRequest, fmt::format("Chunk request for invalid chunk id '{}/{}'/'{}'", ProjectId, OplogId, ChunkId)}; - } - - const Oid Obj = Oid::FromHexString(ChunkId); - - IoBuffer Chunk = FoundLog->FindChunk(Obj); + IoBuffer Chunk = FoundLog->FindChunk(ChunkId); if (!Chunk) { return {HttpResponseCode::NotFound, {}}; diff --git a/src/zenserver/projectstore/projectstore.h b/src/zenserver/projectstore/projectstore.h index aa84d04ca..9c0f8790d 100644 --- a/src/zenserver/projectstore/projectstore.h +++ b/src/zenserver/projectstore/projectstore.h @@ -85,6 +85,13 @@ public: void Write(); void Update(const std::filesystem::path& MarkerPath); + struct ChunkInfo + { + Oid ChunkId; + uint64_t ChunkSize; + }; + + std::vector<ChunkInfo> GetAllChunksInfo(); void IterateFileMap(std::function<void(const Oid&, const std::string_view& ServerPath, const std::string_view& ClientPath)>&& Fn); void IterateOplog(std::function<void(CbObject)>&& Fn); void IterateOplogWithKey(std::function<void(int, const Oid&, CbObject)>&& Fn); @@ -287,12 +294,22 @@ public: const std::string_view OplogId, bool FilterClient, CbObject& OutPayload); + std::pair<HttpResponseCode, std::string> GetProjectChunks(const std::string_view ProjectId, + const std::string_view OplogId, + CbObject& OutPayload); std::pair<HttpResponseCode, std::string> GetChunkInfo(const std::string_view ProjectId, const std::string_view OplogId, const std::string_view ChunkId, CbObject& OutPayload); std::pair<HttpResponseCode, std::string> GetChunkRange(const std::string_view ProjectId, const std::string_view OplogId, + const Oid ChunkId, + uint64_t Offset, + uint64_t Size, + ZenContentType AcceptType, + IoBuffer& OutChunk); + std::pair<HttpResponseCode, std::string> GetChunkRange(const std::string_view ProjectId, + const std::string_view OplogId, const std::string_view ChunkId, uint64_t Offset, uint64_t Size, diff --git a/src/zenserver/vfs/vfsimpl.cpp b/src/zenserver/vfs/vfsimpl.cpp new file mode 100644 index 000000000..f74000900 --- /dev/null +++ b/src/zenserver/vfs/vfsimpl.cpp @@ -0,0 +1,457 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "vfsimpl.h" +#include "vfsservice.h" + +#include "cache/structuredcachestore.h" +#include "projectstore/projectstore.h" + +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zenvfs/projfs.h> +#include <zenvfs/vfs.h> + +#include <memory> + +#if ZEN_WITH_VFS + +namespace zen { + +using namespace std::literals; + +////////////////////////////////////////////////////////////////////////// + +VfsOplogDataSource::VfsOplogDataSource(std::string_view ProjectId, std::string_view OplogId, Ref<ProjectStore> InProjectStore) +: m_ProjectId(ProjectId) +, m_OplogId(OplogId) +, m_ProjectStore(std::move(InProjectStore)) +{ +} + +void +VfsOplogDataSource::ReadNamedData(std::string_view Path, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount) +{ + ZEN_UNUSED(Path, Buffer, ByteOffset, ByteCount); +} + +void +VfsOplogDataSource::ReadChunkData(const Oid& ChunkId, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount) +{ + IoBuffer ChunkBuffer; + auto Result = + m_ProjectStore->GetChunkRange(m_ProjectId, m_OplogId, ChunkId, 0, ~0ull, ZenContentType::kCompressedBinary, /* out */ ChunkBuffer); + + if (Result.first == HttpResponseCode::OK) + { + const uint8_t* SourceBuffer = reinterpret_cast<const uint8_t*>(ChunkBuffer.GetData()); + uint64_t AvailableBufferBytes = ChunkBuffer.GetSize(); + + ZEN_ASSERT(AvailableBufferBytes >= ByteOffset); + AvailableBufferBytes -= ByteOffset; + SourceBuffer += ByteOffset; + + ZEN_ASSERT(AvailableBufferBytes >= ByteCount); + memcpy(Buffer, SourceBuffer, ByteCount); + } +} + +void +VfsOplogDataSource::PopulateDirectory(std::string NodePath, VfsTreeNode& DirNode) +{ + // This should never be called + ZEN_UNUSED(NodePath, DirNode); +} + +////////////////////////////////////////////////////////////////////////// + +VfsCacheDataSource::VfsCacheDataSource(std::string_view NamespaceId, std::string_view BucketId, Ref<ZenCacheStore> InCacheStore) +: m_NamespaceId(NamespaceId) +, m_BucketId(BucketId) +, m_CacheStore(std::move(InCacheStore)) +{ +} + +VfsCacheDataSource::~VfsCacheDataSource() +{ +} + +void +VfsCacheDataSource::ReadNamedData(std::string_view Name, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount) +{ + if (auto DotIndex = Name.find_first_of('.'); DotIndex != std::string_view::npos) + { + Name = Name.substr(0, DotIndex); + } + + IoHash HashKey = IoHash::FromHexString(Name); + + CacheRequestContext CacheContext{}; + + ZenCacheValue Value; + if (m_CacheStore->Get(CacheContext, m_NamespaceId, m_BucketId, HashKey, /* out */ Value)) + { + // TODO bounds check! + auto DataPtr = reinterpret_cast<const uint8_t*>(Value.Value.GetData()) + ByteOffset; + + memcpy(Buffer, DataPtr, ByteCount); + + return; + } +} + +void +VfsCacheDataSource::ReadChunkData(const Oid& ChunkId, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount) +{ + ZEN_UNUSED(ChunkId, Buffer, ByteOffset, ByteCount); +} + +void +VfsCacheDataSource::PopulateDirectory(std::string NodePath, VfsTreeNode& DirNode) +{ + ZEN_UNUSED(NodePath, DirNode); +} + +////////////////////////////////////////////////////////////////////////// + +VfsService::Impl::Impl() +{ +} + +VfsService::Impl::~Impl() +{ + Unmount(); +} + +void +VfsService::Impl::Mount(std::string_view MountPoint) +{ + ZEN_INFO("VFS mount requested at '{}'", MountPoint); + +# if ZEN_PLATFORM_WINDOWS + if (!IsProjFsAvailable()) + { + throw std::runtime_error("Projected File System component not available"); + } +# endif + + if (!m_MountpointPath.empty()) + { + throw std::runtime_error("VFS already mounted"); + } + + m_MountpointPath = MountPoint; + + RefreshVfs(); +} + +void +VfsService::Impl::Unmount() +{ + if (m_MountpointPath.empty()) + { + return; + } + + ZEN_INFO("unmounting VFS from '{}'", m_MountpointPath); + + m_MountpointPath.clear(); + + RefreshVfs(); +} + +void +VfsService::Impl::AddService(Ref<ProjectStore>&& Ps) +{ + m_ProjectStore = std::move(Ps); + + RefreshVfs(); +} + +void +VfsService::Impl::AddService(Ref<ZenCacheStore>&& Z$) +{ + m_ZenCacheStore = std::move(Z$); + + RefreshVfs(); +} + +void +VfsService::Impl::RefreshVfs() +{ + if (m_VfsHost && m_MountpointPath.empty()) + { + m_VfsHost->RequestStop(); + m_VfsThread.join(); + m_VfsHost.reset(); + m_VfsThreadRunning.Reset(); + m_VfsDataSource = nullptr; + + return; + } + + if (!m_VfsHost && !m_MountpointPath.empty()) + { + m_VfsThread = std::thread(&VfsService::Impl::VfsThread, this); + m_VfsThreadRunning.Wait(); + + // At this stage, m_VfsHost should be initialized + + ZEN_ASSERT(m_VfsHost); + } + + if (m_ProjectStore && m_VfsHost) + { + if (!m_VfsDataSource) + { + m_VfsDataSource = new VfsServiceDataSource(this); + } + + m_VfsHost->AddMount("projects"sv, m_VfsDataSource); + } + + if (m_ZenCacheStore && m_VfsHost) + { + if (!m_VfsDataSource) + { + m_VfsDataSource = new VfsServiceDataSource(this); + } + + m_VfsHost->AddMount("ddc_cache"sv, m_VfsDataSource); + } +} + +void +VfsService::Impl::VfsThread() +{ + SetCurrentThreadName("VFS"); + + ZEN_INFO("VFS service thread now RUNNING"); + + try + { + m_VfsHost = std::make_unique<VfsHost>(m_MountpointPath); + m_VfsHost->Initialize(); + + m_VfsThreadRunning.Set(); + m_VfsHost->Run(); + } + catch (std::exception& Ex) + { + ZEN_WARN("exception caught in VFS thread: {}", Ex.what()); + + m_VfsThreadException = std::current_exception(); + } + + if (m_VfsHost) + { + m_VfsHost->Cleanup(); + } + + ZEN_INFO("VFS service thread now EXITING"); +} + +////////////////////////////////////////////////////////////////////////// + +Ref<VfsOplogDataSource> +VfsServiceDataSource::GetOplogDataSource(std::string_view ProjectId, std::string_view OplogId) +{ + ExtendableStringBuilder<256> Key; + Key << ProjectId << "." << OplogId; + std::string StdKey{Key}; + + RwLock::ExclusiveLockScope _(m_Lock); + + if (auto It = m_OplogSourceMap.find(StdKey); It == m_OplogSourceMap.end()) + { + Ref<VfsOplogDataSource> NewSource{new VfsOplogDataSource(ProjectId, OplogId, m_VfsImpl->m_ProjectStore)}; + m_OplogSourceMap[StdKey] = NewSource; + return NewSource; + } + else + { + return It->second; + } +} + +Ref<VfsCacheDataSource> +VfsServiceDataSource::GetCacheDataSource(std::string_view NamespaceId, std::string_view BucketId) +{ + ExtendableStringBuilder<256> Key; + Key << NamespaceId << "." << BucketId; + std::string StdKey{Key}; + + RwLock::ExclusiveLockScope _(m_Lock); + + if (auto It = m_CacheSourceMap.find(StdKey); It == m_CacheSourceMap.end()) + { + Ref<VfsCacheDataSource> NewSource{new VfsCacheDataSource(NamespaceId, BucketId, m_VfsImpl->m_ZenCacheStore)}; + m_CacheSourceMap[StdKey] = NewSource; + return NewSource; + } + else + { + return It->second; + } +} + +void +VfsServiceDataSource::ReadNamedData(std::string_view Path, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount) +{ + ZEN_UNUSED(Path, Buffer, ByteOffset, ByteCount); +} + +void +VfsServiceDataSource::ReadChunkData(const Oid& ChunkId, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount) +{ + ZEN_UNUSED(ChunkId, Buffer, ByteOffset, ByteCount); +} + +void +VfsServiceDataSource::PopulateDirectory(std::string NodePath, VfsTreeNode& DirNode) +{ + if (NodePath == "projects"sv) + { + // Project enumeration + + m_VfsImpl->m_ProjectStore->DiscoverProjects(); + + m_VfsImpl->m_ProjectStore->IterateProjects( + [&](ProjectStore::Project& Project) { DirNode.AddVirtualNode(Project.Identifier, m_VfsImpl->m_VfsDataSource); }); + } + else if (NodePath.starts_with("projects\\"sv)) + { + std::string_view ProjectId{NodePath}; + ProjectId = ProjectId.substr(9); // Skip "projects\" + + if (std::string_view::size_type SlashOffset = ProjectId.find_first_of('\\'); SlashOffset == std::string_view::npos) + { + Ref<ProjectStore::Project> Project = m_VfsImpl->m_ProjectStore->OpenProject(ProjectId); + + if (!Project) + { + // No such project found? + + return; + } + + // Oplog enumeration + + std::vector<std::string> Oplogs = Project->ScanForOplogs(); + + for (auto& Oplog : Oplogs) + { + DirNode.AddVirtualNode(Oplog, m_VfsImpl->m_VfsDataSource); + } + } + else + { + std::string_view OplogId = ProjectId.substr(SlashOffset + 1); + ProjectId = ProjectId.substr(0, SlashOffset); + + Ref<ProjectStore::Project> Project = m_VfsImpl->m_ProjectStore->OpenProject(ProjectId); + + if (!Project) + { + // No such project found? + + return; + } + + // Oplog contents enumeration + + if (ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId)) + { + Ref<VfsOplogDataSource> DataSource = GetOplogDataSource(ProjectId, OplogId); + + // Get metadata for all chunks + std::vector<ProjectStore::Oplog::ChunkInfo> ChunkInfos = Oplog->GetAllChunksInfo(); + + std::unordered_map<zen::Oid, uint64_t> ChunkSizes; + + for (const auto& Ci : ChunkInfos) + { + ChunkSizes[Ci.ChunkId] = Ci.ChunkSize; + } + + auto EmitFilesForDataArray = [&](zen::CbArrayView DataArray) { + for (auto DataIter : DataArray) + { + if (zen::CbObjectView Data = DataIter.AsObjectView()) + { + std::string_view FileName = Data["filename"sv].AsString(); + zen::Oid ChunkId = Data["id"sv].AsObjectId(); + + if (auto FindIt = ChunkSizes.find(ChunkId); FindIt != ChunkSizes.end()) + { + DirNode.AddFileNode(FileName, FindIt->second /* file size */, ChunkId, DataSource); + } + else + { + ZEN_WARN("no chunk metadata found for chunk {} (file: '{}')", ChunkId, FileName); + } + } + } + }; + + Oplog->IterateOplog([&](CbObject Op) { + EmitFilesForDataArray(Op["packagedata"sv].AsArrayView()); + EmitFilesForDataArray(Op["bulkdata"sv].AsArrayView()); + }); + + DirNode.AddFileNode("stats.json", 42, Oid::Zero); + } + } + } + else if (NodePath == "ddc_cache"sv) + { + // Namespace enumeration + + std::vector<std::string> Namespaces = m_VfsImpl->m_ZenCacheStore->GetNamespaces(); + + for (auto& Namespace : Namespaces) + { + DirNode.AddVirtualNode(Namespace, m_VfsImpl->m_VfsDataSource); + } + } + else if (NodePath.starts_with("ddc_cache\\"sv)) + { + std::string_view NamespaceId{NodePath}; + NamespaceId = NamespaceId.substr(10); // Skip "ddc_cache\" + + auto& Cache = m_VfsImpl->m_ZenCacheStore; + + if (std::string_view::size_type SlashOffset = NamespaceId.find_first_of('\\'); SlashOffset == std::string_view::npos) + { + // Bucket enumeration + + if (auto NsInfo = Cache->GetNamespaceInfo(NamespaceId)) + { + for (auto& BucketName : NsInfo->BucketNames) + { + DirNode.AddVirtualNode(BucketName, m_VfsImpl->m_VfsDataSource); + } + } + } + else + { + // Bucket contents enumeration + + std::string_view BucketId = NamespaceId.substr(SlashOffset + 1); + NamespaceId = NamespaceId.substr(0, SlashOffset); + + Ref<VfsCacheDataSource> DataSource = GetCacheDataSource(NamespaceId, BucketId); + + auto Enumerator = [&](const IoHash& Key, const CacheValueDetails::ValueDetails& Details) { + ExtendableStringBuilder<64> KeyString; + Key.ToHexString(KeyString); + KeyString.Append(".udd"); + DirNode.AddFileNode(KeyString, Details.Size, Oid::Zero, DataSource); + }; + + Cache->EnumerateBucketContents(NamespaceId, BucketId, Enumerator); + } + } +} + +} // namespace zen +#endif diff --git a/src/zenserver/vfs/vfsimpl.h b/src/zenserver/vfs/vfsimpl.h new file mode 100644 index 000000000..9e4fdfe99 --- /dev/null +++ b/src/zenserver/vfs/vfsimpl.h @@ -0,0 +1,100 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "vfsservice.h" + +#include "projectstore/projectstore.h" + +#include <zencore/logging.h> +#include <zenvfs/vfs.h> + +#if ZEN_WITH_VFS + +# include <memory> + +namespace zen { + +struct VfsOplogDataSource : public VfsTreeDataSource +{ + VfsOplogDataSource(std::string_view ProjectId, std::string_view OplogId, Ref<ProjectStore> InProjectStore); + + virtual void ReadNamedData(std::string_view Path, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount) override; + virtual void ReadChunkData(const Oid& ChunkId, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount) override; + virtual void PopulateDirectory(std::string NodePath, VfsTreeNode& DirNode) override; + +private: + std::string m_ProjectId; + std::string m_OplogId; + Ref<ProjectStore> m_ProjectStore; +}; + +struct VfsCacheDataSource : public VfsTreeDataSource +{ + VfsCacheDataSource(std::string_view NamespaceId, std::string_view BucketId, Ref<ZenCacheStore> InCacheStore); + ~VfsCacheDataSource(); + + virtual void ReadNamedData(std::string_view Path, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount) override; + virtual void ReadChunkData(const Oid& ChunkId, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount) override; + virtual void PopulateDirectory(std::string NodePath, VfsTreeNode& DirNode) override; + +private: + std::string m_NamespaceId; + std::string m_BucketId; + Ref<ZenCacheStore> m_CacheStore; +}; + +struct VfsServiceDataSource : public VfsTreeDataSource +{ + VfsServiceDataSource(VfsService::Impl* VfsImpl) : m_VfsImpl(VfsImpl) {} + + virtual void ReadNamedData(std::string_view Path, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount) override; + virtual void ReadChunkData(const Oid& ChunkId, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount) override; + virtual void PopulateDirectory(std::string NodePath, VfsTreeNode& DirNode) override; + +private: + VfsService::Impl* m_VfsImpl = nullptr; + + RwLock m_Lock; + std::unordered_map<std::string, Ref<VfsOplogDataSource>> m_OplogSourceMap; + std::unordered_map<std::string, Ref<VfsCacheDataSource>> m_CacheSourceMap; + + Ref<VfsOplogDataSource> GetOplogDataSource(std::string_view ProjectId, std::string_view OplogId); + Ref<VfsCacheDataSource> GetCacheDataSource(std::string_view NamespaceId, std::string_view BucketId); +}; + +////////////////////////////////////////////////////////////////////////// + +struct VfsService::Impl +{ + Impl(); + ~Impl(); + + void Mount(std::string_view MountPoint); + void Unmount(); + void AddService(Ref<ProjectStore>&&); + void AddService(Ref<ZenCacheStore>&&); + + inline std::string GetMountpointPath() { return m_MountpointPath; } + inline bool IsVfsRunning() const { return !!m_VfsHost.get(); } + +private: + Ref<ProjectStore> m_ProjectStore; + Ref<ZenCacheStore> m_ZenCacheStore; + Ref<VfsServiceDataSource> m_VfsDataSource; + std::string m_MountpointPath; + + std::unique_ptr<VfsHost> m_VfsHost; + std::thread m_VfsThread; + Event m_VfsThreadRunning; + std::exception_ptr m_VfsThreadException; + + void RefreshVfs(); + void VfsThread(); + + friend struct VfsServiceDataSource; +}; + +} // namespace zen + +#endif diff --git a/src/zenserver/vfs/vfsservice.cpp b/src/zenserver/vfs/vfsservice.cpp new file mode 100644 index 000000000..c53682d93 --- /dev/null +++ b/src/zenserver/vfs/vfsservice.cpp @@ -0,0 +1,217 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "vfsservice.h" +#include "vfsimpl.h" + +#include <zencore/compactbinarybuilder.h> + +namespace zen { + +using namespace std::literals; + +#if ZEN_WITH_VFS + +////////////////////////////////////////////////////////////////////////// + +bool +GetContentAsCbObject(HttpServerRequest& HttpReq, CbObject& Cb) +{ + IoBuffer Payload = HttpReq.ReadPayload(); + HttpContentType PayloadContentType = HttpReq.RequestContentType(); + + switch (PayloadContentType) + { + case HttpContentType::kJSON: + case HttpContentType::kUnknownContentType: + case HttpContentType::kText: + { + std::string JsonText(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize()); + Cb = LoadCompactBinaryFromJson(JsonText).AsObject(); + if (!Cb) + { + HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Content format not supported, expected JSON format"); + return false; + } + } + break; + case HttpContentType::kCbObject: + Cb = LoadCompactBinaryObject(Payload); + if (!Cb) + { + HttpReq.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + "Content format not supported, expected compact binary format"); + return false; + } + break; + default: + HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid request content type"); + return false; + } + + return true; +} + +////////////////////////////////////////////////////////////////////////// +// +// to test: +// +// echo {"method": "mount", "params": {"path": "d:\\VFS_ROOT"}} | curl.exe http://localhost:1337/vfs --data-binary @- +// echo {"method": "unmount"} | curl.exe http://localhost:1337/vfs --data-binary @- + +VfsService::VfsService() +{ + m_Impl = new Impl; + + m_Router.RegisterRoute( + "info", + [&](HttpRouterRequest& Request) { + CbObjectWriter Cbo; + Cbo << "running" << m_Impl->IsVfsRunning(); + Cbo << "rootpath" << m_Impl->GetMountpointPath(); + + Request.ServerRequest().WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kGet | HttpVerb::kHead); + + m_Router.RegisterRoute( + "", + [&](HttpRouterRequest& Req) { + CbObject Payload; + + if (!GetContentAsCbObject(Req.ServerRequest(), Payload)) + return; + + std::string_view RpcName = Payload["method"sv].AsString(); + + if (RpcName == "mount"sv) + { + CbObjectView Params = Payload["params"sv].AsObjectView(); + std::string_view Mountpath = Params["path"sv].AsString(); + + if (Mountpath.empty()) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "no path specified"); + } + + if (m_Impl->IsVfsRunning()) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "VFS already mounted"); + } + + try + { + m_Impl->Mount(Mountpath); + } + catch (std::exception& Ex) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, Ex.what()); + } + + Req.ServerRequest().WriteResponse(HttpResponseCode::OK); + } + else if (RpcName == "unmount"sv) + { + if (!m_Impl->IsVfsRunning()) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "VFS not active"); + } + + try + { + m_Impl->Unmount(); + } + catch (std::exception& Ex) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, Ex.what()); + } + + Req.ServerRequest().WriteResponse(HttpResponseCode::OK); + } + else + { + Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "unknown RPC"sv); + } + }, + HttpVerb::kPost); +} + +VfsService::~VfsService() +{ + delete m_Impl; +} + +void +VfsService::Mount(std::string_view MountPoint) +{ + m_Impl->Mount(MountPoint); +} + +void +VfsService::Unmount() +{ + m_Impl->Unmount(); +} + +void +VfsService::AddService(Ref<ProjectStore>&& Ps) +{ + m_Impl->AddService(std::move(Ps)); +} + +void +VfsService::AddService(Ref<ZenCacheStore>&& Z$) +{ + m_Impl->AddService(std::move(Z$)); +} + +#else + +VfsService::VfsService() +{ +} + +VfsService::~VfsService() +{ +} + +void +VfsService::Mount(std::string_view MountPoint) +{ + ZEN_UNUSED(MountPoint); +} + +void +VfsService::Unmount() +{ +} + +void +VfsService::AddService(Ref<ProjectStore>&& Ps) +{ + ZEN_UNUSED(Ps); +} + +void +VfsService::AddService(Ref<ZenCacheStore>&& Z$) +{ + ZEN_UNUSED(Z$); +} + +#endif + +const char* +VfsService::BaseUri() const +{ + return "/vfs/"; +} + +void +VfsService::HandleRequest(HttpServerRequest& HttpServiceRequest) +{ + m_Router.HandleRequest(HttpServiceRequest); +} + +} // namespace zen diff --git a/src/zenserver/vfs/vfsservice.h b/src/zenserver/vfs/vfsservice.h new file mode 100644 index 000000000..9510cfcda --- /dev/null +++ b/src/zenserver/vfs/vfsservice.h @@ -0,0 +1,52 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/refcount.h> +#include <zenhttp/httpserver.h> +#include <zenvfs/vfs.h> + +#include <memory> + +namespace zen { + +class ProjectStore; +class ZenCacheStore; + +/** Virtual File System service + + Implements support for exposing data via a virtual file system interface. Currently + this is primarily used to surface various data stored in the local storage service + to users for debugging and exploration purposes. + + Currently, it surfaces information from the structured cache service and from the + project store. + + */ + +class VfsService : public HttpService +{ +public: + VfsService(); + ~VfsService(); + + void Mount(std::string_view MountPoint); + void Unmount(); + + void AddService(Ref<ProjectStore>&&); + void AddService(Ref<ZenCacheStore>&&); + +protected: + virtual const char* BaseUri() const override; + virtual void HandleRequest(HttpServerRequest& HttpServiceRequest) override; + +private: + struct Impl; + Impl* m_Impl = nullptr; + + HttpRequestRouter m_Router; + + friend struct VfsServiceDataSource; +}; + +} // namespace zen diff --git a/src/zenserver/xmake.lua b/src/zenserver/xmake.lua index a2d02baae..66bfe4ada 100644 --- a/src/zenserver/xmake.lua +++ b/src/zenserver/xmake.lua @@ -3,6 +3,7 @@ target("zenserver") set_kind("binary") add_deps("zencore", "zenhttp", "zenstore", "zenutil") + add_deps("zenvfs") add_headerfiles("**.h") add_files("**.cpp") add_files("zenserver.cpp", {unity_ignored = true }) @@ -19,6 +20,8 @@ target("zenserver") add_ldflags("/LTCG") add_files("zenserver.rc") add_cxxflags("/bigobj") + add_links("delayimp", "projectedfslib") + add_ldflags("/delayload:ProjectedFSLib.dll") else remove_files("windows/**") end diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 988f72273..1f37e336f 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -125,6 +125,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #include "projectstore/httpprojectstore.h" #include "projectstore/projectstore.h" #include "upstream/upstream.h" +#include "vfs/vfsservice.h" #define ZEN_APP_NAME "Zen store" @@ -419,6 +420,11 @@ public: m_Http->RegisterService(*m_ObjStoreService); } + m_VfsService = std::make_unique<VfsService>(); + m_VfsService->AddService(Ref<ProjectStore>(m_ProjectStore)); + m_VfsService->AddService(Ref<ZenCacheStore>(m_CacheStore)); + m_Http->RegisterService(*m_VfsService); + ZEN_INFO("initializing GC, enabled '{}', interval {}s", ServerOptions.GcConfig.Enabled, ServerOptions.GcConfig.IntervalSeconds); zen::GcSchedulerConfig GcConfig{ .RootDirectory = m_DataRoot / "gc", @@ -750,6 +756,7 @@ private: #endif // ZEN_WITH_COMPUTE_SERVICES std::unique_ptr<zen::HttpFrontendService> m_FrontendService; std::unique_ptr<zen::HttpObjectStoreService> m_ObjStoreService; + std::unique_ptr<zen::VfsService> m_VfsService; std::unique_ptr<JobQueue> m_JobQueue; std::unique_ptr<zen::HttpAdminService> m_AdminService; |