diff options
| author | Stefan Boberg <[email protected]> | 2023-05-11 13:37:33 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2023-05-11 13:37:33 +0200 |
| commit | 964d1bae6ece1b026e443967139ed18fa08fd9fe (patch) | |
| tree | 48041b36083fbb712b649f12b6197ecd17dec62e /src | |
| parent | added scrubcontext.cpp (diff) | |
| parent | v0.2.10 (diff) | |
| download | zen-964d1bae6ece1b026e443967139ed18fa08fd9fe.tar.xz zen-964d1bae6ece1b026e443967139ed18fa08fd9fe.zip | |
Merge branch 'main' of https://github.com/EpicGames/zen
Diffstat (limited to 'src')
| -rw-r--r-- | src/zenhttp/httpasio.cpp | 58 | ||||
| -rw-r--r-- | src/zenhttp/httpasio.h | 1 | ||||
| -rw-r--r-- | src/zenhttp/httpnull.cpp | 5 | ||||
| -rw-r--r-- | src/zenhttp/httpnull.h | 1 | ||||
| -rw-r--r-- | src/zenhttp/httpsys.cpp | 10 | ||||
| -rw-r--r-- | src/zenhttp/httpsys.h | 1 | ||||
| -rw-r--r-- | src/zenhttp/include/zenhttp/httpserver.h | 1 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 1 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 53 | ||||
| -rw-r--r-- | src/zenstore/filecas.h | 4 |
10 files changed, 101 insertions, 34 deletions
diff --git a/src/zenhttp/httpasio.cpp b/src/zenhttp/httpasio.cpp index 79b2c0a3d..7149caf28 100644 --- a/src/zenhttp/httpasio.cpp +++ b/src/zenhttp/httpasio.cpp @@ -329,7 +329,7 @@ private: void EnqueueRead(); void OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount); void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, bool Pop = false); - void OnError(); + void CloseConnection(); HttpAsioServerImpl& m_Server; asio::streambuf m_RequestBuffer; @@ -368,9 +368,17 @@ HttpServerConnection::HandleNewRequest() void HttpServerConnection::TerminateConnection() { + if (m_RequestState == RequestState::kDone || m_RequestState == RequestState::kTerminated) + { + return; + } + m_RequestState = RequestState::kTerminated; + ZEN_ASSERT(m_Socket); + // Terminating, we don't care about any errors when closing socket std::error_code Ec; + m_Socket->shutdown(asio::socket_base::shutdown_both, Ec); m_Socket->close(Ec); } @@ -407,7 +415,7 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused] else { ZEN_WARN("on data received ERROR, connection '{}' reason '{}'", m_ConnectionId, Ec.message()); - return OnError(); + return TerminateConnection(); } } @@ -424,7 +432,7 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused] size_t Result = m_RequestData.ConsumeData((const char*)InputBuffer.data(), InputBuffer.size()); if (Result == ~0ull) { - return OnError(); + return TerminateConnection(); } m_RequestBuffer.consume(Result); @@ -449,7 +457,7 @@ HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, [[maybe_unu if (Ec) { ZEN_WARN("on data sent ERROR, connection '{}' reason '{}'", m_ConnectionId, Ec.message()); - OnError(); + TerminateConnection(); } else { @@ -461,9 +469,7 @@ HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, [[maybe_unu if (!m_RequestData.IsKeepAlive()) { - m_RequestState = RequestState::kDone; - - m_Socket->close(); + CloseConnection(); } else { @@ -479,9 +485,26 @@ HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, [[maybe_unu } void -HttpServerConnection::OnError() +HttpServerConnection::CloseConnection() { - m_Socket->close(); + if (m_RequestState == RequestState::kDone || m_RequestState == RequestState::kTerminated) + { + return; + } + ZEN_ASSERT(m_Socket); + m_RequestState = RequestState::kDone; + + std::error_code Ec; + m_Socket->shutdown(asio::socket_base::shutdown_receive, Ec); + if (Ec) + { + ZEN_WARN("socket shutdown ERROR, reason '{}'", Ec.message()); + } + m_Socket->close(Ec); + if (Ec) + { + ZEN_WARN("socket close ERROR, reason '{}'", Ec.message()); + } } void @@ -1053,7 +1076,12 @@ struct HttpAcceptor } else { - m_Acceptor.close(); + std::error_code CloseEc; + m_Acceptor.close(CloseEc); + if (CloseEc) + { + ZEN_WARN("acceptor close ERROR, reason '{}'", CloseEc.message()); + } } }); } @@ -1296,6 +1324,15 @@ HttpAsioServer::HttpAsioServer() : m_Impl(std::make_unique<asio_http::HttpAsioSe HttpAsioServer::~HttpAsioServer() { + if (m_Impl) + { + ZEN_ERROR("~HttpAsioServer() called without calling Close() first"); + } +} + +void +HttpAsioServer::Close() +{ try { m_Impl->Stop(); @@ -1304,6 +1341,7 @@ HttpAsioServer::~HttpAsioServer() { ZEN_WARN("Caught exception stopping http asio server: {}", ex.what()); } + m_Impl.reset(); } void diff --git a/src/zenhttp/httpasio.h b/src/zenhttp/httpasio.h index 716145955..de25c538f 100644 --- a/src/zenhttp/httpasio.h +++ b/src/zenhttp/httpasio.h @@ -25,6 +25,7 @@ public: virtual int Initialize(int BasePort) override; virtual void Run(bool IsInteractiveSession) override; virtual void RequestExit() override; + virtual void Close() override; private: Event m_ShutdownEvent; diff --git a/src/zenhttp/httpnull.cpp b/src/zenhttp/httpnull.cpp index a6e1d3567..658f51831 100644 --- a/src/zenhttp/httpnull.cpp +++ b/src/zenhttp/httpnull.cpp @@ -80,4 +80,9 @@ HttpNullServer::RequestExit() m_ShutdownEvent.Set(); } +void +HttpNullServer::Close() +{ +} + } // namespace zen diff --git a/src/zenhttp/httpnull.h b/src/zenhttp/httpnull.h index 74f021f6b..965e729f7 100644 --- a/src/zenhttp/httpnull.h +++ b/src/zenhttp/httpnull.h @@ -21,6 +21,7 @@ public: virtual int Initialize(int BasePort) override; virtual void Run(bool IsInteractiveSession) override; virtual void RequestExit() override; + virtual void Close() override; private: Event m_ShutdownEvent; diff --git a/src/zenhttp/httpsys.cpp b/src/zenhttp/httpsys.cpp index c733d618d..25e4393b3 100644 --- a/src/zenhttp/httpsys.cpp +++ b/src/zenhttp/httpsys.cpp @@ -741,9 +741,19 @@ HttpSysServer::~HttpSysServer() { if (m_IsHttpInitialized) { + ZEN_ERROR("~HttpSysServer() called without calling Close() first"); + } +} + +void +HttpSysServer::Close() +{ + if (m_IsHttpInitialized) + { Cleanup(); HttpTerminate(HTTP_INITIALIZE_SERVER, nullptr); + m_IsHttpInitialized = false; } } diff --git a/src/zenhttp/httpsys.h b/src/zenhttp/httpsys.h index d6bd34890..cf16042d7 100644 --- a/src/zenhttp/httpsys.h +++ b/src/zenhttp/httpsys.h @@ -45,6 +45,7 @@ public: virtual void Run(bool TestMode) override; virtual void RequestExit() override; virtual void RegisterService(HttpService& Service) override; + virtual void Close() override; WorkerThreadPool& WorkPool() { return m_AsyncWorkPool; } diff --git a/src/zenhttp/include/zenhttp/httpserver.h b/src/zenhttp/include/zenhttp/httpserver.h index 3b9fa50b4..dd66b1fe7 100644 --- a/src/zenhttp/include/zenhttp/httpserver.h +++ b/src/zenhttp/include/zenhttp/httpserver.h @@ -177,6 +177,7 @@ public: virtual int Initialize(int BasePort) = 0; virtual void Run(bool IsInteractiveSession) = 0; virtual void RequestExit() = 0; + virtual void Close() = 0; }; Ref<HttpServer> CreateHttpServer(std::string_view ServerClass); diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index f041332e5..826924952 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -485,6 +485,7 @@ public: { ZEN_INFO(ZEN_APP_NAME " cleaning up"); m_GcScheduler.Shutdown(); + m_Http->Close(); } void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; } diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 1d25920c4..108d37607 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -179,8 +179,31 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN } } - m_LogFlushPosition = ReadIndexFile(); - uint64_t LogEntryCount = ReadLog(m_LogFlushPosition); + if (std::filesystem::is_regular_file(IndexPath)) + { + uint32_t IndexVersion = 0; + m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion); + if (IndexVersion == 0) + { + ZEN_WARN("removing invalid index file at '{}'", IndexPath); + std::filesystem::remove(IndexPath); + } + } + + uint64_t LogEntryCount = 0; + if (std::filesystem::is_regular_file(LogPath)) + { + if (TCasLogFile<FileCasIndexEntry>::IsValid(LogPath)) + { + LogEntryCount = ReadLog(LogPath, m_LogFlushPosition); + } + else + { + ZEN_WARN("removing invalid cas log at '{}'", LogPath); + std::filesystem::remove(LogPath); + } + } + for (const auto& Entry : m_Index) { m_TotalSize.fetch_add(Entry.second.Size, std::memory_order::relaxed); @@ -804,16 +827,8 @@ FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&& void FileCasStrategy::Flush() { - // Since we don't keep files open after writing there's nothing specific - // to flush here. - // - // Depending on what semantics we want Flush() to provide, it could be - // argued that this should just flush the volume which we are using to - // store the CAS files on here, to ensure metadata is flushed along - // with file data - // - // Related: to facilitate more targeted validation during recovery we could - // maintain a log of when chunks were created + m_CasLog.Flush(); + MakeIndexSnapshot(); } void @@ -1095,12 +1110,11 @@ FileCasStrategy::MakeIndexSnapshot() } } uint64_t -FileCasStrategy::ReadIndexFile() +FileCasStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion) { using namespace filecas::impl; std::vector<FileCasIndexEntry> Entries; - std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory); if (std::filesystem::is_regular_file(IndexPath)) { Stopwatch Timer; @@ -1135,7 +1149,7 @@ FileCasStrategy::ReadIndexFile() } m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}); } - + OutVersion = FileCasIndexHeader::CurrentVersion; return Header.LogPosition; } else @@ -1167,12 +1181,8 @@ FileCasStrategy::ReadIndexFile() std::string InvalidEntryReason; for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries) { - if (!ValidateEntry(Entry, InvalidEntryReason)) - { - ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", m_RootDirectory, InvalidEntryReason); - continue; - } m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size}); + TotalSize += Entry.Size; CasLog.Append(Entry); } @@ -1183,11 +1193,10 @@ FileCasStrategy::ReadIndexFile() } uint64_t -FileCasStrategy::ReadLog(uint64_t SkipEntryCount) +FileCasStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount) { using namespace filecas::impl; - std::filesystem::path LogPath = GetLogPath(m_RootDirectory); if (std::filesystem::is_regular_file(LogPath)) { uint64_t LogEntryCount = 0; diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h index 420b3a634..01d3648da 100644 --- a/src/zenstore/filecas.h +++ b/src/zenstore/filecas.h @@ -46,8 +46,8 @@ struct FileCasStrategy final : public GcStorage private: void MakeIndexSnapshot(); - uint64_t ReadIndexFile(); - uint64_t ReadLog(uint64_t LogPosition); + uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion); + uint64_t ReadLog(const std::filesystem::path& LogPath, uint64_t LogPosition); struct IndexEntry { |