aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-05-11 13:37:33 +0200
committerStefan Boberg <[email protected]>2023-05-11 13:37:33 +0200
commit964d1bae6ece1b026e443967139ed18fa08fd9fe (patch)
tree48041b36083fbb712b649f12b6197ecd17dec62e /src
parentadded scrubcontext.cpp (diff)
parentv0.2.10 (diff)
downloadzen-964d1bae6ece1b026e443967139ed18fa08fd9fe.tar.xz
zen-964d1bae6ece1b026e443967139ed18fa08fd9fe.zip
Merge branch 'main' of https://github.com/EpicGames/zen
Diffstat (limited to 'src')
-rw-r--r--src/zenhttp/httpasio.cpp58
-rw-r--r--src/zenhttp/httpasio.h1
-rw-r--r--src/zenhttp/httpnull.cpp5
-rw-r--r--src/zenhttp/httpnull.h1
-rw-r--r--src/zenhttp/httpsys.cpp10
-rw-r--r--src/zenhttp/httpsys.h1
-rw-r--r--src/zenhttp/include/zenhttp/httpserver.h1
-rw-r--r--src/zenserver/zenserver.cpp1
-rw-r--r--src/zenstore/filecas.cpp53
-rw-r--r--src/zenstore/filecas.h4
10 files changed, 101 insertions, 34 deletions
diff --git a/src/zenhttp/httpasio.cpp b/src/zenhttp/httpasio.cpp
index 79b2c0a3d..7149caf28 100644
--- a/src/zenhttp/httpasio.cpp
+++ b/src/zenhttp/httpasio.cpp
@@ -329,7 +329,7 @@ private:
void EnqueueRead();
void OnDataReceived(const asio::error_code& Ec, std::size_t ByteCount);
void OnResponseDataSent(const asio::error_code& Ec, std::size_t ByteCount, bool Pop = false);
- void OnError();
+ void CloseConnection();
HttpAsioServerImpl& m_Server;
asio::streambuf m_RequestBuffer;
@@ -368,9 +368,17 @@ HttpServerConnection::HandleNewRequest()
void
HttpServerConnection::TerminateConnection()
{
+ if (m_RequestState == RequestState::kDone || m_RequestState == RequestState::kTerminated)
+ {
+ return;
+ }
+
m_RequestState = RequestState::kTerminated;
+ ZEN_ASSERT(m_Socket);
+ // Terminating, we don't care about any errors when closing socket
std::error_code Ec;
+ m_Socket->shutdown(asio::socket_base::shutdown_both, Ec);
m_Socket->close(Ec);
}
@@ -407,7 +415,7 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused]
else
{
ZEN_WARN("on data received ERROR, connection '{}' reason '{}'", m_ConnectionId, Ec.message());
- return OnError();
+ return TerminateConnection();
}
}
@@ -424,7 +432,7 @@ HttpServerConnection::OnDataReceived(const asio::error_code& Ec, [[maybe_unused]
size_t Result = m_RequestData.ConsumeData((const char*)InputBuffer.data(), InputBuffer.size());
if (Result == ~0ull)
{
- return OnError();
+ return TerminateConnection();
}
m_RequestBuffer.consume(Result);
@@ -449,7 +457,7 @@ HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, [[maybe_unu
if (Ec)
{
ZEN_WARN("on data sent ERROR, connection '{}' reason '{}'", m_ConnectionId, Ec.message());
- OnError();
+ TerminateConnection();
}
else
{
@@ -461,9 +469,7 @@ HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, [[maybe_unu
if (!m_RequestData.IsKeepAlive())
{
- m_RequestState = RequestState::kDone;
-
- m_Socket->close();
+ CloseConnection();
}
else
{
@@ -479,9 +485,26 @@ HttpServerConnection::OnResponseDataSent(const asio::error_code& Ec, [[maybe_unu
}
void
-HttpServerConnection::OnError()
+HttpServerConnection::CloseConnection()
{
- m_Socket->close();
+ if (m_RequestState == RequestState::kDone || m_RequestState == RequestState::kTerminated)
+ {
+ return;
+ }
+ ZEN_ASSERT(m_Socket);
+ m_RequestState = RequestState::kDone;
+
+ std::error_code Ec;
+ m_Socket->shutdown(asio::socket_base::shutdown_receive, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("socket shutdown ERROR, reason '{}'", Ec.message());
+ }
+ m_Socket->close(Ec);
+ if (Ec)
+ {
+ ZEN_WARN("socket close ERROR, reason '{}'", Ec.message());
+ }
}
void
@@ -1053,7 +1076,12 @@ struct HttpAcceptor
}
else
{
- m_Acceptor.close();
+ std::error_code CloseEc;
+ m_Acceptor.close(CloseEc);
+ if (CloseEc)
+ {
+ ZEN_WARN("acceptor close ERROR, reason '{}'", CloseEc.message());
+ }
}
});
}
@@ -1296,6 +1324,15 @@ HttpAsioServer::HttpAsioServer() : m_Impl(std::make_unique<asio_http::HttpAsioSe
HttpAsioServer::~HttpAsioServer()
{
+ if (m_Impl)
+ {
+ ZEN_ERROR("~HttpAsioServer() called without calling Close() first");
+ }
+}
+
+void
+HttpAsioServer::Close()
+{
try
{
m_Impl->Stop();
@@ -1304,6 +1341,7 @@ HttpAsioServer::~HttpAsioServer()
{
ZEN_WARN("Caught exception stopping http asio server: {}", ex.what());
}
+ m_Impl.reset();
}
void
diff --git a/src/zenhttp/httpasio.h b/src/zenhttp/httpasio.h
index 716145955..de25c538f 100644
--- a/src/zenhttp/httpasio.h
+++ b/src/zenhttp/httpasio.h
@@ -25,6 +25,7 @@ public:
virtual int Initialize(int BasePort) override;
virtual void Run(bool IsInteractiveSession) override;
virtual void RequestExit() override;
+ virtual void Close() override;
private:
Event m_ShutdownEvent;
diff --git a/src/zenhttp/httpnull.cpp b/src/zenhttp/httpnull.cpp
index a6e1d3567..658f51831 100644
--- a/src/zenhttp/httpnull.cpp
+++ b/src/zenhttp/httpnull.cpp
@@ -80,4 +80,9 @@ HttpNullServer::RequestExit()
m_ShutdownEvent.Set();
}
+void
+HttpNullServer::Close()
+{
+}
+
} // namespace zen
diff --git a/src/zenhttp/httpnull.h b/src/zenhttp/httpnull.h
index 74f021f6b..965e729f7 100644
--- a/src/zenhttp/httpnull.h
+++ b/src/zenhttp/httpnull.h
@@ -21,6 +21,7 @@ public:
virtual int Initialize(int BasePort) override;
virtual void Run(bool IsInteractiveSession) override;
virtual void RequestExit() override;
+ virtual void Close() override;
private:
Event m_ShutdownEvent;
diff --git a/src/zenhttp/httpsys.cpp b/src/zenhttp/httpsys.cpp
index c733d618d..25e4393b3 100644
--- a/src/zenhttp/httpsys.cpp
+++ b/src/zenhttp/httpsys.cpp
@@ -741,9 +741,19 @@ HttpSysServer::~HttpSysServer()
{
if (m_IsHttpInitialized)
{
+ ZEN_ERROR("~HttpSysServer() called without calling Close() first");
+ }
+}
+
+void
+HttpSysServer::Close()
+{
+ if (m_IsHttpInitialized)
+ {
Cleanup();
HttpTerminate(HTTP_INITIALIZE_SERVER, nullptr);
+ m_IsHttpInitialized = false;
}
}
diff --git a/src/zenhttp/httpsys.h b/src/zenhttp/httpsys.h
index d6bd34890..cf16042d7 100644
--- a/src/zenhttp/httpsys.h
+++ b/src/zenhttp/httpsys.h
@@ -45,6 +45,7 @@ public:
virtual void Run(bool TestMode) override;
virtual void RequestExit() override;
virtual void RegisterService(HttpService& Service) override;
+ virtual void Close() override;
WorkerThreadPool& WorkPool() { return m_AsyncWorkPool; }
diff --git a/src/zenhttp/include/zenhttp/httpserver.h b/src/zenhttp/include/zenhttp/httpserver.h
index 3b9fa50b4..dd66b1fe7 100644
--- a/src/zenhttp/include/zenhttp/httpserver.h
+++ b/src/zenhttp/include/zenhttp/httpserver.h
@@ -177,6 +177,7 @@ public:
virtual int Initialize(int BasePort) = 0;
virtual void Run(bool IsInteractiveSession) = 0;
virtual void RequestExit() = 0;
+ virtual void Close() = 0;
};
Ref<HttpServer> CreateHttpServer(std::string_view ServerClass);
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index f041332e5..826924952 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -485,6 +485,7 @@ public:
{
ZEN_INFO(ZEN_APP_NAME " cleaning up");
m_GcScheduler.Shutdown();
+ m_Http->Close();
}
void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; }
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 1d25920c4..108d37607 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -179,8 +179,31 @@ FileCasStrategy::Initialize(const std::filesystem::path& RootDirectory, bool IsN
}
}
- m_LogFlushPosition = ReadIndexFile();
- uint64_t LogEntryCount = ReadLog(m_LogFlushPosition);
+ if (std::filesystem::is_regular_file(IndexPath))
+ {
+ uint32_t IndexVersion = 0;
+ m_LogFlushPosition = ReadIndexFile(IndexPath, IndexVersion);
+ if (IndexVersion == 0)
+ {
+ ZEN_WARN("removing invalid index file at '{}'", IndexPath);
+ std::filesystem::remove(IndexPath);
+ }
+ }
+
+ uint64_t LogEntryCount = 0;
+ if (std::filesystem::is_regular_file(LogPath))
+ {
+ if (TCasLogFile<FileCasIndexEntry>::IsValid(LogPath))
+ {
+ LogEntryCount = ReadLog(LogPath, m_LogFlushPosition);
+ }
+ else
+ {
+ ZEN_WARN("removing invalid cas log at '{}'", LogPath);
+ std::filesystem::remove(LogPath);
+ }
+ }
+
for (const auto& Entry : m_Index)
{
m_TotalSize.fetch_add(Entry.second.Size, std::memory_order::relaxed);
@@ -804,16 +827,8 @@ FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, IoBuffer&&
void
FileCasStrategy::Flush()
{
- // Since we don't keep files open after writing there's nothing specific
- // to flush here.
- //
- // Depending on what semantics we want Flush() to provide, it could be
- // argued that this should just flush the volume which we are using to
- // store the CAS files on here, to ensure metadata is flushed along
- // with file data
- //
- // Related: to facilitate more targeted validation during recovery we could
- // maintain a log of when chunks were created
+ m_CasLog.Flush();
+ MakeIndexSnapshot();
}
void
@@ -1095,12 +1110,11 @@ FileCasStrategy::MakeIndexSnapshot()
}
}
uint64_t
-FileCasStrategy::ReadIndexFile()
+FileCasStrategy::ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion)
{
using namespace filecas::impl;
std::vector<FileCasIndexEntry> Entries;
- std::filesystem::path IndexPath = GetIndexPath(m_RootDirectory);
if (std::filesystem::is_regular_file(IndexPath))
{
Stopwatch Timer;
@@ -1135,7 +1149,7 @@ FileCasStrategy::ReadIndexFile()
}
m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size});
}
-
+ OutVersion = FileCasIndexHeader::CurrentVersion;
return Header.LogPosition;
}
else
@@ -1167,12 +1181,8 @@ FileCasStrategy::ReadIndexFile()
std::string InvalidEntryReason;
for (const FileCasStrategy::FileCasIndexEntry& Entry : ScannedEntries)
{
- if (!ValidateEntry(Entry, InvalidEntryReason))
- {
- ZEN_WARN("skipping invalid entry in '{}', reason: '{}'", m_RootDirectory, InvalidEntryReason);
- continue;
- }
m_Index.insert_or_assign(Entry.Key, IndexEntry{.Size = Entry.Size});
+ TotalSize += Entry.Size;
CasLog.Append(Entry);
}
@@ -1183,11 +1193,10 @@ FileCasStrategy::ReadIndexFile()
}
uint64_t
-FileCasStrategy::ReadLog(uint64_t SkipEntryCount)
+FileCasStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t SkipEntryCount)
{
using namespace filecas::impl;
- std::filesystem::path LogPath = GetLogPath(m_RootDirectory);
if (std::filesystem::is_regular_file(LogPath))
{
uint64_t LogEntryCount = 0;
diff --git a/src/zenstore/filecas.h b/src/zenstore/filecas.h
index 420b3a634..01d3648da 100644
--- a/src/zenstore/filecas.h
+++ b/src/zenstore/filecas.h
@@ -46,8 +46,8 @@ struct FileCasStrategy final : public GcStorage
private:
void MakeIndexSnapshot();
- uint64_t ReadIndexFile();
- uint64_t ReadLog(uint64_t LogPosition);
+ uint64_t ReadIndexFile(const std::filesystem::path& IndexPath, uint32_t& OutVersion);
+ uint64_t ReadLog(const std::filesystem::path& LogPath, uint64_t LogPosition);
struct IndexEntry
{