diff options
| author | Stefan Boberg <[email protected]> | 2023-05-15 18:07:45 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2023-05-15 18:07:45 +0200 |
| commit | 06aaf561fed3db1eed2a044180895957de87ed20 (patch) | |
| tree | c80ef0d23b77ea251904c9f05a0626ef6fdc3a78 | |
| parent | implemented string conversion for CbValidateError enum (diff) | |
| parent | Remove ATL header usage (#306) (diff) | |
| download | zen-06aaf561fed3db1eed2a044180895957de87ed20.tar.xz zen-06aaf561fed3db1eed2a044180895957de87ed20.zip | |
Merge branch 'main' of https://github.com/EpicGames/zen
| -rw-r--r-- | src/zen/internalfile.cpp | 4 | ||||
| -rw-r--r-- | src/zen/internalfile.h | 18 | ||||
| -rw-r--r-- | src/zencore/filesystem.cpp | 60 | ||||
| -rw-r--r-- | src/zencore/include/zencore/except.h | 1 | ||||
| -rw-r--r-- | src/zencore/include/zencore/windows.h | 417 | ||||
| -rw-r--r-- | src/zencore/include/zencore/workthreadpool.h | 46 | ||||
| -rw-r--r-- | src/zencore/iobuffer.cpp | 6 | ||||
| -rw-r--r-- | src/zencore/windows.cpp | 64 | ||||
| -rw-r--r-- | src/zencore/workthreadpool.cpp | 221 | ||||
| -rw-r--r-- | src/zencore/xmake.lua | 6 | ||||
| -rw-r--r-- | src/zenhttp/httpsys.cpp | 22 | ||||
| -rw-r--r-- | src/zenserver-test/projectclient.cpp | 4 | ||||
| -rw-r--r-- | src/zenserver-test/zenserver-test.cpp | 1 | ||||
| -rw-r--r-- | src/zenserver/config.cpp | 1 | ||||
| -rw-r--r-- | src/zenserver/upstream/upstreamapply.cpp | 4 | ||||
| -rw-r--r-- | src/zenserver/upstream/upstreamcache.cpp | 7 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 8 | ||||
| -rw-r--r-- | src/zenutil/zenserverprocess.cpp | 1 |
18 files changed, 780 insertions, 111 deletions
diff --git a/src/zen/internalfile.cpp b/src/zen/internalfile.cpp index 2ade86e29..3b3d466a1 100644 --- a/src/zen/internalfile.cpp +++ b/src/zen/internalfile.cpp @@ -132,10 +132,6 @@ FileBufferManager::ReturnBuffer(zen::IoBuffer Buffer) ////////////////////////////////////////////////////////////////////////// InternalFile::InternalFile() -#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC -: m_File(nullptr) -, m_Mmap(nullptr) -#endif { } diff --git a/src/zen/internalfile.h b/src/zen/internalfile.h index 8acb600ff..f7bd155fa 100644 --- a/src/zen/internalfile.h +++ b/src/zen/internalfile.h @@ -12,10 +12,6 @@ # include <zencore/windows.h> #endif -#if ZEN_PLATFORM_WINDOWS -# include <atlfile.h> -#endif - #include <filesystem> #include <list> @@ -52,11 +48,13 @@ public: size_t GetFileSize(); private: -#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC - using CAtlFile = void*; - using CAtlFileMappingBase = void*; +#if ZEN_PLATFORM_WINDOWS + zen::windows::FileHandle m_File; + zen::windows::FileMapping m_Mmap; +#else + void* m_File = nullptr; + void* m_Mmap = nullptr; #endif - CAtlFile m_File; - CAtlFileMappingBase m_Mmap; - void* m_Memory = nullptr; + + void* m_Memory = nullptr; }; diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index 8a6a9869c..9d55331eb 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -15,10 +15,10 @@ #endif #if ZEN_PLATFORM_WINDOWS -# include <atlbase.h> -# include <atlfile.h> +ZEN_THIRD_PARTY_INCLUDES_START # include <winioctl.h> # include <winnt.h> +ZEN_THIRD_PARTY_INCLUDES_END #endif #if ZEN_PLATFORM_LINUX @@ -51,13 +51,13 @@ using namespace std::literals; static bool DeleteReparsePoint(const wchar_t* Path, DWORD dwReparseTag) { - CHandle hDir(CreateFileW(Path, - GENERIC_WRITE, - FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, - nullptr, - OPEN_EXISTING, - FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OPEN_REPARSE_POINT, - nullptr)); + windows::Handle hDir(CreateFileW(Path, + GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + nullptr, + OPEN_EXISTING, + FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OPEN_REPARSE_POINT, + nullptr)); if (hDir != INVALID_HANDLE_VALUE) { @@ -245,13 +245,13 @@ bool SupportsBlockRefCounting(std::filesystem::path Path) { #if ZEN_PLATFORM_WINDOWS - ATL::CHandle Handle(CreateFileW(Path.c_str(), - GENERIC_READ, - FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, - nullptr, - OPEN_EXISTING, - FILE_FLAG_BACKUP_SEMANTICS, - nullptr)); + windows::Handle Handle(CreateFileW(Path.c_str(), + GENERIC_READ, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + nullptr, + OPEN_EXISTING, + FILE_FLAG_BACKUP_SEMANTICS, + nullptr)); if (Handle == INVALID_HANDLE_VALUE) { @@ -281,7 +281,7 @@ bool CloneFile(std::filesystem::path FromPath, std::filesystem::path ToPath) { #if ZEN_PLATFORM_WINDOWS - ATL::CHandle FromFile(CreateFileW(FromPath.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr, OPEN_EXISTING, 0, nullptr)); + windows::Handle FromFile(CreateFileW(FromPath.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr, OPEN_EXISTING, 0, nullptr)); if (FromFile == INVALID_HANDLE_VALUE) { FromFile.Detach(); @@ -327,13 +327,13 @@ CloneFile(std::filesystem::path FromPath, std::filesystem::path ToPath) SetFileAttributesW(ToPath.c_str(), FILE_ATTRIBUTE_NORMAL); - ATL::CHandle TargetFile(CreateFileW(ToPath.c_str(), - GENERIC_READ | GENERIC_WRITE | DELETE, - /* no sharing */ FILE_SHARE_READ, - nullptr, - OPEN_ALWAYS, - 0, - /* hTemplateFile */ FromFile)); + windows::Handle TargetFile(CreateFileW(ToPath.c_str(), + GENERIC_READ | GENERIC_WRITE | DELETE, + /* no sharing */ FILE_SHARE_READ, + nullptr, + OPEN_ALWAYS, + 0, + /* hTemplateFile */ FromFile)); if (TargetFile == INVALID_HANDLE_VALUE) { @@ -560,8 +560,8 @@ void WriteFile(std::filesystem::path Path, const IoBuffer* const* Data, size_t BufferCount) { #if ZEN_PLATFORM_WINDOWS - CAtlFile Outfile; - HRESULT hRes = Outfile.Create(Path.c_str(), GENERIC_WRITE, FILE_SHARE_READ, CREATE_ALWAYS); + windows::FileHandle Outfile; + HRESULT hRes = Outfile.Create(Path.c_str(), GENERIC_WRITE, FILE_SHARE_READ, CREATE_ALWAYS); if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)) { CreateDirectories(Path.parent_path()); @@ -676,7 +676,7 @@ ReadFile(std::filesystem::path Path) void* Handle; #if ZEN_PLATFORM_WINDOWS - ATL::CHandle FromFile(CreateFileW(Path.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr, OPEN_EXISTING, 0, nullptr)); + windows::Handle FromFile(CreateFileW(Path.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr, OPEN_EXISTING, 0, nullptr)); if (FromFile == INVALID_HANDLE_VALUE) { FromFile.Detach(); @@ -717,7 +717,7 @@ bool ScanFile(std::filesystem::path Path, const uint64_t ChunkSize, std::function<void(const void* Data, size_t Size)>&& ProcessFunc) { #if ZEN_PLATFORM_WINDOWS - ATL::CHandle FromFile(CreateFileW(Path.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr, OPEN_EXISTING, 0, nullptr)); + windows::Handle FromFile(CreateFileW(Path.c_str(), GENERIC_READ, FILE_SHARE_READ, nullptr, OPEN_EXISTING, 0, nullptr)); if (FromFile == INVALID_HANDLE_VALUE) { FromFile.Detach(); @@ -826,8 +826,8 @@ FileSystemTraversal::TraverseFileSystem(const std::filesystem::path& RootDir, Tr FILE_INFO_BY_HANDLE_CLASS FibClass = FileIdBothDirectoryRestartInfo; bool Continue = true; - CAtlFile RootDirHandle; - HRESULT hRes = + windows::FileHandle RootDirHandle; + HRESULT hRes = RootDirHandle.Create(RootDir.c_str(), GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, OPEN_EXISTING, FILE_FLAG_BACKUP_SEMANTICS); if (FAILED(hRes)) diff --git a/src/zencore/include/zencore/except.h b/src/zencore/include/zencore/except.h index d3269f33a..114f98d77 100644 --- a/src/zencore/include/zencore/except.h +++ b/src/zencore/include/zencore/except.h @@ -3,6 +3,7 @@ #pragma once #include <zencore/string.h> +#include <zencore/zencore.h> #if ZEN_PLATFORM_WINDOWS # include <zencore/windows.h> #else diff --git a/src/zencore/include/zencore/windows.h b/src/zencore/include/zencore/windows.h index 91828f0ec..333188cb3 100644 --- a/src/zencore/include/zencore/windows.h +++ b/src/zencore/include/zencore/windows.h @@ -4,22 +4,411 @@ #include <zencore/zencore.h> +#if ZEN_PLATFORM_WINDOWS + ZEN_THIRD_PARTY_INCLUDES_START struct IUnknown; // Workaround for "combaseapi.h(229): error C2187: syntax error: 'identifier' was unexpected here" when using /permissive- -#ifndef NOMINMAX -# define NOMINMAX // We don't want your min/max macros -#endif -#ifndef NOGDI -# define NOGDI // We don't want your GetObject define -#endif -#ifndef WIN32_LEAN_AND_MEAN -# define WIN32_LEAN_AND_MEAN -#endif -#ifndef _WIN32_WINNT -# define _WIN32_WINNT 0x0A00 -#endif -#include <windows.h> -#undef GetObject +# ifndef NOMINMAX +# define NOMINMAX // We don't want your min/max macros +# endif +# ifndef NOGDI +# define NOGDI // We don't want your GetObject define +# endif +# ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +# endif +# ifndef _WIN32_WINNT +# define _WIN32_WINNT 0x0A00 +# endif +# include <windows.h> +# undef GetObject ZEN_THIRD_PARTY_INCLUDES_END + +////////////////////////////////////////////////////////////////////////// + +namespace zen::windows { + +class Handle +{ +public: + Handle() noexcept = default; + inline Handle(Handle& h) noexcept { Attach(h.Detach()); } + explicit Handle(HANDLE h) noexcept : m_Handle(h) {} + inline ~Handle() noexcept + { + if (m_Handle) + { + Close(); + } + } + + Handle& operator=(Handle& InHandle) noexcept + { + if (this != &InHandle) + { + if (m_Handle != NULL) + { + Close(); + } + Attach(InHandle.Detach()); + } + + return *this; + } + + inline operator HANDLE() const noexcept { return m_Handle; } + inline void Attach(HANDLE h) noexcept + { + ZEN_ASSERT(m_Handle == NULL); + m_Handle = h; + } + inline HANDLE Detach() noexcept + { + HANDLE h; + + h = m_Handle; + m_Handle = NULL; + + return h; + } + void Close() noexcept + { + if (m_Handle != NULL) + { + ::CloseHandle(m_Handle); + m_Handle = NULL; + } + } + +public: + HANDLE m_Handle{0}; +}; + +///////////////////////////////////////////////////////////////////////////// +// Error to HRESULT helpers + +inline HRESULT +MapHresultFromLastError() noexcept +{ + DWORD ErrorCode = ::GetLastError(); + return HRESULT_FROM_WIN32(ErrorCode); +} + +inline HRESULT +MapHresultFromWin32(DWORD ErrorCode) noexcept +{ + return HRESULT_FROM_WIN32(ErrorCode); +} + +////////////////////////////////////////////////////////////////////////// + +class FileHandle : public Handle +{ +public: + FileHandle() noexcept = default; + FileHandle(FileHandle& InFile) noexcept : Handle(InFile) {} + explicit FileHandle(HANDLE InFileHandle) noexcept : Handle(InFileHandle) {} + + HRESULT Create(LPCTSTR szFilename, + DWORD dwDesiredAccess, + DWORD dwShareMode, + DWORD dwCreationDisposition, + DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL) + { + ZEN_ASSERT(m_Handle == NULL); + + HANDLE hFile = ::CreateFile(szFilename, dwDesiredAccess, dwShareMode, NULL, dwCreationDisposition, dwFlagsAndAttributes, NULL); + + if (hFile == INVALID_HANDLE_VALUE) + return MapHresultFromLastError(); + + Attach(hFile); + + return S_OK; + } + + HRESULT Read(LPVOID pBuffer, DWORD nBufSize) + { + ZEN_ASSERT(m_Handle != NULL); + + DWORD nBytesRead = 0; + BOOL Success = ::ReadFile(m_Handle, pBuffer, nBufSize, &nBytesRead, NULL); + + if (!Success) + return MapHresultFromLastError(); + + if (nBytesRead != nBufSize) + return HRESULT_FROM_WIN32(ERROR_HANDLE_EOF); + + return S_OK; + } + + HRESULT Read(LPVOID pBuffer, DWORD nBufSize, LPOVERLAPPED pOverlapped) + { + ZEN_ASSERT(m_Handle != NULL); + + BOOL Success = ::ReadFile(m_Handle, pBuffer, nBufSize, NULL, pOverlapped); + if (!Success) + return MapHresultFromLastError(); + + return S_OK; + } + + HRESULT Write(LPCVOID pBuffer, DWORD nBufSize, DWORD* pnBytesWritten = NULL) + { + ZEN_ASSERT(m_Handle != NULL); + + DWORD nBytesWritten; + if (pnBytesWritten == NULL) + pnBytesWritten = &nBytesWritten; + + BOOL Success = ::WriteFile(m_Handle, pBuffer, nBufSize, pnBytesWritten, NULL); + if (!Success) + return MapHresultFromLastError(); + + return S_OK; + } + + HRESULT Write(LPCVOID pBuffer, DWORD nBufSize, LPOVERLAPPED pOverlapped) + { + ZEN_ASSERT(m_Handle != NULL); + + BOOL Success = ::WriteFile(m_Handle, pBuffer, nBufSize, NULL, pOverlapped); + if (!Success) + return MapHresultFromLastError(); + + return S_OK; + } + + HRESULT Seek(LONGLONG nOffset, DWORD dwFrom = FILE_CURRENT) + { + ZEN_ASSERT(m_Handle != NULL); + ZEN_ASSERT(dwFrom == FILE_BEGIN || dwFrom == FILE_END || dwFrom == FILE_CURRENT); + + LARGE_INTEGER liOffset; + liOffset.QuadPart = nOffset; + DWORD nNewPos = ::SetFilePointer(m_Handle, liOffset.LowPart, &liOffset.HighPart, dwFrom); + + if (nNewPos == INVALID_SET_FILE_POINTER) + { + HRESULT hr = MapHresultFromLastError(); + + if (FAILED(hr)) + return hr; + } + + return S_OK; + } + + HRESULT GetPosition(ULONGLONG& OutPos) const + { + ZEN_ASSERT(m_Handle != NULL); + + LARGE_INTEGER liOffset; + liOffset.QuadPart = 0; + liOffset.LowPart = ::SetFilePointer(m_Handle, 0, &liOffset.HighPart, FILE_CURRENT); + if (liOffset.LowPart == INVALID_SET_FILE_POINTER) + { + HRESULT hr = MapHresultFromLastError(); + + if (FAILED(hr)) + return hr; + } + OutPos = liOffset.QuadPart; + + return S_OK; + } + + HRESULT GetSize(ULONGLONG& OutLen) const + { + ZEN_ASSERT(m_Handle != NULL); + + ULARGE_INTEGER liFileSize; + liFileSize.LowPart = ::GetFileSize(m_Handle, &liFileSize.HighPart); + + if (liFileSize.LowPart == INVALID_FILE_SIZE) + { + HRESULT hr = MapHresultFromLastError(); + if (FAILED(hr)) + return hr; + } + + OutLen = liFileSize.QuadPart; + + return S_OK; + } +}; + +////////////////////////////////////////////////////////////////////////// + +class FileMapping +{ +public: + FileMapping() throw() + { + m_pData = NULL; + m_hMapping = NULL; + } + + ~FileMapping() throw() { Unmap(); } + + HRESULT MapFile(_In_ HANDLE hFile, + _In_ SIZE_T nMappingSize = 0, + _In_ ULONGLONG nOffset = 0, + _In_ DWORD dwMappingProtection = PAGE_READONLY, + _In_ DWORD dwViewDesiredAccess = FILE_MAP_READ) throw() + { + ZEN_ASSERT(m_pData == NULL); + ZEN_ASSERT(m_hMapping == NULL); + ZEN_ASSERT(hFile != INVALID_HANDLE_VALUE && hFile != NULL); + + ULARGE_INTEGER liFileSize; + liFileSize.LowPart = ::GetFileSize(hFile, &liFileSize.HighPart); + if (liFileSize.QuadPart < nMappingSize) + liFileSize.QuadPart = nMappingSize; + + m_hMapping = ::CreateFileMapping(hFile, NULL, dwMappingProtection, liFileSize.HighPart, liFileSize.LowPart, 0); + if (m_hMapping == NULL) + return MapHresultFromLastError(); + + if (nMappingSize == 0) + m_nMappingSize = (SIZE_T)(liFileSize.QuadPart - nOffset); + else + m_nMappingSize = nMappingSize; + + m_dwViewDesiredAccess = dwViewDesiredAccess; + m_nOffset.QuadPart = nOffset; + + m_pData = ::MapViewOfFileEx(m_hMapping, m_dwViewDesiredAccess, m_nOffset.HighPart, m_nOffset.LowPart, m_nMappingSize, NULL); + if (m_pData == NULL) + { + HRESULT hr; + + hr = MapHresultFromLastError(); + ::CloseHandle(m_hMapping); + m_hMapping = NULL; + return hr; + } + + return S_OK; + } + + HRESULT MapSharedMem(_In_ SIZE_T nMappingSize, + _In_z_ LPCTSTR szName, + _Out_opt_ BOOL* pbAlreadyExisted = NULL, + _In_opt_ LPSECURITY_ATTRIBUTES lpsa = NULL, + _In_ DWORD dwMappingProtection = PAGE_READWRITE, + _In_ DWORD dwViewDesiredAccess = FILE_MAP_ALL_ACCESS) throw() + { + ZEN_ASSERT(m_pData == NULL); + ZEN_ASSERT(m_hMapping == NULL); + ZEN_ASSERT(nMappingSize > 0); + ZEN_ASSERT(szName != NULL); // if you just want a regular chunk of memory, use a heap allocator + + m_nMappingSize = nMappingSize; + + ULARGE_INTEGER nSize; + nSize.QuadPart = nMappingSize; + m_hMapping = ::CreateFileMapping(INVALID_HANDLE_VALUE, lpsa, dwMappingProtection, nSize.HighPart, nSize.LowPart, szName); + if (m_hMapping == NULL) + { + HRESULT hr = MapHresultFromLastError(); + _Analysis_assume_(FAILED(hr)); + return hr; + } + + if (pbAlreadyExisted != NULL) + *pbAlreadyExisted = (GetLastError() == ERROR_ALREADY_EXISTS); + + m_dwViewDesiredAccess = dwViewDesiredAccess; + m_nOffset.QuadPart = 0; + + m_pData = ::MapViewOfFileEx(m_hMapping, m_dwViewDesiredAccess, m_nOffset.HighPart, m_nOffset.LowPart, m_nMappingSize, NULL); + if (m_pData == NULL) + { + HRESULT hr; + + hr = MapHresultFromLastError(); + ::CloseHandle(m_hMapping); + m_hMapping = NULL; + return hr; + } + + return S_OK; + } + + HRESULT OpenMapping(_In_z_ LPCTSTR szName, + _In_ SIZE_T nMappingSize, + _In_ ULONGLONG nOffset = 0, + _In_ DWORD dwViewDesiredAccess = FILE_MAP_ALL_ACCESS) throw() + { + ZEN_ASSERT(m_pData == NULL); + ZEN_ASSERT(m_hMapping == NULL); + ZEN_ASSERT(szName != NULL); // if you just want a regular chunk of memory, use a heap allocator + + m_nMappingSize = nMappingSize; + m_dwViewDesiredAccess = dwViewDesiredAccess; + + m_hMapping = ::OpenFileMapping(m_dwViewDesiredAccess, FALSE, szName); + if (m_hMapping == NULL) + return MapHresultFromLastError(); + + m_dwViewDesiredAccess = dwViewDesiredAccess; + m_nOffset.QuadPart = nOffset; + + m_pData = ::MapViewOfFileEx(m_hMapping, m_dwViewDesiredAccess, m_nOffset.HighPart, m_nOffset.LowPart, m_nMappingSize, NULL); + if (m_pData == NULL) + { + HRESULT hr; + + hr = MapHresultFromLastError(); + ::CloseHandle(m_hMapping); + m_hMapping = NULL; + return hr; + } + + return S_OK; + } + + HRESULT Unmap() throw() + { + HRESULT hr = S_OK; + + if (m_pData != NULL) + { + if (!::UnmapViewOfFile(m_pData)) + hr = MapHresultFromLastError(); + m_pData = NULL; + } + if (m_hMapping != NULL) + { + if (!::CloseHandle(m_hMapping) && SUCCEEDED(hr)) + hr = MapHresultFromLastError(); + m_hMapping = NULL; + } + return hr; + } + + void* GetData() const throw() { return m_pData; } + + HANDLE GetHandle() const throw() { return m_hMapping; } + + SIZE_T GetMappingSize() throw() { return m_nMappingSize; } + + HRESULT CopyFrom(_In_ FileMapping& orig) throw(); + + FileMapping(_In_ FileMapping& orig); + FileMapping& operator=(_In_ FileMapping& orig); + +private: + void* m_pData; + SIZE_T m_nMappingSize; + HANDLE m_hMapping; + ULARGE_INTEGER m_nOffset; + DWORD m_dwViewDesiredAccess; +}; + +} // namespace zen::windows +#endif diff --git a/src/zencore/include/zencore/workthreadpool.h b/src/zencore/include/zencore/workthreadpool.h index 0ddc65298..c10c6ed12 100644 --- a/src/zencore/include/zencore/workthreadpool.h +++ b/src/zencore/include/zencore/workthreadpool.h @@ -4,17 +4,16 @@ #include <zencore/zencore.h> -#include <zencore/blockingqueue.h> #include <zencore/refcount.h> #include <exception> #include <functional> -#include <system_error> -#include <thread> -#include <vector> +#include <future> namespace zen { +////////////////////////////////////////////////////////////////////////// + struct IWork : public RefCounted { virtual void Execute() = 0; @@ -27,22 +26,51 @@ private: friend class WorkerThreadPool; }; +////////////////////////////////////////////////////////////////////////// + class WorkerThreadPool { public: - WorkerThreadPool(int InThreadCount); + explicit WorkerThreadPool(int InThreadCount); + WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName); ~WorkerThreadPool(); void ScheduleWork(Ref<IWork> Work); void ScheduleWork(std::function<void()>&& Work); - [[nodiscard]] size_t PendingWork() const; + template<typename Func> + auto EnqueueTask(std::packaged_task<Func> Task); + + [[nodiscard]] size_t PendingWorkItemCount() const; private: - void WorkerThreadFunction(); + struct Impl; + struct ThreadStartInfo; - std::vector<std::thread> m_WorkerThreads; - BlockingQueue<Ref<IWork>> m_WorkQueue; + std::unique_ptr<Impl> m_Impl; }; +////////////////////////////////////////////////////////////////////////// + +template<typename Func> +auto +WorkerThreadPool::EnqueueTask(std::packaged_task<Func> Task) +{ + struct FutureWork : IWork + { + FutureWork(std::packaged_task<Func> Task) : m_Task{std::move(Task)} {} + virtual void Execute() override { m_Task(); } + + std::packaged_task<Func> m_Task; + }; + + Ref<FutureWork> Work{new FutureWork(std::move(Task))}; + + auto Future = Work->m_Task.get_future(); + ScheduleWork(std::move(Work)); + return Future; +} + +void workthreadpool_forcelink(); // internal + } // namespace zen diff --git a/src/zencore/iobuffer.cpp b/src/zencore/iobuffer.cpp index 96653c670..3aca0cfa7 100644 --- a/src/zencore/iobuffer.cpp +++ b/src/zencore/iobuffer.cpp @@ -21,7 +21,7 @@ ZEN_THIRD_PARTY_INCLUDES_END #endif #if ZEN_PLATFORM_WINDOWS -# include <atlfile.h> +# include <zencore/windows.h> #else # include <sys/stat.h> # include <sys/mman.h> @@ -555,7 +555,7 @@ IoBufferBuilder::MakeFromFile(const std::filesystem::path& FileName, uint64_t Of uint64_t FileSize; #if ZEN_PLATFORM_WINDOWS - CAtlFile DataFile; + windows::FileHandle DataFile; DWORD ShareOptions = FILE_SHARE_DELETE | FILE_SHARE_WRITE | FILE_SHARE_DELETE | FILE_SHARE_READ; HRESULT hRes = DataFile.Create(FileName.c_str(), GENERIC_READ, ShareOptions, OPEN_EXISTING); @@ -620,7 +620,7 @@ IoBufferBuilder::MakeFromTemporaryFile(const std::filesystem::path& FileName) void* Handle; #if ZEN_PLATFORM_WINDOWS - CAtlFile DataFile; + windows::FileHandle DataFile; // We need to open with DELETE since this is used for the case // when a file has been written to a staging directory, and is going diff --git a/src/zencore/windows.cpp b/src/zencore/windows.cpp new file mode 100644 index 000000000..76d8ab445 --- /dev/null +++ b/src/zencore/windows.cpp @@ -0,0 +1,64 @@ + +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zencore/zencore.h" + +#if ZEN_PLATFORM_WINDOWS +# include <zencore/except.h> +# include "zencore/windows.h" + +namespace zen::windows { + +FileMapping::FileMapping(_In_ FileMapping& orig) +{ + m_pData = NULL; + m_hMapping = NULL; + + HRESULT hr = CopyFrom(orig); + if (FAILED(hr)) + zen::ThrowSystemException(hr, "Failed to clone FileMapping"); +} + +FileMapping& +FileMapping::operator=(_In_ FileMapping& orig) +{ + HRESULT hr = CopyFrom(orig); + if (FAILED(hr)) + zen::ThrowSystemException(hr, "Failed to clone FileMapping"); + + return *this; +} + +HRESULT +FileMapping::CopyFrom(_In_ FileMapping& orig) throw() +{ + if (this == &orig) + return S_OK; + + ZEN_ASSERT(m_pData == NULL); + ZEN_ASSERT(m_hMapping == NULL); + ZEN_ASSERT(orig.m_pData != NULL); + + m_dwViewDesiredAccess = orig.m_dwViewDesiredAccess; + m_nOffset.QuadPart = orig.m_nOffset.QuadPart; + m_nMappingSize = orig.m_nMappingSize; + + if (!::DuplicateHandle(GetCurrentProcess(), orig.m_hMapping, GetCurrentProcess(), &m_hMapping, NULL, TRUE, DUPLICATE_SAME_ACCESS)) + return MapHresultFromLastError(); + + m_pData = ::MapViewOfFileEx(m_hMapping, m_dwViewDesiredAccess, m_nOffset.HighPart, m_nOffset.LowPart, m_nMappingSize, NULL); + if (m_pData == NULL) + { + HRESULT hr; + + hr = MapHresultFromLastError(); + ::CloseHandle(m_hMapping); + m_hMapping = NULL; + return hr; + } + + return S_OK; +} + +} // namespace zen::windows +#endif diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp index b4328cdbd..cc21e717a 100644 --- a/src/zencore/workthreadpool.cpp +++ b/src/zencore/workthreadpool.cpp @@ -2,7 +2,21 @@ #include <zencore/workthreadpool.h> +#include <zencore/blockingqueue.h> #include <zencore/logging.h> +#include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/thread.h> +#include <zencore/trace.h> + +#include <thread> +#include <vector> + +#define ZEN_USE_WINDOWS_THREADPOOL 1 + +#if ZEN_PLATFORM_WINDOWS && ZEN_USE_WINDOWS_THREADPOOL +# include <zencore/windows.h> +#endif namespace zen { @@ -16,47 +30,143 @@ namespace detail { }; } // namespace detail -WorkerThreadPool::WorkerThreadPool(int InThreadCount) +////////////////////////////////////////////////////////////////////////// + +#if ZEN_USE_WINDOWS_THREADPOOL && ZEN_PLATFORM_WINDOWS + +namespace { + thread_local bool t_IsThreadNamed{false}; +} + +struct WorkerThreadPool::Impl { - for (int i = 0; i < InThreadCount; ++i) + PTP_POOL m_ThreadPool = nullptr; + PTP_CLEANUP_GROUP m_CleanupGroup = nullptr; + TP_CALLBACK_ENVIRON m_CallbackEnvironment; + PTP_WORK m_Work = nullptr; + + std::string m_WorkerThreadBaseName; + std::atomic<int> m_WorkerThreadCounter{0}; + + RwLock m_QueueLock; + std::deque<Ref<IWork>> m_WorkQueue; + + Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName) { - m_WorkerThreads.emplace_back(&WorkerThreadPool::WorkerThreadFunction, this); + // Thread pool setup + + m_ThreadPool = CreateThreadpool(NULL); + + SetThreadpoolThreadMinimum(m_ThreadPool, InThreadCount); + SetThreadpoolThreadMaximum(m_ThreadPool, InThreadCount * 2); + + InitializeThreadpoolEnvironment(&m_CallbackEnvironment); + + m_CleanupGroup = CreateThreadpoolCleanupGroup(); + + SetThreadpoolCallbackPool(&m_CallbackEnvironment, m_ThreadPool); + SetThreadpoolCallbackCleanupGroup(&m_CallbackEnvironment, m_CleanupGroup, NULL); + + m_Work = CreateThreadpoolWork(&WorkCallback, this, &m_CallbackEnvironment); } -} -WorkerThreadPool::~WorkerThreadPool() -{ - m_WorkQueue.CompleteAdding(); + ~Impl() + { + WaitForThreadpoolWorkCallbacks(m_Work, /* CancelPendingCallbacks */ TRUE); + CloseThreadpoolWork(m_Work); + } - for (std::thread& Thread : m_WorkerThreads) + void ScheduleWork(Ref<IWork> Work) { - Thread.join(); + m_QueueLock.WithExclusiveLock([&] { m_WorkQueue.push_back(std::move(Work)); }); + SubmitThreadpoolWork(m_Work); } + [[nodiscard]] size_t PendingWorkItemCount() const { return 0; } - m_WorkerThreads.clear(); -} + static VOID CALLBACK WorkCallback(_Inout_ PTP_CALLBACK_INSTANCE Instance, _Inout_opt_ PVOID Context, _Inout_ PTP_WORK Work) + { + ZEN_UNUSED(Instance, Work); + Impl* ThisPtr = reinterpret_cast<Impl*>(Context); + ThisPtr->DoWork(); + } -void -WorkerThreadPool::ScheduleWork(Ref<IWork> Work) -{ - m_WorkQueue.Enqueue(std::move(Work)); -} + void DoWork() + { + if (!t_IsThreadNamed) + { + t_IsThreadNamed = true; + const int ThreadIndex = ++m_WorkerThreadCounter; + zen::ExtendableStringBuilder<128> ThreadName; + ThreadName << m_WorkerThreadBaseName << "_" << ThreadIndex; + SetCurrentThreadName(ThreadName); + } -void -WorkerThreadPool::ScheduleWork(std::function<void()>&& Work) + Ref<IWork> WorkFromQueue; + + { + RwLock::ExclusiveLockScope _{m_QueueLock}; + WorkFromQueue = std::move(m_WorkQueue.front()); + m_WorkQueue.pop_front(); + } + + WorkFromQueue->Execute(); + } +}; + +#else + +struct WorkerThreadPool::ThreadStartInfo { - m_WorkQueue.Enqueue(Ref<IWork>(new detail::LambdaWork(Work))); -} + int ThreadNumber; + zen::Latch* Latch; +}; -[[nodiscard]] size_t -WorkerThreadPool::PendingWork() const +struct WorkerThreadPool::Impl { - return m_WorkQueue.Size(); -} + void WorkerThreadFunction(ThreadStartInfo Info); + std::string m_WorkerThreadBaseName; + std::vector<std::thread> m_WorkerThreads; + BlockingQueue<Ref<IWork>> m_WorkQueue; + + Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName) + { + trace::ThreadGroupBegin(m_WorkerThreadBaseName.c_str()); + + zen::Latch WorkerLatch{InThreadCount}; + + for (int i = 0; i < InThreadCount; ++i) + { + m_WorkerThreads.emplace_back(&Impl::WorkerThreadFunction, this, ThreadStartInfo{i + 1, &WorkerLatch}); + } + + WorkerLatch.Wait(); + + trace::ThreadGroupEnd(); + } + + ~Impl() + { + m_WorkQueue.CompleteAdding(); + + for (std::thread& Thread : m_WorkerThreads) + { + Thread.join(); + } + + m_WorkerThreads.clear(); + } + + void ScheduleWork(Ref<IWork> Work) { m_WorkQueue.Enqueue(std::move(Work)); } + [[nodiscard]] size_t PendingWorkItemCount() const { return m_WorkQueue.Size(); } +}; void -WorkerThreadPool::WorkerThreadFunction() +WorkerThreadPool::Impl::WorkerThreadFunction(ThreadStartInfo Info) { + SetCurrentThreadName(fmt::format("{}_{}", m_WorkerThreadBaseName, Info.ThreadNumber)); + + Info.Latch->CountDown(); + do { Ref<IWork> Work; @@ -80,4 +190,65 @@ WorkerThreadPool::WorkerThreadFunction() } while (true); } +#endif + +////////////////////////////////////////////////////////////////////////// +WorkerThreadPool::WorkerThreadPool(int InThreadCount) : WorkerThreadPool(InThreadCount, "workerthread") +{ +} + +WorkerThreadPool::WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName) +{ + m_Impl = std::make_unique<Impl>(InThreadCount, WorkerThreadBaseName); +} + +WorkerThreadPool::~WorkerThreadPool() +{ + m_Impl.reset(); +} + +void +WorkerThreadPool::ScheduleWork(Ref<IWork> Work) +{ + m_Impl->ScheduleWork(std::move(Work)); +} + +void +WorkerThreadPool::ScheduleWork(std::function<void()>&& Work) +{ + ScheduleWork(Ref<IWork>(new detail::LambdaWork(Work))); +} + +[[nodiscard]] size_t +WorkerThreadPool::PendingWorkItemCount() const +{ + return m_Impl->PendingWorkItemCount(); +} + +////////////////////////////////////////////////////////////////////////// + +#if ZEN_WITH_TESTS + +void +workthreadpool_forcelink() +{ +} + +using namespace std::literals; + +TEST_CASE("threadpool.basic") +{ + WorkerThreadPool Threadpool{1}; + + auto Future42 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 42; }}); + auto Future99 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 99; }}); + auto FutureThrow = Threadpool.EnqueueTask(std::packaged_task<void()>{[] { throw std::runtime_error("meep!"); }}); + + CHECK_EQ(Future42.get(), 42); + CHECK_EQ(Future99.get(), 99); + CHECK_THROWS(FutureThrow.get()); +} + +#endif + } // namespace zen diff --git a/src/zencore/xmake.lua b/src/zencore/xmake.lua index 4ec334574..1be89c14a 100644 --- a/src/zencore/xmake.lua +++ b/src/zencore/xmake.lua @@ -61,3 +61,9 @@ target('zencore') if is_plat("linux") then add_syslinks("rt") end + + if is_plat("windows") then + add_syslinks("Advapi32") + add_syslinks("Shell32") + add_syslinks("User32") + end
\ No newline at end of file diff --git a/src/zenhttp/httpsys.cpp b/src/zenhttp/httpsys.cpp index 25e4393b3..979f69aeb 100644 --- a/src/zenhttp/httpsys.cpp +++ b/src/zenhttp/httpsys.cpp @@ -178,7 +178,7 @@ class HttpSysServerRequest : public HttpServerRequest { public: HttpSysServerRequest(HttpSysTransaction& Tx, HttpService& Service, IoBuffer PayloadBuffer); - ~HttpSysServerRequest() = default; + ~HttpSysServerRequest(); virtual Oid ParseSessionId() const override; virtual uint32_t ParseRequestId() const override; @@ -722,7 +722,7 @@ HttpSysServer::HttpSysServer(unsigned int ThreadCount, unsigned int AsyncWorkThr : m_Log(logging::Get("http")) , m_RequestLog(logging::Get("http_requests")) , m_ThreadPool(ThreadCount) -, m_AsyncWorkPool(AsyncWorkThreadCount) +, m_AsyncWorkPool(AsyncWorkThreadCount, "http_async") { ULONG Result = HttpInitialize(HTTPAPI_VERSION_2, HTTP_INITIALIZE_SERVER, nullptr); @@ -1093,6 +1093,9 @@ HttpSysTransaction::IssueInitialRequest(std::error_code& ErrorCode) m_InitialHttpHandler.IssueRequest(ErrorCode); } +thread_local bool t_IsHttpSysThreadNamed = false; +static std::atomic<int> HttpSysThreadIndex = 0; + void HttpSysTransaction::IoCompletionCallback(PTP_CALLBACK_INSTANCE Instance, PVOID pContext /* HttpSysServer */, @@ -1105,6 +1108,17 @@ HttpSysTransaction::IoCompletionCallback(PTP_CALLBACK_INSTANCE Instance, UNREFERENCED_PARAMETER(Instance); UNREFERENCED_PARAMETER(pContext); + // Assign names to threads for context + + if (!t_IsHttpSysThreadNamed) + { + t_IsHttpSysThreadNamed = true; + const int ThreadIndex = ++HttpSysThreadIndex; + zen::ExtendableStringBuilder<128> ThreadName; + ThreadName << "httpio_" << ThreadIndex; + SetCurrentThreadName(ThreadName); + } + // Note that for a given transaction we may be in this completion function on more // than one thread at any given moment. This means we need to be careful about what // happens in here @@ -1306,6 +1320,10 @@ HttpSysServerRequest::HttpSysServerRequest(HttpSysTransaction& Tx, HttpService& } } +HttpSysServerRequest::~HttpSysServerRequest() +{ +} + Oid HttpSysServerRequest::ParseSessionId() const { diff --git a/src/zenserver-test/projectclient.cpp b/src/zenserver-test/projectclient.cpp index 597838e0d..cb493be77 100644 --- a/src/zenserver-test/projectclient.cpp +++ b/src/zenserver-test/projectclient.cpp @@ -13,10 +13,6 @@ # include <asio.hpp> # include <gsl/gsl-lite.hpp> -# if ZEN_PLATFORM_WINDOWS -# include <atlbase.h> -# endif - namespace zen { struct ProjectClientConnection diff --git a/src/zenserver-test/zenserver-test.cpp b/src/zenserver-test/zenserver-test.cpp index 3195181d1..f76f3e0ba 100644 --- a/src/zenserver-test/zenserver-test.cpp +++ b/src/zenserver-test/zenserver-test.cpp @@ -56,7 +56,6 @@ ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_PLATFORM_WINDOWS # include <ppl.h> -# include <atlbase.h> # include <process.h> #endif diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index 592de0f7f..5b635b89d 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -29,6 +29,7 @@ ZEN_THIRD_PARTY_INCLUDES_END // Used for getting My Documents for default data directory # include <ShlObj.h> # pragma comment(lib, "shell32.lib") +# pragma comment(lib, "ole32.lib") std::filesystem::path PickDefaultStateDirectory() diff --git a/src/zenserver/upstream/upstreamapply.cpp b/src/zenserver/upstream/upstreamapply.cpp index c719b225d..3d29f2228 100644 --- a/src/zenserver/upstream/upstreamapply.cpp +++ b/src/zenserver/upstream/upstreamapply.cpp @@ -188,9 +188,9 @@ public: virtual void GetStatus(CbObjectWriter& Status) override { Status << "upstream_worker_threads" << m_Options.UpstreamThreadCount; - Status << "upstream_queue_count" << m_UpstreamAsyncWorkPool.PendingWork(); + Status << "upstream_queue_count" << m_UpstreamAsyncWorkPool.PendingWorkItemCount(); Status << "downstream_worker_threads" << m_Options.DownstreamThreadCount; - Status << "downstream_queue_count" << m_DownstreamAsyncWorkPool.PendingWork(); + Status << "downstream_queue_count" << m_DownstreamAsyncWorkPool.PendingWorkItemCount(); Status.BeginArray("endpoints"); for (const auto& Ep : m_Endpoints) diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp index 245097796..2a9e67c06 100644 --- a/src/zenserver/upstream/upstreamcache.cpp +++ b/src/zenserver/upstream/upstreamcache.cpp @@ -1490,7 +1490,7 @@ public: { for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) { - m_UpstreamThreads.emplace_back(&UpstreamCacheImpl::ProcessUpstreamQueue, this); + m_UpstreamThreads.emplace_back(&UpstreamCacheImpl::ProcessUpstreamQueue, this, Idx + 1); } m_EndpointMonitorThread = std::thread(&UpstreamCacheImpl::MonitorEndpoints, this); @@ -1947,8 +1947,11 @@ private: } } - void ProcessUpstreamQueue() + void ProcessUpstreamQueue(int ThreadIndex) { + std::string ThreadName = fmt::format("upstream_{}", ThreadIndex); + SetCurrentThreadName(ThreadName); + for (;;) { UpstreamCacheRecord CacheRecord; diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 108d37607..4474b52bd 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -33,7 +33,7 @@ ZEN_THIRD_PARTY_INCLUDES_START #include <xxhash.h> #if ZEN_PLATFORM_WINDOWS -# include <atlfile.h> +# include <zencore/windows.h> #endif ZEN_THIRD_PARTY_INCLUDES_END @@ -301,7 +301,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: const HANDLE ChunkFileHandle = FileRef.FileHandle; // See if file already exists { - CAtlFile PayloadFile; + windows::FileHandle PayloadFile; if (HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); SUCCEEDED(hRes)) { @@ -386,7 +386,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash, CasStore:: // The rename/move could fail because the target directory does not yet exist. This code attempts // to create it - CAtlFile DirHandle; + windows::FileHandle DirHandle; auto InternalCreateDirectoryHandle = [&] { return DirHandle.Create(FilePath.c_str(), @@ -551,7 +551,7 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize // See if file already exists #if ZEN_PLATFORM_WINDOWS - CAtlFile PayloadFile; + windows::FileHandle PayloadFile; HRESULT hRes = PayloadFile.Create(Name.ShardedPath.c_str(), GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp index 5ecde343b..ead919cb9 100644 --- a/src/zenutil/zenserverprocess.cpp +++ b/src/zenutil/zenserverprocess.cpp @@ -13,7 +13,6 @@ #include <atomic> #if ZEN_PLATFORM_WINDOWS -# include <atlbase.h> # include <zencore/windows.h> #else # include <sys/mman.h> |