summaryrefslogtreecommitdiff
path: root/public/gcsdk/gcsqlwritequeue.h
blob: a8141d44e9c880e5af649e6fc68c2e97d902afe7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
//====== Copyright ©, Valve Corporation, All rights reserved. =======
// GCSqlWriteQueue.h
//
// A utility class that allows for templating based upon a SQL schema type, and then
// queuing up a number of those records to be written to SQL. This will buffer them until
// either a certain number have been queued or a period of time has elapsed, at which point
// it will flush them to SQL in a single transaction.
//
//===================================================================
#ifndef GCSQLWRITEQUEUE_H
#define GCSQLWRITEQUEUE_H

#include "scheduledfunction.h"

namespace GCSDK
{

//forward declaration only: this header refers to CGCBase solely through a pointer
//(the constructor of the time-expired commit job takes a CGCBase*)
class CGCBase;

template < typename TSqlClass >
class CGCSQLWriteQueue
{
public:

	CGCSQLWriteQueue( uint32 nMaxToCache, uint32 nMaxMSToWrite ) :
		m_nMaxToCache( nMaxToCache ),
		m_nMaxMSToWrite( nMaxMSToWrite )
	{
		m_QueuedRecords.EnsureCapacity( nMaxToCache );
	}
	
	//called to queue the record for writing, which will occur either when the maximum time between writebacks has occurred, or
	//when the cache has filled up
	void QueueRecord( const TSqlClass& sch )
	{
		m_QueuedRecords.AddToTail( sch );

		//now handle either dispatching, or scheduling a timeout dispatch
		if( ( uint32 )m_QueuedRecords.Count() >= m_nMaxToCache )
		{
			//unschedule first. This way if while we are yielded, we add another entry, it can schedule a new timeout
			m_TimeCommit.Cancel();
			CreateJobToCommitSQL();
		}
		else if( !m_TimeCommit.BIsScheduled() )
		{
			m_TimeCommit.Schedule( this, &CGCSQLWriteQueue< TSqlClass >::CreateJobToCommitSQL, m_nMaxMSToWrite );			
		}
	}

	//a yielding version of the above function
	void YieldingQueueRecord( const TSqlClass& sch )
	{
		m_QueuedRecords.AddToTail( sch );

		//now handle either dispatching, or scheduling a timeout dispatch
		if( ( uint32 )m_QueuedRecords.Count() >= m_nMaxToCache )
		{
			//unschedule first. This way if while we are yielded, we add another entry, it can schedule a new timeout
			m_TimeCommit.Cancel();
			YieldingFlushQueuedViewsToSQL();
		}
		else if( !m_TimeCommit.BIsScheduled() )
		{
			m_TimeCommit.Schedule( this, &CGCSQLWriteQueue< TSqlClass >::CreateJobToCommitSQL, m_nMaxMSToWrite );			
		}
	}
	
private:

	//called internally when we kick off a job after N amount of time has expired
	template < typename TSqlClass >
	class CTimeExpiredCommitJob : public CGCJob
	{
	public:
		CTimeExpiredCommitJob( CGCBase* pGC, CGCSQLWriteQueue< TSqlClass >* pClass ) : CGCJob( pGC ), m_pClass( pClass )	{}
		virtual bool BYieldingRunJob( void* pvStartParm )
		{
			m_pClass->YieldingFlushQueuedViewsToSQL();
			return true;
		}
	private:
		CGCSQLWriteQueue< TSqlClass >*	m_pClass;
	};

	//the function called when time expires to start a job and commit the requests to SQL
	void CreateJobToCommitSQL()
	{
		//kick off our job, which just calls the flush
		CGCJob* pJob = new CTimeExpiredCommitJob< TSqlClass >( GGCBase(), this );
		pJob->StartJobDelayed( NULL );
	}

	//handles committing the list of queued views to SQL
	void YieldingFlushQueuedViewsToSQL()
	{
		if( m_QueuedRecords.Count() == 0 )
			return;

		//move the contents into a local vector so we don't have any conflicts of global state
		CUtlVector< TSqlClass > localQueue;
		localQueue.Swap( m_QueuedRecords );
		//prepare the queue for the next batch (so we don't have intermediate resizes)
		m_QueuedRecords.EnsureCapacity( m_nMaxToCache );

		// start a transaction for all this work
		CSQLAccess sqlAccess;
		sqlAccess.BBeginTransaction( "CGCWatchDownloadedReplayJob::FlushQueuedViewsToSQL" );

		FOR_EACH_VEC( localQueue, nCurrView )
		{
			sqlAccess.BYieldingInsertRecord( &localQueue[ nCurrView ] );
		}

		sqlAccess.BCommitTransaction();
	}

	//the records that we have queued
	CUtlVector< TSqlClass >									m_QueuedRecords;
	//schedules a write back to ensure we commit at least every N seconds
	CScheduledFunction< CGCSQLWriteQueue< TSqlClass > >		m_TimeCommit;
	//maximum number of seconds between commits
	uint32													m_nMaxMSToWrite;
	//maximum number of records to buffer before writing back
	uint32													m_nMaxToCache;
};

} //namespace GCSDK

#endif