summaryrefslogtreecommitdiff
path: root/replay/sv_sessionblockpublisher.cpp
diff options
context:
space:
mode:
authorFluorescentCIAAfricanAmerican <[email protected]>2020-04-22 12:56:21 -0400
committerFluorescentCIAAfricanAmerican <[email protected]>2020-04-22 12:56:21 -0400
commit3bf9df6b2785fa6d951086978a3e66f49427166a (patch)
tree2c0f1f0c63c4832882bc93814ebd2c2b1c6224e5 /replay/sv_sessionblockpublisher.cpp
downloadarchived-source-engine-2018-hl2-src-master.tar.xz
archived-source-engine-2018-hl2-src-master.zip
Diffstat (limited to 'replay/sv_sessionblockpublisher.cpp')
-rw-r--r--replay/sv_sessionblockpublisher.cpp455
1 files changed, 455 insertions, 0 deletions
diff --git a/replay/sv_sessionblockpublisher.cpp b/replay/sv_sessionblockpublisher.cpp
new file mode 100644
index 0000000..d00c309
--- /dev/null
+++ b/replay/sv_sessionblockpublisher.cpp
@@ -0,0 +1,455 @@
+//========= Copyright Valve Corporation, All rights reserved. ============//
+//
+//=======================================================================================//
+
+#include "sv_sessionblockpublisher.h"
+#include "sv_replaycontext.h"
+#include "demofile/demoformat.h"
+#include "sv_recordingsession.h"
+#include "sv_recordingsessionblock.h"
+#include "sv_sessioninfopublisher.h"
+
+// memdbgon must be the last include file in a .cpp file!!!
+#include "tier0/memdbgon.h"
+
+//----------------------------------------------------------------------------------------
+
+CSessionBlockPublisher::CSessionBlockPublisher( CServerRecordingSession *pSession,
+ CSessionInfoPublisher *pSessionInfoPublisher )
+: m_pSession( pSession ),
+ m_pSessionInfoPublisher( pSessionInfoPublisher )
+{
+ // Cache the dump interval so it can't be modified during a round - doing so would require
+ // an update on all clients.
+ extern ConVar replay_block_dump_interval;
+ m_nDumpInterval = MAX( MIN_SERVER_DUMP_INTERVAL, replay_block_dump_interval.GetInt() );
+
+ // Write the first block 15 or so seconds from now
+ m_flLastBlockWriteTime = g_pEngine->GetHostTime();
+}
+
+CSessionBlockPublisher::~CSessionBlockPublisher()
+{
+}
+
+void CSessionBlockPublisher::PublishAllSynchronous()
+{
+ while ( !IsDone() )
+ {
+ Think();
+ }
+}
+
+void CSessionBlockPublisher::AbortPublish()
+{
+ FOR_EACH_LL( m_lstPublishingBlocks, it )
+ {
+ CServerRecordingSessionBlock *pCurBlock = m_lstPublishingBlocks[ it ];
+ IFilePublisher *&pPublisher = pCurBlock->m_pPublisher; // Shorthand
+
+ if ( !pPublisher )
+ continue;
+
+ // Already done?
+ if ( pPublisher->IsDone() )
+ continue;
+
+ pPublisher->AbortAndCleanup();
+ }
+
+ // Remove all blocks
+ m_lstPublishingBlocks.RemoveAll();
+}
+
+void CSessionBlockPublisher::OnStartRecording()
+{
+}
+
+void CSessionBlockPublisher::OnStopRecord( bool bAborting )
+{
+ if ( !bAborting )
+ {
+ // Write one final session block.
+ WriteAndPublishSessionBlock();
+ }
+}
+
+ReplayHandle_t CSessionBlockPublisher::GetSessionHandle() const
+{
+ return m_pSession->GetHandle();
+}
+
+void CSessionBlockPublisher::WriteAndPublishSessionBlock()
+{
+ // Make sure there is data to write
+ uint8 *pSessionBuffer;
+ int nSessionBufferSize;
+ g_pEngine->GetSessionRecordBuffer( &pSessionBuffer, &nSessionBufferSize ); // This will get called the last client disconnects from the server - but in waiting for players state we won't have a demo buffer
+ if ( !pSessionBuffer || nSessionBufferSize == 0 )
+ return;
+
+ // Create a new block
+ CServerRecordingSessionBlock *pNewBlock = SV_CastBlock( SV_GetRecordingSessionBlockManager()->CreateAndGenerateHandle() );
+ if ( !pNewBlock )
+ {
+ Warning( "Failed to create replay \"%s\"\n", pNewBlock->m_szFullFilename );
+ delete pNewBlock;
+ return;
+ }
+
+ if ( m_pSession->m_nServerStartRecordTick < 0 )
+ {
+ Warning( "Error: Current recording start tick was not properly setup. Aborting block write.\n" );
+ delete pNewBlock;
+ return;
+ }
+
+ // Figure out what the current block is
+ const int iCurrentSessionBlock = m_pSession->GetNumBlocks();
+
+ // Add an entry to the server index with the "writing" status set
+ const char *pFullFilename = Replay_va(
+ "%s%s_part_%u.%s", SV_GetTmpDir(),
+ SV_GetRecordingSessionManager()->GetCurrentSessionName(), iCurrentSessionBlock, BLOCK_FILE_EXTENSION
+ );
+ V_strcpy( pNewBlock->m_szFullFilename, pFullFilename );
+ pNewBlock->m_nWriteStatus = CServerRecordingSessionBlock::WRITESTATUS_INVALID; // Must be set here to trigger write
+ pNewBlock->m_nRemoteStatus = CBaseRecordingSessionBlock::STATUS_WRITING;
+ pNewBlock->m_iReconstruction = iCurrentSessionBlock;
+ pNewBlock->m_hSession = m_pSession->GetHandle();
+
+ // Match the session's lock - the block will be unlocked once recording has stopped and all publishing is complete.
+ pNewBlock->SetLocked( true );
+
+ // Commit the replay to the history manager's list
+ SV_GetRecordingSessionBlockManager()->Add( pNewBlock );
+
+ // Also store a pointer to the block in the session - NOTE: session will not attempt to free this pointer
+ m_pSession->AddBlock( pNewBlock, false );
+
+ // Cache the block temporarily while the binary block itself writes to disk - NOTE: will not attempt to free
+ m_lstPublishingBlocks.AddToTail( pNewBlock );
+
+ // Write the block now
+ PublishBlock( pNewBlock ); // pNewBlock->m_nWriteStatus modified here
+
+ IF_REPLAY_DBG( Warning( "%f: (%i) Publishing new block, %s\n", g_pEngine->GetHostTime(), iCurrentSessionBlock, pNewBlock->GetFilename() ) );
+}
+
+void CSessionBlockPublisher::GatherBlockData( uint8 *pSessionBuffer, int nSessionBufferSize, CServerRecordingSessionBlock *pBlock, unsigned char **ppSafeBlockData, int *pBlockSize )
+{
+ const int nHeaderSize = sizeof( demoheader_t );
+
+ int nBlockOffset = 0;
+ const int nBlockSize = nSessionBufferSize;
+ int nTotalSize = nBlockSize;
+
+ demoheader_t *pHeader = NULL;
+
+ // If this is the first block, pass in a header to be written. Otherwise, just write the block.
+ if ( !pBlock->m_iReconstruction )
+ {
+ // Setup start tick in the header
+ pHeader = g_pEngine->GetReplayDemoHeader();
+
+ // Add header size
+ nBlockOffset = nHeaderSize;
+ nTotalSize += nHeaderSize;
+ }
+
+ // Make a copy of the block
+ unsigned char *pBuffer = new unsigned char[ nTotalSize ];
+ unsigned char *pBlockCopy = pBuffer + nBlockOffset;
+
+ // Only write the header if necessary
+ if ( pHeader )
+ {
+ demoheader_t littleEndianHeader = *pHeader;
+ littleEndianHeader.playback_time = FLT_MAX;
+ littleEndianHeader.playback_ticks = INT_MAX;
+ littleEndianHeader.playback_frames = INT_MAX;
+
+ // Byteswap
+ ByteSwap_demoheader_t( littleEndianHeader );
+
+ // Write header
+ V_memcpy( pBuffer, &littleEndianHeader, sizeof( littleEndianHeader ) );
+ }
+
+ // Note that pBlockCopy is based on pBuffer, which was allocated with nBlockSize PLUS
+ // header size - this will not overflow.
+ V_memcpy( pBlockCopy, pSessionBuffer, nBlockSize );
+
+ // Copy to "out" parameters
+ *pBlockSize = nTotalSize;
+ *ppSafeBlockData = pBuffer;
+}
+
+void CSessionBlockPublisher::PublishBlock( CServerRecordingSessionBlock *pBlock )
+{
+ uint8 *pSessionBuffer;
+ int nSessionBufferSize;
+ if ( !g_pEngine->GetSessionRecordBuffer( &pSessionBuffer, &nSessionBufferSize ) )
+ {
+ Warning( "Block publish failed!\n" );
+ return;
+ }
+
+ unsigned char *pSafeBlockData;
+ int nBlockSize;
+ GatherBlockData( pSessionBuffer, nSessionBufferSize, pBlock, &pSafeBlockData, &nBlockSize );
+
+ // We've got what we need and can reset the put ptr
+ g_pEngine->ResetReplayRecordBuffer();
+
+ AssertMsg( !pBlock->m_pPublisher, "No publisher should exist for this block yet!" );
+
+ // Set status to working
+ pBlock->m_nWriteStatus = CServerRecordingSessionBlock::WRITESTATUS_WORKING;
+
+ // Get the number of bytes written
+ pBlock->m_uFileSize = nBlockSize;
+
+ // Make sure the main thread doesn't unload the block while it's being published
+ pBlock->SetLocked( true );
+
+ // Asynchronously publish to fileserver
+ PublishFileParams_t params;
+ params.m_pOutFilename = pBlock->m_szFullFilename;
+ params.m_pSrcData = pSafeBlockData;
+ params.m_nSrcSize = nBlockSize;
+ params.m_pCallbackHandler = this;
+ params.m_nCompressorType = COMPRESSORTYPE_BZ2;
+ params.m_bHash = true;
+ params.m_bFreeSrcData = true;
+ params.m_bDeleteFile = false;
+ params.m_pUserData = pBlock;
+ pBlock->m_pPublisher = SV_PublishFile( params );
+}
+
+void CSessionBlockPublisher::OnPublishComplete( const IFilePublisher *pPublisher, void *pUserData )
+{
+ CServerRecordingSessionBlock *pBlock = (CServerRecordingSessionBlock *)pUserData;
+ Assert( pBlock );
+
+ // Set block status
+ if ( pPublisher->GetStatus() == IFilePublisher::PUBLISHSTATUS_OK )
+ {
+ pBlock->m_nWriteStatus = CServerRecordingSessionBlock::WRITESTATUS_SUCCESS;
+ }
+ else
+ {
+ pBlock->m_nWriteStatus = CServerRecordingSessionBlock::WRITESTATUS_FAILED;
+
+ // Publish failed - handle as needed
+ g_pServerReplayContext->OnPublishFailed();
+ }
+
+ // Did the block compress OK?
+ if ( pPublisher->Compressed() )
+ {
+ // Cache compressor type
+ pBlock->m_nCompressorType = pPublisher->GetCompressorType();
+
+ const int nCompressedSize = pPublisher->GetCompressedSize();
+ const float flRatio = (float)pBlock->m_uFileSize / nCompressedSize;
+ IF_REPLAY_DBG( Warning( "Block compression ratio: %.3f:1\n", flRatio ) );
+
+ // Update size
+ pBlock->m_uUncompressedSize = pBlock->m_uFileSize;
+ pBlock->m_uFileSize = nCompressedSize;
+ }
+
+ // Get the MD5
+ if ( pPublisher->Hashed() )
+ {
+ pPublisher->GetHash( pBlock->m_aHash );
+ }
+
+ // Now that m_nWriteStatus has been set in the block, the session info will be updated
+ // accordingly the next time PublishThink() is run.
+
+ // Mark the block as dirty since it was modified
+ Assert( pBlock->m_nWriteStatus != CServerRecordingSessionBlock::WRITESTATUS_INVALID );
+ SV_GetRecordingSessionBlockManager()->FlagForFlush( pBlock, false );
+
+ IF_REPLAY_DBG( Warning( "Publish complete for block %s\n", pBlock->GetDebugName() ) );
+}
+
+void CSessionBlockPublisher::OnPublishAborted( const IFilePublisher *pPublisher )
+{
+ CServerRecordingSessionBlock *pBlock = FindBlockFromPublisher( pPublisher );
+
+ // Update the block's status
+ if ( pBlock )
+ {
+ pBlock->m_nWriteStatus = CServerRecordingSessionBlock::WRITESTATUS_FAILED;
+ }
+
+ g_pServerReplayContext->OnPublishFailed();
+}
+
+CServerRecordingSessionBlock *CSessionBlockPublisher::FindBlockFromPublisher( const IFilePublisher *pPublisher )
+{
+ FOR_EACH_LL( m_lstPublishingBlocks, i )
+ {
+ CServerRecordingSessionBlock *pCurBlock = m_lstPublishingBlocks[ i ];
+ if ( pCurBlock->m_pPublisher == pPublisher )
+ {
+ return pCurBlock;
+ }
+ }
+
+ AssertMsg( 0, "Could not find block with the given publisher!" );
+ return NULL;
+}
+
+void CSessionBlockPublisher::Think()
+{
+ // NOTE: This member function gets called even if replay is disabled. This is intentional.
+
+ VPROF_BUDGET( "CSessionBlockPublisher::Think", VPROF_BUDGETGROUP_REPLAY );
+
+ PublishThink();
+}
+
+void CSessionBlockPublisher::PublishThink()
+{
+ AssertMsg( m_pSession->IsLocked(), "The session isn't locked, which means blocks can be being deleted and will probably cause a crash." );
+
+ // Go through all currently publishing blocks and free/think
+ FOR_EACH_LL( m_lstPublishingBlocks, it )
+ {
+ CServerRecordingSessionBlock *pCurBlock = m_lstPublishingBlocks[ it ];
+ IFilePublisher *&pPublisher = pCurBlock->m_pPublisher; // Shorthand
+
+ if ( !pPublisher )
+ continue;
+
+ // If the publisher's done, free it
+ if ( pPublisher->IsDone() )
+ {
+ delete pPublisher;
+ pPublisher = NULL;
+ }
+ else
+ {
+ // Let the publisher think
+ pPublisher->Think();
+ }
+ }
+
+ // Write a new session block out right now?
+ float flHostTime = g_pEngine->GetHostTime();
+ if ( m_flLastBlockWriteTime != 0.0f &&
+ flHostTime - m_flLastBlockWriteTime >= m_nDumpInterval &&
+ m_pSession->m_bRecording )
+ {
+ Assert( m_nDumpInterval > 0 );
+
+ // Write it
+ WriteAndPublishSessionBlock();
+
+ // Update the time
+ m_flLastBlockWriteTime = flHostTime;
+ }
+
+ // Check status of any replays that are being written
+ bool bUpdateSessionInfo = false;
+ for( int it = m_lstPublishingBlocks.Head(); it != m_lstPublishingBlocks.InvalidIndex(); )
+ {
+ CServerRecordingSessionBlock *pCurBlock = m_lstPublishingBlocks[ it ];
+
+ // Updated when write status is set to success or failure
+ int nPendingRequestStatus = CBaseRecordingSessionBlock::STATUS_INVALID;
+
+ // If set to anything besides InvalidIndex(), it will be removed from the list
+ int itRemove = m_lstPublishingBlocks.InvalidIndex();
+ bool bWriteBlockInfoToDisk = false;
+
+ switch ( pCurBlock->m_nWriteStatus )
+ {
+ case CServerRecordingSessionBlock::WRITESTATUS_INVALID:
+ AssertMsg( 0, "Why is m_nWriteStatus WRITESTATUS_INVALID here?" );
+ break;
+
+ case CServerRecordingSessionBlock::WRITESTATUS_WORKING: // Do nothing if still writing
+ break;
+
+ case CServerRecordingSessionBlock::WRITESTATUS_SUCCESS:
+ IF_REPLAY_DBG2( Warning( " Block %i marked as succeeded.\n", pCurBlock->m_iReconstruction ) );
+ pCurBlock->m_nRemoteStatus = CBaseRecordingSessionBlock::STATUS_READYFORDOWNLOAD;
+ nPendingRequestStatus = pCurBlock->m_nRemoteStatus;
+ bWriteBlockInfoToDisk = true;
+ itRemove = it;
+ break;
+
+ case CServerRecordingSessionBlock::WRITESTATUS_FAILED:
+ default: // Error?
+ IF_REPLAY_DBG2( Warning( " Block %i marked as failed.\n", pCurBlock->m_iReconstruction ) );
+ pCurBlock->m_nRemoteStatus = CBaseRecordingSessionBlock::STATUS_ERROR;
+ pCurBlock->m_nHttpError = CBaseRecordingSessionBlock::ERROR_WRITEFAILED;
+ nPendingRequestStatus = pCurBlock->m_nRemoteStatus;
+ bWriteBlockInfoToDisk = true;
+ itRemove = it;
+
+ // TODO: Retry
+ }
+
+ if ( bWriteBlockInfoToDisk )
+ {
+ // Save the master index file
+ Assert( pCurBlock->m_nWriteStatus != CServerRecordingSessionBlock::WRITESTATUS_INVALID );
+ SV_GetRecordingSessionBlockManager()->FlagForFlush( pCurBlock, false );
+ }
+
+ // Find the owning session
+ Assert( pCurBlock->m_hSession == m_pSession->GetHandle() );
+
+ // Refresh session info file
+ if ( nPendingRequestStatus != CBaseRecordingSessionBlock::STATUS_INVALID )
+ {
+ // Update it after this loop
+ bUpdateSessionInfo = true;
+ }
+
+ // Update iterator
+ it = m_lstPublishingBlocks.Next( it );
+
+ // Remove?
+ if ( itRemove != m_lstPublishingBlocks.InvalidIndex() )
+ {
+ IF_REPLAY_DBG( Warning( "Removing block %i from publisher\n", pCurBlock->m_iReconstruction ) );
+ // Free/clear publisher
+ delete pCurBlock->m_pPublisher;
+ pCurBlock->m_pPublisher = NULL;
+
+ // Removes from the list but doesn't free, since any pointer here points to a block somewhere
+ m_lstPublishingBlocks.Unlink( itRemove );
+ }
+ }
+
+ // Publish session info file now if it isn't already publishing
+ if ( bUpdateSessionInfo )
+ {
+ m_pSessionInfoPublisher->Publish();
+ }
+}
+
+bool CSessionBlockPublisher::IsDone() const
+{
+ return m_lstPublishingBlocks.Count() == 0;
+}
+
+#ifdef _DEBUG
+void CSessionBlockPublisher::Validate()
+{
+ FOR_EACH_LL( m_lstPublishingBlocks, i )
+ {
+ CServerRecordingSessionBlock *pCurBlock = m_lstPublishingBlocks[ i ];
+ Assert( pCurBlock->m_nRemoteStatus == CBaseRecordingSessionBlock::STATUS_READYFORDOWNLOAD );
+ }
+}
+#endif
+
+//----------------------------------------------------------------------------------------