diff options
Diffstat (limited to 'utils/vmpi/mysql_async.cpp')
| -rw-r--r-- | utils/vmpi/mysql_async.cpp | 275 |
1 files changed, 275 insertions, 0 deletions
diff --git a/utils/vmpi/mysql_async.cpp b/utils/vmpi/mysql_async.cpp new file mode 100644 index 0000000..e4721a0 --- /dev/null +++ b/utils/vmpi/mysql_async.cpp @@ -0,0 +1,275 @@ +//========= Copyright Valve Corporation, All rights reserved. ============// +// +// Purpose: +// +//=============================================================================// + +#include <windows.h> +#include "imysqlwrapper.h" +#include "mysql_async.h" +#include "utllinkedlist.h" + + +static char* CopyString( const char *pStr ) +{ + char *pRet = new char[ strlen( pStr ) + 1 ]; + strcpy( pRet, pStr ); + return pRet; +} + + +class CMySQLAsync : public IMySQLAsync +{ +public: + + CMySQLAsync() + { + m_hThread = NULL; + m_pSQL = NULL; + + m_hExitEvent = CreateEvent( NULL, true, false, NULL ); // Use manual reset because we want it to cascade out without + // resetting the event if it gets set. + m_hPendingQueryEvent = CreateEvent( NULL, false, false, NULL ); + m_hQueryResultsEvent = CreateEvent( NULL, false, false, NULL ); + + InitializeCriticalSection( &m_ExecuteQueryCS ); + InitializeCriticalSection( &m_PendingQueryCS ); + } + + ~CMySQLAsync() + { + Term(); + + CloseHandle( m_hExitEvent ); + CloseHandle( m_hPendingQueryEvent ); + CloseHandle( m_hQueryResultsEvent ); + + DeleteCriticalSection( &m_ExecuteQueryCS ); + DeleteCriticalSection( &m_PendingQueryCS ); + } + + virtual void Release() + { + delete this; + } + + virtual IMySQLRowSet* ExecuteBlocking( const char *pStr ) + { + IMySQLRowSet *pRet; + + EnterCriticalSection( &m_ExecuteQueryCS ); + m_pSQL->Execute( pStr ); + pRet = m_pSQL->DuplicateRowSet(); + LeaveCriticalSection( &m_ExecuteQueryCS ); + + return pRet; + } + + virtual void Execute( const char *pStr, void *pUserData ) + { + EnterCriticalSection( &m_PendingQueryCS ); + + CPendingQuery query; + query.m_pStr = CopyString( pStr ); + query.m_pUserData = pUserData; + query.m_Timer.Start(); + + m_PendingQueries.AddToTail( query ); + SetEvent( m_hPendingQueryEvent ); + + LeaveCriticalSection( &m_PendingQueryCS ); + } + + virtual bool GetNextResults( CQueryResults &results ) + { + results.m_pResults = NULL; + + if ( WaitForSingleObject( m_hQueryResultsEvent, 0 ) == WAIT_OBJECT_0 ) + { + EnterCriticalSection( &m_PendingQueryCS ); + + Assert( m_QueryResults.Count() > 0 ); + int iHead = m_QueryResults.Head(); + results = m_QueryResults[iHead]; + m_QueryResults.Remove( iHead ); + + if ( m_QueryResults.Count() > 0 ) + SetEvent( m_hQueryResultsEvent ); + + LeaveCriticalSection( &m_PendingQueryCS ); + return true; + } + else + { + return false; + } + } + + bool Init( IMySQL *pSQL ) + { + Term(); + + DWORD dwThreadID; + m_hThread = CreateThread( NULL, 0, &CMySQLAsync::StaticThreadFn, this, 0, &dwThreadID ); + if ( m_hThread ) + { + m_pSQL = pSQL; + return true; + } + else + { + return false; + } + } + + void Term() + { + // Stop the thread. + if ( m_hThread ) + { + // Delete all our queries. + SetEvent( m_hExitEvent ); + WaitForSingleObject( m_hThread, INFINITE ); + CloseHandle( m_hThread ); + m_hThread = NULL; + } + + // Delete leftover queries. + FOR_EACH_LL( m_PendingQueries, iPendingQuery ) + { + delete [] m_PendingQueries[iPendingQuery].m_pStr; + } + m_PendingQueries.Purge(); + + FOR_EACH_LL( m_QueryResults, i ) + { + m_QueryResults[i].m_pResults->Release(); + } + m_QueryResults.Purge(); + + if ( m_pSQL ) + { + m_pSQL->Release(); + m_pSQL = NULL; + } + } + + +private: + + DWORD ThreadFn() + { + HANDLE hEvents[2] = { m_hExitEvent, m_hPendingQueryEvent }; + + // + while ( 1 ) + { + int ret = WaitForMultipleObjects( ARRAYSIZE( hEvents ), hEvents, false, INFINITE ); + if ( ret == WAIT_OBJECT_0 ) + break; + + if ( ret == WAIT_OBJECT_0+1 ) + { + // A new string has been queued up for us to execute. + EnterCriticalSection( &m_PendingQueryCS ); + + Assert( m_PendingQueries.Count() > 0 ); + int iHead = m_PendingQueries.Head(); + + CPendingQuery pending = m_PendingQueries[iHead]; + m_PendingQueries.Remove( iHead ); + + // Set the pending query event if there are more queries waiting to run. + if ( m_PendingQueries.Count() > 0 ) + SetEvent( m_hPendingQueryEvent ); + + LeaveCriticalSection( &m_PendingQueryCS ); + + + // Run the query. + EnterCriticalSection( &m_ExecuteQueryCS ); + + CQueryResults results; + results.m_pResults = NULL; + results.m_pUserData = pending.m_pUserData; + results.m_ExecuteTime.Init(); + pending.m_Timer.End(); + results.m_QueueTime = pending.m_Timer.GetDuration(); + + CFastTimer executeTimer; + executeTimer.Start(); + + if ( m_pSQL->Execute( pending.m_pStr ) == 0 ) + { + executeTimer.End(); + results.m_ExecuteTime = executeTimer.GetDuration(); + results.m_pResults = m_pSQL->DuplicateRowSet(); + } + + delete pending.m_pStr; + + LeaveCriticalSection( &m_ExecuteQueryCS ); + + + // Store the results. + EnterCriticalSection( &m_PendingQueryCS ); + + m_QueryResults.AddToTail( results ); + SetEvent( m_hQueryResultsEvent ); + + LeaveCriticalSection( &m_PendingQueryCS ); + } + } + + return 0; + } + + static DWORD WINAPI StaticThreadFn( LPVOID lpParameter ) + { + return ((CMySQLAsync*)lpParameter)->ThreadFn(); + } + +private: + + HANDLE m_hThread; + HANDLE m_hExitEvent; + HANDLE m_hPendingQueryEvent; // Signaled when a new query is added. + HANDLE m_hQueryResultsEvent; + + IMySQL *m_pSQL; + + CRITICAL_SECTION m_PendingQueryCS; + CRITICAL_SECTION m_ExecuteQueryCS; + + + // Outgoing query results. New ones are added to the tail. + CUtlLinkedList<CQueryResults, int> m_QueryResults; + + + // New ones added to the tail. + class CPendingQuery + { + public: + char *m_pStr; + void *m_pUserData; + CFastTimer m_Timer; // Times how long this query is in the queue. + }; + + CUtlLinkedList<CPendingQuery,int> m_PendingQueries; +}; + + +IMySQLAsync* CreateMySQLAsync( IMySQL *pSQL ) +{ + CMySQLAsync *pRet = new CMySQLAsync; + if ( pRet->Init( pSQL ) ) + { + return pRet; + } + else + { + delete pRet; + return NULL; + } +} + |