diff options
Diffstat (limited to 'engine/net_ws_queued_packet_sender.cpp')
| -rw-r--r-- | engine/net_ws_queued_packet_sender.cpp | 267 |
1 files changed, 267 insertions, 0 deletions
diff --git a/engine/net_ws_queued_packet_sender.cpp b/engine/net_ws_queued_packet_sender.cpp new file mode 100644 index 0000000..40a8108 --- /dev/null +++ b/engine/net_ws_queued_packet_sender.cpp @@ -0,0 +1,267 @@ +//========= Copyright Valve Corporation, All rights reserved. ============// +// +// Purpose: +// +//============================================================================= + +#include "net_ws_headers.h" +#include "net_ws_queued_packet_sender.h" + +#include "tier1/utlvector.h" +#include "tier1/utlpriorityqueue.h" + +#include "tier0/etwprof.h" + +// memdbgon must be the last include file in a .cpp file!!! +#include "tier0/memdbgon.h" + +ConVar net_queued_packet_thread( "net_queued_packet_thread", "1", 0, "Use a high priority thread to send queued packets out instead of sending them each frame." ); +ConVar net_queue_trace( "net_queue_trace", "0", 0 ); + +class CQueuedPacketSender : public CThread, public IQueuedPacketSender +{ +public: + CQueuedPacketSender(); + ~CQueuedPacketSender(); + + // IQueuedPacketSender + + virtual bool Setup(); + virtual void Shutdown(); + virtual bool IsRunning() { return CThread::IsAlive(); } + + virtual void ClearQueuedPacketsForChannel( INetChannel *pChan ); + virtual void QueuePacket( INetChannel *pChan, SOCKET s, const char FAR *buf, int len, const struct sockaddr FAR * to, int tolen, uint32 msecDelay ); + virtual bool HasQueuedPackets( const INetChannel *pChan ) const; +private: + + // CThread Overrides + virtual bool Start( unsigned int nBytesStack = 0 ); + virtual int Run(); + +private: + + class CQueuedPacket + { + public: + uint32 m_unSendTime; + const void *m_pChannel; // We don't actually use the channel + SOCKET m_Socket; + CUtlVector<char> to; // sockaddr + CUtlVector<char> buf; + + // We want the list sorted in ascending order, so note that we return > rather than < + static bool LessFunc( CQueuedPacket * const &lhs, CQueuedPacket * const &rhs ) + { + return lhs->m_unSendTime > rhs->m_unSendTime; + } + }; + + CUtlPriorityQueue< CQueuedPacket * > m_QueuedPackets; + CThreadMutex m_QueuedPacketsCS; + CThreadEvent m_hThreadEvent; + volatile bool m_bThreadShouldExit; +}; + +static CQueuedPacketSender g_QueuedPacketSender; +IQueuedPacketSender *g_pQueuedPackedSender = &g_QueuedPacketSender; + + +CQueuedPacketSender::CQueuedPacketSender() : + m_QueuedPackets( 0, 0, CQueuedPacket::LessFunc ) +{ + SetName( "QueuedPacketSender" ); + m_bThreadShouldExit = false; +} + +CQueuedPacketSender::~CQueuedPacketSender() +{ + Shutdown(); +} + +bool CQueuedPacketSender::Setup() +{ + return Start(); +} + +bool CQueuedPacketSender::Start( unsigned nBytesStack ) +{ + Shutdown(); + + if ( CThread::Start( nBytesStack ) ) + { + // Ahhh the perfect cross-platformness of the threads library. +#ifdef IS_WINDOWS_PC + SetPriority( THREAD_PRIORITY_HIGHEST ); +#elif POSIX + //SetPriority( PRIORITY_MAX ); +#endif + m_bThreadShouldExit = false; + return true; + } + else + { + return false; + } +} + +void CQueuedPacketSender::Shutdown() +{ + if ( !IsAlive() ) + return; + + m_bThreadShouldExit = true; + m_hThreadEvent.Set(); + + Join(); // Wait for the thread to exit. + + while ( m_QueuedPackets.Count() > 0 ) + { + delete m_QueuedPackets.ElementAtHead(); + m_QueuedPackets.RemoveAtHead(); + } + m_QueuedPackets.Purge(); +} + +void CQueuedPacketSender::ClearQueuedPacketsForChannel( INetChannel *pChan ) +{ + AUTO_LOCK( m_QueuedPacketsCS ); + + for ( int i = m_QueuedPackets.Count()-1; i >= 0; i-- ) + { + CQueuedPacket *p = m_QueuedPackets.Element( i ); + if ( p->m_pChannel == pChan ) + { + m_QueuedPackets.RemoveAt( i ); + delete p; + } + } +} + +bool CQueuedPacketSender::HasQueuedPackets( const INetChannel *pChan ) const +{ + AUTO_LOCK( m_QueuedPacketsCS ); + + for ( int i = 0; i < m_QueuedPackets.Count(); ++i ) + { + const CQueuedPacket *p = m_QueuedPackets.Element( i ); + if ( p->m_pChannel == pChan ) + { + return true; + } + } + + return false; +} +void CQueuedPacketSender::QueuePacket( INetChannel *pChan, SOCKET s, const char FAR *buf, int len, const struct sockaddr FAR * to, int tolen, uint32 msecDelay ) +{ + AUTO_LOCK( m_QueuedPacketsCS ); + + // We'll pull all packets we should have sent by now and send them out right away + uint32 msNow = Plat_MSTime(); + + int nMaxQueuedPackets = 1024; + if ( m_QueuedPackets.Count() < nMaxQueuedPackets ) + { + // Add this packet to the queue. + CQueuedPacket *pPacket = new CQueuedPacket; + pPacket->m_unSendTime = msNow + msecDelay; + pPacket->m_Socket = s; + pPacket->m_pChannel = pChan; + pPacket->buf.CopyArray( (char*)buf, len ); + pPacket->to.CopyArray( (char*)to, tolen ); + m_QueuedPackets.Insert( pPacket ); + } + else + { + static int nWarnings = 5; + if ( --nWarnings > 0 ) + { + Warning( "CQueuedPacketSender: num queued packets >= nMaxQueuedPackets. Not queueing anymore.\n" ); + } + } + + // Tell the thread that we have a queued packet. + m_hThreadEvent.Set(); +} + +extern int NET_SendToImpl( SOCKET s, const char FAR * buf, int len, const struct sockaddr FAR * to, int tolen, int iGameDataLength ); + +int CQueuedPacketSender::Run() +{ + // Normally TT_INFINITE but we wakeup every 50ms just in case. + uint32 waitIntervalNoPackets = 50; + uint32 waitInterval = waitIntervalNoPackets; + while ( 1 ) + { + if ( m_hThreadEvent.Wait( waitInterval ) ) + { + // Someone signaled the thread. Either we're being told to exit or + // we're being told that a packet was just queued. + if ( m_bThreadShouldExit ) + return 0; + } + + // Assume nothing to do and that we'll sleep again + waitInterval = waitIntervalNoPackets; + + // OK, now send a packet. + { + AUTO_LOCK( m_QueuedPacketsCS ); + + // We'll pull all packets we should have sent by now and send them out right away + uint32 msNow = Plat_MSTime(); + + bool bTrace = net_queue_trace.GetInt() == NET_QUEUED_PACKET_THREAD_DEBUG_VALUE; + + while ( m_QueuedPackets.Count() > 0 ) + { + CQueuedPacket *pPacket = m_QueuedPackets.ElementAtHead(); + if ( pPacket->m_unSendTime > msNow ) + { + // Sleep until next we need this packet + waitInterval = pPacket->m_unSendTime - msNow; + // Emit ETW events to help with diagnosing network throttling issues as + // these often have a severe effect on load times in Dota. + ETWMark1I( "CQueuedPacketSender::Run sleeping (ms)", waitInterval ); + if ( bTrace ) + { + Warning( "SQ: sleeping for %u msecs at %f\n", waitInterval, Plat_FloatTime() ); + } + break; + } + + // If it's a bot, don't do anything. Note: we DO want this code deep here because bots only + // try to send packets when sv_stressbots is set, in which case we want it to act as closely + // as a real player as possible. + sockaddr_in *pInternetAddr = (sockaddr_in*)pPacket->to.Base(); + #ifdef _WIN32 + if ( pInternetAddr->sin_addr.S_un.S_addr != 0 + #else + if ( pInternetAddr->sin_addr.s_addr != 0 + #endif + && pInternetAddr->sin_port != 0 ) + { + if ( bTrace ) + { + Warning( "SQ: sending %d bytes at %f\n", pPacket->buf.Count(), Plat_FloatTime() ); + } + + NET_SendToImpl + ( + pPacket->m_Socket, + pPacket->buf.Base(), + pPacket->buf.Count(), + (sockaddr*)pPacket->to.Base(), + pPacket->to.Count(), + -1 + ); + } + + delete pPacket; + m_QueuedPackets.RemoveAtHead(); + } + } + } +} + |