aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/vfs/vfsimpl.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-09-20 15:22:03 +0200
committerGitHub <[email protected]>2023-09-20 15:22:03 +0200
commit14d7568f9c7d970b7bbf7b6463a0a8530f98bb6f (patch)
treebf24ac15759385cea339f7e1cf5380f984f5699a /src/zenserver/vfs/vfsimpl.cpp
parentchangelog version bump (diff)
downloadzen-14d7568f9c7d970b7bbf7b6463a0a8530f98bb6f.tar.xz
zen-14d7568f9c7d970b7bbf7b6463a0a8530f98bb6f.zip
VFS implementation for local storage service (#396)
currently, only Windows (using Projected File System) is supported
Diffstat (limited to 'src/zenserver/vfs/vfsimpl.cpp')
-rw-r--r--src/zenserver/vfs/vfsimpl.cpp457
1 files changed, 457 insertions, 0 deletions
diff --git a/src/zenserver/vfs/vfsimpl.cpp b/src/zenserver/vfs/vfsimpl.cpp
new file mode 100644
index 000000000..f74000900
--- /dev/null
+++ b/src/zenserver/vfs/vfsimpl.cpp
@@ -0,0 +1,457 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "vfsimpl.h"
+#include "vfsservice.h"
+
+#include "cache/structuredcachestore.h"
+#include "projectstore/projectstore.h"
+
+#include <zencore/fmtutils.h>
+#include <zencore/logging.h>
+#include <zenvfs/projfs.h>
+#include <zenvfs/vfs.h>
+
+#include <memory>
+
+#if ZEN_WITH_VFS
+
+namespace zen {
+
+using namespace std::literals;
+
+//////////////////////////////////////////////////////////////////////////
+
+VfsOplogDataSource::VfsOplogDataSource(std::string_view ProjectId, std::string_view OplogId, Ref<ProjectStore> InProjectStore)
+: m_ProjectId(ProjectId)
+, m_OplogId(OplogId)
+, m_ProjectStore(std::move(InProjectStore))
+{
+}
+
+void
+VfsOplogDataSource::ReadNamedData(std::string_view Path, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount)
+{
+ ZEN_UNUSED(Path, Buffer, ByteOffset, ByteCount);
+}
+
+void
+VfsOplogDataSource::ReadChunkData(const Oid& ChunkId, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount)
+{
+ IoBuffer ChunkBuffer;
+ auto Result =
+ m_ProjectStore->GetChunkRange(m_ProjectId, m_OplogId, ChunkId, 0, ~0ull, ZenContentType::kCompressedBinary, /* out */ ChunkBuffer);
+
+ if (Result.first == HttpResponseCode::OK)
+ {
+ const uint8_t* SourceBuffer = reinterpret_cast<const uint8_t*>(ChunkBuffer.GetData());
+ uint64_t AvailableBufferBytes = ChunkBuffer.GetSize();
+
+ ZEN_ASSERT(AvailableBufferBytes >= ByteOffset);
+ AvailableBufferBytes -= ByteOffset;
+ SourceBuffer += ByteOffset;
+
+ ZEN_ASSERT(AvailableBufferBytes >= ByteCount);
+ memcpy(Buffer, SourceBuffer, ByteCount);
+ }
+}
+
+void
+VfsOplogDataSource::PopulateDirectory(std::string NodePath, VfsTreeNode& DirNode)
+{
+ // This should never be called
+ ZEN_UNUSED(NodePath, DirNode);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+VfsCacheDataSource::VfsCacheDataSource(std::string_view NamespaceId, std::string_view BucketId, Ref<ZenCacheStore> InCacheStore)
+: m_NamespaceId(NamespaceId)
+, m_BucketId(BucketId)
+, m_CacheStore(std::move(InCacheStore))
+{
+}
+
+VfsCacheDataSource::~VfsCacheDataSource()
+{
+}
+
+void
+VfsCacheDataSource::ReadNamedData(std::string_view Name, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount)
+{
+ if (auto DotIndex = Name.find_first_of('.'); DotIndex != std::string_view::npos)
+ {
+ Name = Name.substr(0, DotIndex);
+ }
+
+ IoHash HashKey = IoHash::FromHexString(Name);
+
+ CacheRequestContext CacheContext{};
+
+ ZenCacheValue Value;
+ if (m_CacheStore->Get(CacheContext, m_NamespaceId, m_BucketId, HashKey, /* out */ Value))
+ {
+ // TODO bounds check!
+ auto DataPtr = reinterpret_cast<const uint8_t*>(Value.Value.GetData()) + ByteOffset;
+
+ memcpy(Buffer, DataPtr, ByteCount);
+
+ return;
+ }
+}
+
+void
+VfsCacheDataSource::ReadChunkData(const Oid& ChunkId, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount)
+{
+ ZEN_UNUSED(ChunkId, Buffer, ByteOffset, ByteCount);
+}
+
+void
+VfsCacheDataSource::PopulateDirectory(std::string NodePath, VfsTreeNode& DirNode)
+{
+ ZEN_UNUSED(NodePath, DirNode);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+VfsService::Impl::Impl()
+{
+}
+
+VfsService::Impl::~Impl()
+{
+ Unmount();
+}
+
+void
+VfsService::Impl::Mount(std::string_view MountPoint)
+{
+ ZEN_INFO("VFS mount requested at '{}'", MountPoint);
+
+# if ZEN_PLATFORM_WINDOWS
+ if (!IsProjFsAvailable())
+ {
+ throw std::runtime_error("Projected File System component not available");
+ }
+# endif
+
+ if (!m_MountpointPath.empty())
+ {
+ throw std::runtime_error("VFS already mounted");
+ }
+
+ m_MountpointPath = MountPoint;
+
+ RefreshVfs();
+}
+
+void
+VfsService::Impl::Unmount()
+{
+ if (m_MountpointPath.empty())
+ {
+ return;
+ }
+
+ ZEN_INFO("unmounting VFS from '{}'", m_MountpointPath);
+
+ m_MountpointPath.clear();
+
+ RefreshVfs();
+}
+
+void
+VfsService::Impl::AddService(Ref<ProjectStore>&& Ps)
+{
+ m_ProjectStore = std::move(Ps);
+
+ RefreshVfs();
+}
+
+void
+VfsService::Impl::AddService(Ref<ZenCacheStore>&& Z$)
+{
+ m_ZenCacheStore = std::move(Z$);
+
+ RefreshVfs();
+}
+
+void
+VfsService::Impl::RefreshVfs()
+{
+ if (m_VfsHost && m_MountpointPath.empty())
+ {
+ m_VfsHost->RequestStop();
+ m_VfsThread.join();
+ m_VfsHost.reset();
+ m_VfsThreadRunning.Reset();
+ m_VfsDataSource = nullptr;
+
+ return;
+ }
+
+ if (!m_VfsHost && !m_MountpointPath.empty())
+ {
+ m_VfsThread = std::thread(&VfsService::Impl::VfsThread, this);
+ m_VfsThreadRunning.Wait();
+
+ // At this stage, m_VfsHost should be initialized
+
+ ZEN_ASSERT(m_VfsHost);
+ }
+
+ if (m_ProjectStore && m_VfsHost)
+ {
+ if (!m_VfsDataSource)
+ {
+ m_VfsDataSource = new VfsServiceDataSource(this);
+ }
+
+ m_VfsHost->AddMount("projects"sv, m_VfsDataSource);
+ }
+
+ if (m_ZenCacheStore && m_VfsHost)
+ {
+ if (!m_VfsDataSource)
+ {
+ m_VfsDataSource = new VfsServiceDataSource(this);
+ }
+
+ m_VfsHost->AddMount("ddc_cache"sv, m_VfsDataSource);
+ }
+}
+
+void
+VfsService::Impl::VfsThread()
+{
+ SetCurrentThreadName("VFS");
+
+ ZEN_INFO("VFS service thread now RUNNING");
+
+ try
+ {
+ m_VfsHost = std::make_unique<VfsHost>(m_MountpointPath);
+ m_VfsHost->Initialize();
+
+ m_VfsThreadRunning.Set();
+ m_VfsHost->Run();
+ }
+ catch (std::exception& Ex)
+ {
+ ZEN_WARN("exception caught in VFS thread: {}", Ex.what());
+
+ m_VfsThreadException = std::current_exception();
+ }
+
+ if (m_VfsHost)
+ {
+ m_VfsHost->Cleanup();
+ }
+
+ ZEN_INFO("VFS service thread now EXITING");
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+Ref<VfsOplogDataSource>
+VfsServiceDataSource::GetOplogDataSource(std::string_view ProjectId, std::string_view OplogId)
+{
+ ExtendableStringBuilder<256> Key;
+ Key << ProjectId << "." << OplogId;
+ std::string StdKey{Key};
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (auto It = m_OplogSourceMap.find(StdKey); It == m_OplogSourceMap.end())
+ {
+ Ref<VfsOplogDataSource> NewSource{new VfsOplogDataSource(ProjectId, OplogId, m_VfsImpl->m_ProjectStore)};
+ m_OplogSourceMap[StdKey] = NewSource;
+ return NewSource;
+ }
+ else
+ {
+ return It->second;
+ }
+}
+
+Ref<VfsCacheDataSource>
+VfsServiceDataSource::GetCacheDataSource(std::string_view NamespaceId, std::string_view BucketId)
+{
+ ExtendableStringBuilder<256> Key;
+ Key << NamespaceId << "." << BucketId;
+ std::string StdKey{Key};
+
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (auto It = m_CacheSourceMap.find(StdKey); It == m_CacheSourceMap.end())
+ {
+ Ref<VfsCacheDataSource> NewSource{new VfsCacheDataSource(NamespaceId, BucketId, m_VfsImpl->m_ZenCacheStore)};
+ m_CacheSourceMap[StdKey] = NewSource;
+ return NewSource;
+ }
+ else
+ {
+ return It->second;
+ }
+}
+
+void
+VfsServiceDataSource::ReadNamedData(std::string_view Path, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount)
+{
+ ZEN_UNUSED(Path, Buffer, ByteOffset, ByteCount);
+}
+
+void
+VfsServiceDataSource::ReadChunkData(const Oid& ChunkId, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount)
+{
+ ZEN_UNUSED(ChunkId, Buffer, ByteOffset, ByteCount);
+}
+
+void
+VfsServiceDataSource::PopulateDirectory(std::string NodePath, VfsTreeNode& DirNode)
+{
+ if (NodePath == "projects"sv)
+ {
+ // Project enumeration
+
+ m_VfsImpl->m_ProjectStore->DiscoverProjects();
+
+ m_VfsImpl->m_ProjectStore->IterateProjects(
+ [&](ProjectStore::Project& Project) { DirNode.AddVirtualNode(Project.Identifier, m_VfsImpl->m_VfsDataSource); });
+ }
+ else if (NodePath.starts_with("projects\\"sv))
+ {
+ std::string_view ProjectId{NodePath};
+ ProjectId = ProjectId.substr(9); // Skip "projects\"
+
+ if (std::string_view::size_type SlashOffset = ProjectId.find_first_of('\\'); SlashOffset == std::string_view::npos)
+ {
+ Ref<ProjectStore::Project> Project = m_VfsImpl->m_ProjectStore->OpenProject(ProjectId);
+
+ if (!Project)
+ {
+ // No such project found?
+
+ return;
+ }
+
+ // Oplog enumeration
+
+ std::vector<std::string> Oplogs = Project->ScanForOplogs();
+
+ for (auto& Oplog : Oplogs)
+ {
+ DirNode.AddVirtualNode(Oplog, m_VfsImpl->m_VfsDataSource);
+ }
+ }
+ else
+ {
+ std::string_view OplogId = ProjectId.substr(SlashOffset + 1);
+ ProjectId = ProjectId.substr(0, SlashOffset);
+
+ Ref<ProjectStore::Project> Project = m_VfsImpl->m_ProjectStore->OpenProject(ProjectId);
+
+ if (!Project)
+ {
+ // No such project found?
+
+ return;
+ }
+
+ // Oplog contents enumeration
+
+ if (ProjectStore::Oplog* Oplog = Project->OpenOplog(OplogId))
+ {
+ Ref<VfsOplogDataSource> DataSource = GetOplogDataSource(ProjectId, OplogId);
+
+ // Get metadata for all chunks
+ std::vector<ProjectStore::Oplog::ChunkInfo> ChunkInfos = Oplog->GetAllChunksInfo();
+
+ std::unordered_map<zen::Oid, uint64_t> ChunkSizes;
+
+ for (const auto& Ci : ChunkInfos)
+ {
+ ChunkSizes[Ci.ChunkId] = Ci.ChunkSize;
+ }
+
+ auto EmitFilesForDataArray = [&](zen::CbArrayView DataArray) {
+ for (auto DataIter : DataArray)
+ {
+ if (zen::CbObjectView Data = DataIter.AsObjectView())
+ {
+ std::string_view FileName = Data["filename"sv].AsString();
+ zen::Oid ChunkId = Data["id"sv].AsObjectId();
+
+ if (auto FindIt = ChunkSizes.find(ChunkId); FindIt != ChunkSizes.end())
+ {
+ DirNode.AddFileNode(FileName, FindIt->second /* file size */, ChunkId, DataSource);
+ }
+ else
+ {
+ ZEN_WARN("no chunk metadata found for chunk {} (file: '{}')", ChunkId, FileName);
+ }
+ }
+ }
+ };
+
+ Oplog->IterateOplog([&](CbObject Op) {
+ EmitFilesForDataArray(Op["packagedata"sv].AsArrayView());
+ EmitFilesForDataArray(Op["bulkdata"sv].AsArrayView());
+ });
+
+ DirNode.AddFileNode("stats.json", 42, Oid::Zero);
+ }
+ }
+ }
+ else if (NodePath == "ddc_cache"sv)
+ {
+ // Namespace enumeration
+
+ std::vector<std::string> Namespaces = m_VfsImpl->m_ZenCacheStore->GetNamespaces();
+
+ for (auto& Namespace : Namespaces)
+ {
+ DirNode.AddVirtualNode(Namespace, m_VfsImpl->m_VfsDataSource);
+ }
+ }
+ else if (NodePath.starts_with("ddc_cache\\"sv))
+ {
+ std::string_view NamespaceId{NodePath};
+ NamespaceId = NamespaceId.substr(10); // Skip "ddc_cache\"
+
+ auto& Cache = m_VfsImpl->m_ZenCacheStore;
+
+ if (std::string_view::size_type SlashOffset = NamespaceId.find_first_of('\\'); SlashOffset == std::string_view::npos)
+ {
+ // Bucket enumeration
+
+ if (auto NsInfo = Cache->GetNamespaceInfo(NamespaceId))
+ {
+ for (auto& BucketName : NsInfo->BucketNames)
+ {
+ DirNode.AddVirtualNode(BucketName, m_VfsImpl->m_VfsDataSource);
+ }
+ }
+ }
+ else
+ {
+ // Bucket contents enumeration
+
+ std::string_view BucketId = NamespaceId.substr(SlashOffset + 1);
+ NamespaceId = NamespaceId.substr(0, SlashOffset);
+
+ Ref<VfsCacheDataSource> DataSource = GetCacheDataSource(NamespaceId, BucketId);
+
+ auto Enumerator = [&](const IoHash& Key, const CacheValueDetails::ValueDetails& Details) {
+ ExtendableStringBuilder<64> KeyString;
+ Key.ToHexString(KeyString);
+ KeyString.Append(".udd");
+ DirNode.AddFileNode(KeyString, Details.Size, Oid::Zero, DataSource);
+ };
+
+ Cache->EnumerateBucketContents(NamespaceId, BucketId, Enumerator);
+ }
+ }
+}
+
+} // namespace zen
+#endif