diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-03 14:17:07 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-03 14:17:07 +0200 |
| commit | 158d4d50392ff1d416828ca2a17d4c3f4b877bd9 (patch) | |
| tree | 2c4a84bd3c6e9bce22567895a6e213a6e43bcd48 /src/zenstore/vfsimpl.cpp | |
| parent | fix missing chunk (#548) (diff) | |
| download | zen-158d4d50392ff1d416828ca2a17d4c3f4b877bd9.tar.xz zen-158d4d50392ff1d416828ca2a17d4c3f4b877bd9.zip | |
move zen vfs implementation to zenstore (#549)
* move zen vfs implementation to zenstore
Diffstat (limited to 'src/zenstore/vfsimpl.cpp')
| -rw-r--r-- | src/zenstore/vfsimpl.cpp | 450 |
1 files changed, 450 insertions, 0 deletions
diff --git a/src/zenstore/vfsimpl.cpp b/src/zenstore/vfsimpl.cpp new file mode 100644 index 000000000..0a918d452 --- /dev/null +++ b/src/zenstore/vfsimpl.cpp @@ -0,0 +1,450 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenstore/vfsimpl.h> + +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zenstore/cache/structuredcachestore.h> +#include <zenstore/projectstore.h> +#include <zenvfs/projfs.h> +#include <zenvfs/vfs.h> + +#include <memory> +#include <unordered_map> + +#if ZEN_WITH_VFS + +namespace zen { + +using namespace std::literals; + +////////////////////////////////////////////////////////////////////////// + +VfsOplogDataSource::VfsOplogDataSource(std::string_view ProjectId, std::string_view OplogId, Ref<ProjectStore> InProjectStore) +: m_ProjectId(ProjectId) +, m_OplogId(OplogId) +, m_ProjectStore(std::move(InProjectStore)) +{ +} + +void +VfsOplogDataSource::ReadNamedData(std::string_view Path, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount) +{ + ZEN_UNUSED(Path, Buffer, ByteOffset, ByteCount); +} + +void +VfsOplogDataSource::ReadChunkData(const Oid& ChunkId, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount) +{ + IoBuffer ChunkBuffer = m_ProjectStore->GetChunk(m_ProjectId, m_OplogId, ChunkId); + if (ChunkBuffer) + { + ZEN_ASSERT(ChunkBuffer.GetSize() >= ByteOffset); + ZEN_ASSERT(ChunkBuffer.GetSize() - ByteOffset >= ByteCount); + MutableMemoryView Target(Buffer, ByteCount); + Target.CopyFrom(ChunkBuffer.GetView().Mid(ByteOffset, ByteCount)); + } +} + +void +VfsOplogDataSource::PopulateDirectory(std::string NodePath, VfsTreeNode& DirNode) +{ + // This should never be called + ZEN_UNUSED(NodePath, DirNode); +} + +////////////////////////////////////////////////////////////////////////// + +VfsCacheDataSource::VfsCacheDataSource(std::string_view NamespaceId, std::string_view BucketId, Ref<ZenCacheStore> InCacheStore) +: m_NamespaceId(NamespaceId) +, m_BucketId(BucketId) +, m_CacheStore(std::move(InCacheStore)) +{ +} + +VfsCacheDataSource::~VfsCacheDataSource() +{ +} + +void +VfsCacheDataSource::ReadNamedData(std::string_view Name, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount) +{ + if (auto DotIndex = Name.find_first_of('.'); DotIndex != std::string_view::npos) + { + Name = Name.substr(0, DotIndex); + } + + IoHash HashKey = IoHash::FromHexString(Name); + + CacheRequestContext CacheContext{}; + + ZenCacheValue Value; + if (m_CacheStore->Get(CacheContext, m_NamespaceId, m_BucketId, HashKey, /* out */ Value)) + { + // TODO bounds check! + auto DataPtr = reinterpret_cast<const uint8_t*>(Value.Value.GetData()) + ByteOffset; + + memcpy(Buffer, DataPtr, ByteCount); + + return; + } +} + +void +VfsCacheDataSource::ReadChunkData(const Oid& ChunkId, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount) +{ + ZEN_UNUSED(ChunkId, Buffer, ByteOffset, ByteCount); +} + +void +VfsCacheDataSource::PopulateDirectory(std::string NodePath, VfsTreeNode& DirNode) +{ + ZEN_UNUSED(NodePath, DirNode); +} + +////////////////////////////////////////////////////////////////////////// + +VfsServiceImpl::VfsServiceImpl() +{ +} + +VfsServiceImpl::~VfsServiceImpl() +{ + Unmount(); +} + +void +VfsServiceImpl::Mount(std::string_view MountPoint) +{ + ZEN_INFO("VFS mount requested at '{}'", MountPoint); + +# if ZEN_PLATFORM_WINDOWS + if (!IsProjFsAvailable()) + { + throw std::runtime_error("Projected File System component not available"); + } +# endif + + if (!m_MountpointPath.empty()) + { + throw std::runtime_error("VFS already mounted"); + } + + m_MountpointPath = MountPoint; + + RefreshVfs(); +} + +void +VfsServiceImpl::Unmount() +{ + if (m_MountpointPath.empty()) + { + return; + } + + ZEN_INFO("unmounting VFS from '{}'", m_MountpointPath); + + m_MountpointPath.clear(); + + RefreshVfs(); +} + +void +VfsServiceImpl::AddService(Ref<ProjectStore>&& Ps) +{ + m_ProjectStore = std::move(Ps); + + RefreshVfs(); +} + +void +VfsServiceImpl::AddService(Ref<ZenCacheStore>&& Z$) +{ + m_ZenCacheStore = std::move(Z$); + + RefreshVfs(); +} + +void +VfsServiceImpl::RefreshVfs() +{ + if (m_VfsHost && m_MountpointPath.empty()) + { + m_VfsHost->RequestStop(); + m_VfsThread.join(); + m_VfsHost.reset(); + m_VfsThreadRunning.Reset(); + m_VfsDataSource = nullptr; + + return; + } + + if (!m_VfsHost && !m_MountpointPath.empty()) + { + m_VfsThread = std::thread(&VfsServiceImpl::VfsThread, this); + m_VfsThreadRunning.Wait(); + + // At this stage, m_VfsHost should be initialized + + ZEN_ASSERT(m_VfsHost); + } + + if (m_ProjectStore && m_VfsHost) + { + if (!m_VfsDataSource) + { + m_VfsDataSource = new VfsServiceDataSource(this); + } + + m_VfsHost->AddMount("projects"sv, m_VfsDataSource); + } + + if (m_ZenCacheStore && m_VfsHost) + { + if (!m_VfsDataSource) + { + m_VfsDataSource = new VfsServiceDataSource(this); + } + + m_VfsHost->AddMount("ddc_cache"sv, m_VfsDataSource); + } +} + +void +VfsServiceImpl::VfsThread() +{ + SetCurrentThreadName("VFS"); + + ZEN_INFO("VFS service thread now RUNNING"); + + try + { + m_VfsHost = std::make_unique<VfsHost>(m_MountpointPath); + m_VfsHost->Initialize(); + + m_VfsThreadRunning.Set(); + m_VfsHost->Run(); + } + catch (const std::exception& Ex) + { + ZEN_WARN("exception caught in VFS thread: {}", Ex.what()); + + m_VfsThreadException = std::current_exception(); + } + + if (m_VfsHost) + { + m_VfsHost->Cleanup(); + } + + ZEN_INFO("VFS service thread now EXITING"); +} + +////////////////////////////////////////////////////////////////////////// + +Ref<VfsOplogDataSource> +VfsServiceDataSource::GetOplogDataSource(std::string_view ProjectId, std::string_view OplogId) +{ + ExtendableStringBuilder<256> Key; + Key << ProjectId << "." << OplogId; + std::string StdKey{Key}; + + RwLock::ExclusiveLockScope _(m_Lock); + + if (auto It = m_OplogSourceMap.find(StdKey); It == m_OplogSourceMap.end()) + { + Ref<VfsOplogDataSource> NewSource{new VfsOplogDataSource(ProjectId, OplogId, m_VfsImpl->m_ProjectStore)}; + m_OplogSourceMap[StdKey] = NewSource; + return NewSource; + } + else + { + return It->second; + } +} + +Ref<VfsCacheDataSource> +VfsServiceDataSource::GetCacheDataSource(std::string_view NamespaceId, std::string_view BucketId) +{ + ExtendableStringBuilder<256> Key; + Key << NamespaceId << "." << BucketId; + std::string StdKey{Key}; + + RwLock::ExclusiveLockScope _(m_Lock); + + if (auto It = m_CacheSourceMap.find(StdKey); It == m_CacheSourceMap.end()) + { + Ref<VfsCacheDataSource> NewSource{new VfsCacheDataSource(NamespaceId, BucketId, m_VfsImpl->m_ZenCacheStore)}; + m_CacheSourceMap[StdKey] = NewSource; + return NewSource; + } + else + { + return It->second; + } +} + +void +VfsServiceDataSource::ReadNamedData(std::string_view Path, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount) +{ + ZEN_UNUSED(Path, Buffer, ByteOffset, ByteCount); +} + +void +VfsServiceDataSource::ReadChunkData(const Oid& ChunkId, void* Buffer, uint64_t ByteOffset, uint64_t ByteCount) +{ + ZEN_UNUSED(ChunkId, Buffer, ByteOffset, ByteCount); +} + +void +VfsServiceDataSource::PopulateDirectory(std::string NodePath, VfsTreeNode& DirNode) +{ + if (NodePath == "projects"sv) + { + // Project enumeration + + m_VfsImpl->m_ProjectStore->DiscoverProjects(); + + m_VfsImpl->m_ProjectStore->IterateProjects( + [&](ProjectStore::Project& Project) { DirNode.AddVirtualNode(Project.Identifier, m_VfsImpl->m_VfsDataSource); }); + } + else if (NodePath.starts_with("projects\\"sv)) + { + std::string_view ProjectId{NodePath}; + ProjectId = ProjectId.substr(9); // Skip "projects\" + + if (std::string_view::size_type SlashOffset = ProjectId.find_first_of('\\'); SlashOffset == std::string_view::npos) + { + Ref<ProjectStore::Project> Project = m_VfsImpl->m_ProjectStore->OpenProject(ProjectId); + + if (!Project) + { + // No such project found? + + return; + } + + // Oplog enumeration + + std::vector<std::string> Oplogs = Project->ScanForOplogs(); + + for (auto& Oplog : Oplogs) + { + DirNode.AddVirtualNode(Oplog, m_VfsImpl->m_VfsDataSource); + } + } + else + { + std::string_view OplogId = ProjectId.substr(SlashOffset + 1); + ProjectId = ProjectId.substr(0, SlashOffset); + + Ref<ProjectStore::Project> Project = m_VfsImpl->m_ProjectStore->OpenProject(ProjectId); + + if (!Project) + { + // No such project found? + + return; + } + + // Oplog contents enumeration + + if (Ref<ProjectStore::Oplog> Oplog = Project->OpenOplog(OplogId, /*AllowCompact*/ false, /*VerifyPathOnDisk*/ true)) + { + Ref<VfsOplogDataSource> DataSource = GetOplogDataSource(ProjectId, OplogId); + + // Get metadata for all chunks + std::vector<ProjectStore::Oplog::ChunkInfo> ChunkInfos = Oplog->GetAllChunksInfo(Project->RootDir); + + std::unordered_map<zen::Oid, uint64_t> ChunkSizes; + + for (const auto& Ci : ChunkInfos) + { + ChunkSizes[Ci.ChunkId] = Ci.ChunkSize; + } + + auto EmitFilesForDataArray = [&](zen::CbArrayView DataArray) { + for (auto DataIter : DataArray) + { + if (zen::CbObjectView Data = DataIter.AsObjectView()) + { + std::filesystem::path FileName(Data["filename"sv].AsU8String()); + zen::Oid ChunkId = Data["id"sv].AsObjectId(); + + if (auto FindIt = ChunkSizes.find(ChunkId); FindIt != ChunkSizes.end()) + { + DirNode.AddFileNode(FileName.string(), FindIt->second /* file size */, ChunkId, DataSource); + } + else + { + ZEN_WARN("no chunk metadata found for chunk {} (file: '{}')", ChunkId, FileName); + } + } + } + }; + + Oplog->IterateOplog( + [&](CbObjectView Op) { + EmitFilesForDataArray(Op["packagedata"sv].AsArrayView()); + EmitFilesForDataArray(Op["bulkdata"sv].AsArrayView()); + }, + ProjectStore::Oplog::Paging{}); + + DirNode.AddFileNode("stats.json", 42, Oid::Zero); + } + } + } + else if (NodePath == "ddc_cache"sv) + { + // Namespace enumeration + + std::vector<std::string> Namespaces = m_VfsImpl->m_ZenCacheStore->GetNamespaces(); + + for (auto& Namespace : Namespaces) + { + DirNode.AddVirtualNode(Namespace, m_VfsImpl->m_VfsDataSource); + } + } + else if (NodePath.starts_with("ddc_cache\\"sv)) + { + std::string_view NamespaceId{NodePath}; + NamespaceId = NamespaceId.substr(10); // Skip "ddc_cache\" + + auto& Cache = m_VfsImpl->m_ZenCacheStore; + + if (std::string_view::size_type SlashOffset = NamespaceId.find_first_of('\\'); SlashOffset == std::string_view::npos) + { + // Bucket enumeration + + if (auto NsInfo = Cache->GetNamespaceInfo(NamespaceId)) + { + for (auto& BucketName : NsInfo->BucketNames) + { + DirNode.AddVirtualNode(BucketName, m_VfsImpl->m_VfsDataSource); + } + } + } + else + { + // Bucket contents enumeration + + std::string_view BucketId = NamespaceId.substr(SlashOffset + 1); + NamespaceId = NamespaceId.substr(0, SlashOffset); + + Ref<VfsCacheDataSource> DataSource = GetCacheDataSource(NamespaceId, BucketId); + + auto Enumerator = [&](const IoHash& Key, const CacheValueDetails::ValueDetails& Details) { + ExtendableStringBuilder<64> KeyString; + Key.ToHexString(KeyString); + KeyString.Append(".udd"); + DirNode.AddFileNode(KeyString, Details.Size, Oid::Zero, DataSource); + }; + + Cache->EnumerateBucketContents(NamespaceId, BucketId, Enumerator); + } + } +} + +} // namespace zen +#endif |