1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
|
//========= Copyright Valve Corporation, All rights reserved. ============//
//
// Purpose:
//
//=============================================================================//
#include <winsock2.h>
#include "vmpi_filesystem_internal.h"
#include "threadhelpers.h"
#include "zlib.h"
#define NUM_BUFFERED_CHUNK_ACKS 512
#define ACK_FLUSH_INTERVAL 500 // Flush the ack queue twice per second.
static bool g_bReceivedMulticastIP = false;
static CIPAddr g_MulticastIP;
CCriticalSection g_FileResponsesCS;
// One reply to a file request. HandleFileSystemPacket() appends these to g_FileResponses and
// RequestFileFromServer() removes the entry whose m_RequestID matches its own request.
class CFileResponse
{
public:
int m_RequestID;	// ID of the request this reply answers (request IDs are taken from g_RequestID).
int m_Response;		// File ID to use for the file, or -1 if the file doesn't exist.
bool m_bZeroLength;	// Copied into CWorkerFile::m_bZeroLength; when set, ListenFor() returns without waiting for chunks.
};
CUtlVector<CFileResponse> g_FileResponses;
int g_RequestID = 0;
// A file chunk that arrived through the regular VMPI dispatch (the TCP path), queued in
// g_FileChunkPackets until CheckFileChunkPackets() pops it.
// Variable-length: HandleFileSystemPacket() malloc()s each one with room for m_Len bytes
// starting at m_Data, and CheckFileChunkPackets() free()s it.
class CFileChunkPacket
{
public:
int m_Len;			// Number of payload bytes stored at m_Data.
char m_Data[1];		// First payload byte; the allocation extends past the declared size.
};
CUtlLinkedList<CFileChunkPacket*, int> g_FileChunkPackets; // This is also protected by g_FileResponsesCS.
// ------------------------------------------------------------------------------------------------------------------------ //
// Classes.
// ------------------------------------------------------------------------------------------------------------------------ //
// Worker-side record of one file requested from the master: its identity, the compressed
// bytes as they arrive chunk by chunk, and the inflated contents once everything is in.
class CWorkerFile
{
public:
	// Put every scalar into a defined "nothing received yet" state so an instance never
	// carries indeterminate members, whoever creates it.
	CWorkerFile()
	{
		m_bZeroLength = false;
		m_bGotCompressedSize = false;
		m_FileID = -1;
		m_nChunksToReceive = -1;	// Nonzero, so IsReadyToRead() stays false until a real chunk count is set.
	}

	// NUL-terminated name this file was requested under.
	const char* GetFilename()		{ return m_Filename.Base(); }

	// NUL-terminated path ID this file was requested under.
	const char* GetPathID()			{ return m_PathID.Base(); }

	// True once there are no chunks left to receive.
	bool IsReadyToRead() const		{ return m_nChunksToReceive == 0; }

public:
	CFastTimer m_Timer;	// To see how long it takes to download the file.

	// This has to be sent explicitly as part of the file info or else the protocol
	// breaks on empty files.
	bool m_bZeroLength;

	// This is false until we get any packets about the file. In the packets,
	// we find out what the size is supposed to be.
	bool m_bGotCompressedSize;

	// The ID the master uses to refer to this file.
	int m_FileID;

	CUtlVector<char> m_Filename;
	CUtlVector<char> m_PathID;

	// First data comes in here, then when it's all there, it is inflated into m_UncompressedData.
	CUtlVector<char> m_CompressedData;

	// 1 bit for each chunk.
	CUtlVector<unsigned char> m_ChunksReceived;

	// When this is zero, the file is done being received and m_UncompressedData is valid.
	int m_nChunksToReceive;

	CUtlVector<char> m_UncompressedData;
};
// ------------------------------------------------------------------------------------------------------------------------ //
// Global helpers.
// ------------------------------------------------------------------------------------------------------------------------ //
// Pumps VMPI messages until the multicast-address packet has been handled
// (g_bReceivedMulticastIP set), then copies the stored address into *pAddr.
static void RecvMulticastIP( CIPAddr *pAddr )
{
	for ( ;; )
	{
		if ( g_bReceivedMulticastIP )
			break;

		VMPI_DispatchNextMessage();
	}

	*pAddr = g_MulticastIP;
}
static bool ZLibDecompress( const void *pInput, int inputLen, void *pOut, int outLen )
{
if ( inputLen == 0 )
{
// Zero-length file?
return true;
}
z_stream decompressStream;
// Initialize the decompression stream.
memset( &decompressStream, 0, sizeof( decompressStream ) );
if ( inflateInit( &decompressStream ) != Z_OK )
return false;
// Decompress all this stuff and write it to the file.
decompressStream.next_in = (unsigned char*)pInput;
decompressStream.avail_in = inputLen;
char *pOutChar = (char*)pOut;
while ( decompressStream.avail_in )
{
decompressStream.total_out = 0;
decompressStream.next_out = (unsigned char*)pOutChar;
decompressStream.avail_out = outLen - (pOutChar - (char*)pOut);
int ret = inflate( &decompressStream, Z_NO_FLUSH );
if ( ret != Z_OK && ret != Z_STREAM_END )
return false;
pOutChar += decompressStream.total_out;
if ( ret == Z_STREAM_END )
{
if ( (pOutChar - (char*)pOut) == outLen )
{
return true;
}
else
{
Assert( false );
return false;
}
}
}
Assert( false ); // Should have gotten to Z_STREAM_END.
return false;
}
// ------------------------------------------------------------------------------------------------------------------------ //
// CWorkerMulticastListener implementation.
// ------------------------------------------------------------------------------------------------------------------------ //
class CWorkerMulticastListener
{
public:
CWorkerMulticastListener()
{
m_nUnfinishedFiles = 0;
}
// Releases everything the listener owns by running the same cleanup as an explicit Term().
~CWorkerMulticastListener()
{
Term();
}
// Records the calling thread as the "main" thread and stores the address to listen on.
// Always returns true.
//
// NOTE(review): GetCurrentThread() returns a pseudo-handle that has the same value on every
// thread, so the later "GetCurrentThread() == m_hMainThread" comparison looks like it can
// never be false. Telling threads apart would need GetCurrentThreadId() — confirm and fix
// together with the member's type.
bool Init( const CIPAddr &mcAddr )
{
	m_hMainThread = GetCurrentThread();
	m_MulticastAddr = mcAddr;
	return true;
}
void Term()
{
m_WorkerFiles.PurgeAndDeleteElements();
}
// Sends a file request for pFilename/pPathID and blocks until the matching CFileResponse
// appears in g_FileResponses.
//
// Returns NULL when the response is -1 (the file doesn't exist). Otherwise returns a new
// CWorkerFile that has been appended to m_WorkerFiles and counted in m_nUnfinishedFiles;
// its contents have not been received yet.
//
// The caller must not already have an entry for this file (asserted).
CWorkerFile* RequestFileFromServer( const char *pFilename, const char *pPathID )
{
	Assert( pPathID );
	Assert( FindWorkerFile( pFilename, pPathID ) == NULL );

	// The lock object is only held between explicit Lock()/Unlock() pairs below.
	CCriticalSectionLock responsesLock( &g_FileResponsesCS );

	// Take a unique request ID.
	responsesLock.Lock();
	int requestID = g_RequestID++;
	responsesLock.Unlock();

	// Request packet: [packet ID, sub-packet ID][request ID][filename\0][path ID\0].
	unsigned char packetID[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_FILE_REQUEST };
	const void *pChunks[4] = { packetID, &requestID, (void*)pFilename, pPathID };
	int chunkLengths[4] = { sizeof( packetID ), sizeof( requestID ), strlen( pFilename ) + 1, strlen( pPathID ) + 1 };
	VMPI_SendChunks( pChunks, chunkLengths, ARRAYSIZE( pChunks ), 0 );

	CFileResponse response;
	response.m_Response = -1;
	response.m_bZeroLength = true;

	// Poll the response queue until our request ID shows up. Between polls, the thread
	// recorded by Init() dispatches VMPI messages and any other thread just sleeps.
	// NOTE(review): GetCurrentThread() returns the same pseudo-handle on every thread, so
	// this comparison probably always succeeds — confirm.
	for ( bool bHaveResponse = false; !bHaveResponse; )
	{
		responsesLock.Lock();

		int iResponse = 0;
		while ( iResponse < g_FileResponses.Count() && g_FileResponses[iResponse].m_RequestID != requestID )
			++iResponse;

		if ( iResponse < g_FileResponses.Count() )
		{
			response = g_FileResponses[iResponse];
			g_FileResponses.Remove( iResponse );
			bHaveResponse = true;
		}

		responsesLock.Unlock();

		if ( bHaveResponse )
			break;

		if ( GetCurrentThread() == m_hMainThread )
			VMPI_DispatchNextMessage( 20 );
		else
			Sleep( 20 );
	}

	// -1 means the file doesn't exist.
	if ( response.m_Response == -1 )
		return NULL;

	// Build the bookkeeping entry. Sizes and chunk count are unknown until the first
	// data packet arrives, so the chunk count starts at a nonzero placeholder.
	CWorkerFile *pNewFile = new CWorkerFile;

	pNewFile->m_Filename.SetSize( strlen( pFilename ) + 1 );
	strcpy( pNewFile->m_Filename.Base(), pFilename );

	pNewFile->m_PathID.SetSize( strlen( pPathID ) + 1 );
	strcpy( pNewFile->m_PathID.Base(), pPathID );

	pNewFile->m_FileID = response.m_Response;
	pNewFile->m_nChunksToReceive = 9999;
	pNewFile->m_bGotCompressedSize = false;
	pNewFile->m_bZeroLength = response.m_bZeroLength;
	pNewFile->m_Timer.Start();

	m_WorkerFiles.AddToTail( pNewFile );
	++m_nUnfinishedFiles;

	return pNewFile;
}
// Sends every buffered (file ID, chunk index) pair in one CHUNK_RECEIVED packet, empties the
// buffer, and restarts the flush timer. The timer is restarted even when nothing was buffered.
void FlushAckChunks( unsigned short chunksToAck[NUM_BUFFERED_CHUNK_ACKS][2], int &nChunksToAck, DWORD &lastAckTime )
{
	if ( nChunksToAck != 0 )
	{
		// Each buffered ack is two unsigned shorts, i.e. 4 bytes on the wire.
		const int nAckBytes = nChunksToAck * 4;

		unsigned char packetID[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_CHUNK_RECEIVED };
		void *pChunks[2] = { packetID, chunksToAck };
		int chunkLengths[2] = { sizeof( packetID ), nAckBytes };
		VMPI_SendChunks( pChunks, chunkLengths, 2, 0 );

		nChunksToAck = 0;
	}

	lastAckTime = GetTickCount();
}
// Flushes the buffered acks if there are any and more than ACK_FLUSH_INTERVAL milliseconds
// have passed since the last flush.
void MaybeFlushAckChunks( unsigned short chunksToAck[NUM_BUFFERED_CHUNK_ACKS][2], int &nChunksToAck, DWORD &lastAckTime )
{
	if ( !nChunksToAck )
		return;

	const DWORD msSinceLastFlush = GetTickCount() - lastAckTime;
	if ( msSinceLastFlush > ACK_FLUSH_INTERVAL )
	{
		FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime );
	}
}
// Buffers an ack for chunk iChunk of file fileID (both truncated to unsigned short).
// The buffer is flushed right away when it becomes full, or on every call in TCP mode —
// the TCP filesystem acks all chunks immediately so the sender will send more.
void AddAckChunk(
	unsigned short chunksToAck[NUM_BUFFERED_CHUNK_ACKS][2],
	int &nChunksToAck,
	DWORD &lastAckTime,
	int fileID,
	int iChunk )
{
	unsigned short *pSlot = chunksToAck[nChunksToAck++];
	pSlot[0] = (unsigned short)fileID;
	pSlot[1] = (unsigned short)iChunk;

	const bool bBufferFull = ( nChunksToAck == NUM_BUFFERED_CHUNK_ACKS );
	if ( bBufferFull || VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_TCP )
		FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime );
}
// Returns the length of the packet's data or -1 if there is nothing.
int CheckFileChunkPackets( char *data, int dataSize )
{
// Using TCP.. pop the next received packet off the stack.
CCriticalSectionLock csLock( &g_FileResponsesCS );
csLock.Lock();
if ( g_FileChunkPackets.Count() <= 0 )
return -1;
CFileChunkPacket *pPacket = g_FileChunkPackets[ g_FileChunkPackets.Head() ];
g_FileChunkPackets.Remove( g_FileChunkPackets.Head() );
// Yes, this is inefficient, but the amount of data we're handling here is tiny so the
// effect is negligible.
int len;
if ( pPacket->m_Len > dataSize )
{
len = -1;
Warning( "CWorkerMulticastListener::ListenFor: Got a section of data too long (%d bytes).", pPacket->m_Len );
}
else
{
memcpy( data, pPacket->m_Data, pPacket->m_Len );
len = pPacket->m_Len;
}
free( pPacket );
return len;
}
// printf-style progress message, printed through Msg() only when this process is not the
// MPI master and VMPI is in SDK mode. Output is truncated to fit a 4096-byte buffer.
void ShowSDKWorkerMsg( const char *pMsg, ... )
{
	if ( g_bMPIMaster || !VMPI_IsSDKMode() )
		return;

	char formatted[4096];

	va_list args;
	va_start( args, pMsg );
	V_vsnprintf( formatted, sizeof( formatted ), pMsg, args );
	va_end( args );

	Msg( "%s", formatted );
}
// This is the main function the workers use to pick files out of the multicast stream.
// The app is waiting for a specific file: we request it from the master if we haven't
// already, then receive and ack its chunks until it is complete and decompressed.
//
// Chunks come from a UDP socket in multicast/broadcast mode, or from the g_FileChunkPackets
// queue (filled by the regular VMPI dispatch) when there is no socket.
//
// Returns the file, or NULL if the master says it doesn't exist or the multicast listen
// socket can't be created.
//
// NOTE: ideally, this would be in a thread, but it adds lots of complications and may
// not be worth it.
CWorkerFile* ListenFor( const char *pFilename, const char *pPathID )
{
	CWorkerFile *pFile = FindWorkerFile( pFilename, pPathID );
	if ( !pFile )
	{
		// Ok, we haven't requested this file yet. Create an entry for it and
		// tell the master we'd like this file.
		pFile = RequestFileFromServer( pFilename, pPathID );
		if ( !pFile )
			return NULL;

		// If it's zero-length, we can return right now.
		if ( pFile->m_bZeroLength )
		{
			// No chunks will ever arrive for it, so mark it complete. Otherwise
			// IsReadyToRead() stays false and every later open of this file comes back here.
			pFile->m_nChunksToReceive = 0;
			--m_nUnfinishedFiles;
			return pFile;
		}
	}

	// Setup a filename to print some debug spew with. Names that don't fit are shown as
	// "[...]" followed by as much of the tail as fits.
	char printableFilename[58];
	if ( V_strlen( pFilename ) > ARRAYSIZE( printableFilename ) - 1 )
	{
		V_strncpy( printableFilename, "[...]", sizeof( printableFilename ) );
		V_strncat( printableFilename, &pFilename[V_strlen(pFilename) - ARRAYSIZE(printableFilename) + 1 + V_strlen(printableFilename)], sizeof( printableFilename ) );
	}
	else
	{
		V_strncpy( printableFilename, pFilename, sizeof( printableFilename ) );
	}
	ShowSDKWorkerMsg( "\rRecv %s (0%%) ", printableFilename );

	int iChunkPayloadSize = VMPI_GetChunkPayloadSize();

	// Now start listening to the stream.
	// Note: no need to setup anything when in TCP mode - we just use the regular
	// VMPI dispatch stuff to handle that.
	ISocket *pSocket = NULL;
	if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_MULTICAST )
	{
		pSocket = CreateMulticastListenSocket( m_MulticastAddr );
		if ( !pSocket )
		{
			char str[512];
			IP_GetLastErrorString( str, sizeof( str ) );
			Warning( "CreateMulticastListenSocket (%d.%d.%d.%d:%d) failed\n%s\n", EXPAND_ADDR( m_MulticastAddr ), str );
			return NULL;
		}
	}
	else if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_BROADCAST )
	{
		// If the socket can't be created or bound we end up with no socket and fall back
		// to polling the dispatch queue, as before.
		pSocket = CreateIPSocket();
		if ( pSocket && !pSocket->BindToAny( m_MulticastAddr.port ) )
		{
			pSocket->Release();
			pSocket = NULL;
		}
	}

	unsigned short chunksToAck[NUM_BUFFERED_CHUNK_ACKS][2];
	int nChunksToAck = 0;
	DWORD lastAckTime = GetTickCount();

	// Every packet starts with a CMulticastFileInfo followed by the int chunk index.
	const int nHeaderBytes = (int)( sizeof( CMulticastFileInfo ) + sizeof( int ) );

	// Now just receive multicast data until this file has been received.
	while ( m_nUnfinishedFiles > 0 )
	{
		char data[MAX_CHUNK_PAYLOAD_SIZE+1024];
		int len = -1;

		if ( pSocket )
		{
			CIPAddr ipFrom;
			len = pSocket->RecvFrom( data, sizeof( data ), &ipFrom );
		}
		else
		{
			len = CheckFileChunkPackets( data, sizeof( data ) );
		}

		if ( len == -1 )
		{
			// Sleep for 10ms and also handle socket errors.
			Sleep( 0 );
			VMPI_DispatchNextMessage( 10 );
			continue;
		}

		g_nMulticastBytesReceived += len;

		// Alrighty. Figure out what the deal is with this file.
		// Packet layout: CMulticastFileInfo, int chunk index, NUL-terminated filename, payload.
		CMulticastFileInfo *pInfo = (CMulticastFileInfo*)data;
		int *piChunk = (int*)( pInfo + 1 );
		const char *pTestFilename = (const char*)( piChunk + 1 );

		// Locate the end of the filename carried by *this packet*, looking only at bytes
		// that were actually received. The payload starts right after its terminator.
		// (Measuring the requested filename instead gives the wrong offset whenever the
		// two names differ in length, and an unbounded scan can run past the packet.)
		const char *pFilenameEnd = NULL;
		if ( len > nHeaderBytes )
			pFilenameEnd = (const char*)memchr( pTestFilename, 0, len - nHeaderBytes );

		if ( !pFilenameEnd )
		{
			Warning( "CWorkerMulticastListener::ListenFor: invalid packet received on multicast group\n" );
			continue;
		}

		const char *pPayload = pFilenameEnd + 1;
		int payloadLen = len - (int)( pPayload - data );

		// Only the file we're waiting for is processed; chunks for anything else are ignored.
		if ( pInfo->m_FileID != pFile->m_FileID )
			continue;

		CWorkerFile *pTestFile = FindWorkerFile( pInfo->m_FileID );
		if ( !pTestFile )
			Error( "FindWorkerFile( %s ) failed\n", pTestFilename );

		// TODO: reenable this code and disable the if right above here.
		// We always get "invalid payload length" errors on the workers when using this, but
		// I haven't been able to figure out why yet.
		// NOTE(review): the payload offset used to be derived from the requested filename's
		// length, which would produce exactly that error for other files' packets — re-test.
		/*
		// Put the data into whatever file it belongs in.
		if ( !pTestFile )
		{
			pTestFile = RequestFileFromServer( pTestFilename );
			if ( !pTestFile )
				continue;
		}
		*/

		// Is this the first packet about this file?
		if ( !pTestFile->m_bGotCompressedSize )
		{
			pTestFile->m_bGotCompressedSize = true;
			pTestFile->m_CompressedData.SetSize( pInfo->m_CompressedSize );
			pTestFile->m_UncompressedData.SetSize( pInfo->m_UncompressedSize );
			pTestFile->m_ChunksReceived.SetSize( PAD_NUMBER( pInfo->m_nChunks, 8 ) / 8 );
			pTestFile->m_nChunksToReceive = pInfo->m_nChunks;
			memset( pTestFile->m_ChunksReceived.Base(), 0, pTestFile->m_ChunksReceived.Count() );
		}

		// Validate the chunk index, both against what this packet claims and against the
		// bitfield that was sized from the first packet (a later packet could disagree).
		int iChunk = *piChunk;
		if ( iChunk < 0 || iChunk >= pInfo->m_nChunks || ( iChunk >> 3 ) >= pTestFile->m_ChunksReceived.Count() )
		{
			Error( "ListenFor(): invalid chunk index (%d) for file '%s'\n", iChunk, pTestFilename );
		}

		// Only handle this if we didn't already received the chunk.
		if ( !(pTestFile->m_ChunksReceived[iChunk >> 3] & (1 << (iChunk & 7))) )
		{
			// Make sure the file is properly setup to receive the data into.
			if ( (int)pInfo->m_UncompressedSize != pTestFile->m_UncompressedData.Count() ||
				(int)pInfo->m_CompressedSize != pTestFile->m_CompressedData.Count() )
			{
				Error( "ListenFor(): invalid compressed or uncompressed size.\n"
					"pInfo = '%s', pTestFile = '%s'\n"
					"Compressed (pInfo = %d, pTestFile = %d)\n"
					"Uncompressed (pInfo = %d, pTestFile = %d)\n",
					pTestFilename,
					pTestFile->GetFilename(),
					pInfo->m_CompressedSize,
					pTestFile->m_CompressedData.Count(),
					pInfo->m_UncompressedSize,
					pTestFile->m_UncompressedData.Count()
					);
			}

			// Every chunk is iChunkPayloadSize bytes except possibly the last one.
			int iChunkStart = iChunk * iChunkPayloadSize;
			int iChunkEnd = min( iChunkStart + iChunkPayloadSize, pTestFile->m_CompressedData.Count() );
			int chunkLen = iChunkEnd - iChunkStart;

			if ( chunkLen != payloadLen )
			{
				Error( "ListenFor(): invalid payload length for '%s' (%d should be %d)\n"
					"pInfo = '%s', pTestFile = '%s'\n"
					"Chunk %d out of %d. Compressed size: %d\n",
					pTestFile->GetFilename(),
					payloadLen,
					chunkLen,
					pTestFilename,
					pTestFile->GetFilename(),
					iChunk,
					pInfo->m_nChunks,
					pInfo->m_CompressedSize
					);
			}

			memcpy( &pTestFile->m_CompressedData[iChunkStart], pPayload, chunkLen );
			pTestFile->m_ChunksReceived[iChunk >> 3] |= (1 << (iChunk & 7));

			--pTestFile->m_nChunksToReceive;

			if ( pTestFile == pFile )
			{
				int percent = 100 - (100 * pFile->m_nChunksToReceive) / pInfo->m_nChunks;
				ShowSDKWorkerMsg( "\rRecv %s (%d%%) [chunk %d/%d] ", printableFilename, percent, pInfo->m_nChunks - pFile->m_nChunksToReceive, pInfo->m_nChunks );
			}

			// Remember to ack what we received.
			AddAckChunk( chunksToAck, nChunksToAck, lastAckTime, pInfo->m_FileID, iChunk );

			// If we're done receiving the data, unpack it.
			if ( pTestFile->m_nChunksToReceive == 0 )
			{
				// Ack the file.
				FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime );

				pTestFile->m_Timer.End();

				pTestFile->m_UncompressedData.SetSize( pInfo->m_UncompressedSize );
				--m_nUnfinishedFiles;

				if ( !ZLibDecompress(
					pTestFile->m_CompressedData.Base(),
					pTestFile->m_CompressedData.Count(),
					pTestFile->m_UncompressedData.Base(),
					pTestFile->m_UncompressedData.Count() ) )
				{
					if ( pSocket )
						pSocket->Release();

					FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime );
					Error( "ZLibDecompress failed.\n" );
					return NULL;
				}

				char str[512];
				V_snprintf( str, sizeof( str ), "Got %s (%dk) in %.2fs",
					printableFilename,
					(pTestFile->m_UncompressedData.Count() + 511) / 1024,
					pTestFile->m_Timer.GetDuration().GetSeconds()
					);
				Msg( "\r%-79s\n", str );

				// Won't be needing this anymore.
				pTestFile->m_CompressedData.Purge();
			}
		}

		MaybeFlushAckChunks( chunksToAck, nChunksToAck, lastAckTime );
	}

	Assert( pFile->IsReadyToRead() );
	FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime );

	if ( pSocket )
		pSocket->Release();

	return pFile;
}
// Looks up a known file by name and path ID, both compared case-insensitively.
// Returns NULL if no entry matches.
CWorkerFile* FindWorkerFile( const char *pFilename, const char *pPathID )
{
	FOR_EACH_LL( m_WorkerFiles, iFile )
	{
		CWorkerFile *pCandidate = m_WorkerFiles[iFile];

		// The path ID is only compared once the filename has matched.
		const bool bSameName = ( stricmp( pCandidate->GetFilename(), pFilename ) == 0 );
		const bool bSameFile = bSameName && ( stricmp( pCandidate->GetPathID(), pPathID ) == 0 );
		if ( bSameFile )
			return pCandidate;
	}

	return NULL;
}
// Looks up a known file by the ID the master assigned to it.
// Returns NULL if no entry has that ID.
CWorkerFile* FindWorkerFile( int fileID )
{
	FOR_EACH_LL( m_WorkerFiles, iFile )
	{
		CWorkerFile *pCandidate = m_WorkerFiles[iFile];
		if ( pCandidate->m_FileID == fileID )
		{
			return pCandidate;
		}
	}

	return NULL;
}
private:
CIPAddr m_MulticastAddr;
CUtlLinkedList<CWorkerFile*, int> m_WorkerFiles;
HANDLE m_hMainThread;
// How many files do we have open that we haven't finished receiving from the server yet?
// We always keep waiting for data until this is zero.
int m_nUnfinishedFiles;
};
// ------------------------------------------------------------------------------------------------------------------------ //
// CWorkerVMPIFileSystem implementation.
// ------------------------------------------------------------------------------------------------------------------------ //
// Worker-side VMPI filesystem. Files can only be opened for reading: their contents are
// obtained through m_Listener and served from memory. The write/metadata entry points
// below are not supported and report an error.
class CWorkerVMPIFileSystem : public CBaseVMPIFileSystem
{
public:
// Waits for the multicast address, then initializes the listener with it.
InitReturnVal_t Init();
// Frees the files held by the listener.
virtual void Term();
// Opens pFilename for reading; returns FILESYSTEM_INVALID_HANDLE for write opens or if the file can't be obtained.
virtual FileHandle_t Open( const char *pFilename, const char *pOptions, const char *pathID );
// Handles the multicast-address, file-response and file-chunk sub-packets; returns false for anything else.
virtual bool HandleFileSystemPacket( MessageBuffer *pBuf, int iSource, int iPacketID );
// Not supported on workers (calls Error()).
virtual void CreateVirtualFile( const char *pFilename, const void *pData, int fileLength );
// Not supported on workers (calls Error()).
virtual long GetFileTime( const char *pFileName, const char *pathID );
// Not supported on workers (calls Error()).
virtual bool IsFileWritable( const char *pFileName, const char *pPathID );
// Not supported on workers (calls Error()).
virtual bool SetFileWritable( char const *pFileName, bool writable, const char *pPathID );
// Forwards to Sys_LoadModule(); pPathID and bValidatedDllOnly are ignored.
virtual CSysModule *LoadModule( const char *pFileName, const char *pPathID, bool bValidatedDllOnly );
// Forwards to Sys_UnloadModule().
virtual void UnloadModule( CSysModule *pModule );
private:
// Requests files from the master and receives their chunks.
CWorkerMulticastListener m_Listener;
};
// Creates the worker filesystem, installs it as g_pBaseVMPIFileSystem and initializes it.
// Returns the new filesystem, or NULL (with g_pBaseVMPIFileSystem cleared and the object
// deleted) if initialization doesn't report INIT_OK.
CBaseVMPIFileSystem* CreateWorkerVMPIFileSystem()
{
	CWorkerVMPIFileSystem *pRet = new CWorkerVMPIFileSystem;

	// The global is set before Init() because Init() pumps VMPI messages.
	// NOTE(review): presumably the packet dispatch reaches the filesystem through this
	// global — confirm.
	g_pBaseVMPIFileSystem = pRet;

	// Compare against INIT_OK explicitly rather than relying on the enum's numeric values
	// when converting to bool.
	if ( pRet->Init() == INIT_OK )
		return pRet;

	delete pRet;
	g_pBaseVMPIFileSystem = NULL;
	return NULL;
}
// Blocks until the multicast address is known, then hands it to the listener.
// Returns INIT_OK if the listener initialized, INIT_FAILED otherwise.
InitReturnVal_t CWorkerVMPIFileSystem::Init()
{
	// Get the multicast addr to listen on.
	CIPAddr listenAddr;
	RecvMulticastIP( &listenAddr );

	if ( !m_Listener.Init( listenAddr ) )
		return INIT_FAILED;

	return INIT_OK;
}
// Shuts the filesystem down by terminating the listener, which deletes the files it holds.
void CWorkerVMPIFileSystem::Term()
{
m_Listener.Term();
}
// Opens a file for reading on a worker.
//
// pOptions is an fopen-style mode string: anything containing 'w' (case-insensitive) is
// refused, and a 't' selects text mode for the returned stream (binary otherwise).
// A NULL pathID is treated as "".
//
// Returns a handle to an in-memory stream over the file's uncompressed data, or
// FILESYSTEM_INVALID_HANDLE if write access was requested or the file couldn't be obtained.
// Calls Error() if file access has been disabled.
FileHandle_t CWorkerVMPIFileSystem::Open( const char *pFilename, const char *pOptions, const char *pathID )
{
	Assert( g_bUseMPI );

	// When it finally asks the filesystem for a file, it'll pass NULL for pathID if it's "".
	if ( pathID == NULL )
		pathID = "";

	if ( g_bDisableFileAccess )
		Error( "Open( %s, %s ) - file access has been disabled.", pFilename, pOptions );

	// Workers can't open anything for write access.
	if ( Q_stristr( pOptions, "w" ) != 0 )
		return FILESYSTEM_INVALID_HANDLE;

	// Do we have this file's data already?
	CWorkerFile *pFile = m_Listener.FindWorkerFile( pFilename, pathID );
	const bool bHaveData = ( pFile != NULL ) && pFile->IsReadyToRead();
	if ( !bHaveData )
	{
		// Not yet: listen to the stream until the file we want has arrived.
		//
		// Design notes carried over from the original author:
		// - It might make sense to have the client ask for a list of ALL the files the
		//   master currently has and wait to receive all of them, so we don't come back
		//   here a bunch of times.
		// - Better still would be a worker thread that always listens to the multicast
		//   stream, with the master sending each newly opened file until every worker
		//   has acked it (the acks would probably have to go over a UDP socket owned by
		//   that thread). That would remove the worry about a client missing half the
		//   stream and having to wait for another cycle through it.
		pFile = m_Listener.ListenFor( pFilename, pathID );
		if ( pFile == NULL )
			return FILESYSTEM_INVALID_HANDLE;
	}

	// Ok! Got the file. Now setup a memory stream they can read out of it with.
	const char chMode = strchr( pOptions, 't' ) ? 't' : 'b';

	CVMPIFile_Memory *pStream = new CVMPIFile_Memory;
	pStream->Init( pFile->m_UncompressedData.Base(), pFile->m_UncompressedData.Count(), chMode );
	return (FileHandle_t)pStream;
}
// Not supported on workers: reports an error through Error() and ignores all of its arguments.
void CWorkerVMPIFileSystem::CreateVirtualFile( const char *pFilename, const void *pData, int fileLength )
{
Error( "CreateVirtualFile not supported in VMPI worker filesystem." );
}
// Not supported on workers: reports an error through Error() and ignores its arguments.
// Returns 0 if Error() returns.
long CWorkerVMPIFileSystem::GetFileTime( const char *pFileName, const char *pathID )
{
Error( "GetFileTime not supported in VMPI worker filesystem." );
return 0;
}
bool CWorkerVMPIFileSystem::IsFileWritable( const char *pFileName, const char *pPathID )
{
Error( "GetFileTime not supported in VMPI worker filesystem." );
return false;
}
bool CWorkerVMPIFileSystem::SetFileWritable( char const *pFileName, bool writable, const char *pPathID )
{
Error( "GetFileTime not supported in VMPI worker filesystem." );
return false;
}
// Handles a filesystem packet on the worker. pBuf->data[1] selects the sub-packet:
//   - MULTICAST_ADDR: stores the address in g_MulticastIP and sets g_bReceivedMulticastIP.
//   - FILE_RESPONSE:  queues a CFileResponse in g_FileResponses.
//   - FILE_CHUNK:     queues a copy of the chunk bytes in g_FileChunkPackets.
//
// Returns true if the sub-packet was one of the above (including the case where it was
// recognized but too short and therefore dropped with a warning), false otherwise.
// iSource and iPacketID are not used.
bool CWorkerVMPIFileSystem::HandleFileSystemPacket( MessageBuffer *pBuf, int iSource, int iPacketID )
{
	// The sub-packet ID lives in data[1], so anything shorter than two bytes can't be ours.
	const int nPacketBytes = pBuf->getLen();
	if ( nPacketBytes < 2 )
		return false;

	// Handle this packet.
	int subPacketID = pBuf->data[1];
	switch( subPacketID )
	{
		case VMPI_FSPACKETID_MULTICAST_ADDR:
		{
			// Don't read the address out of bytes that were never received.
			if ( nPacketBytes < 2 + (int)sizeof( g_MulticastIP ) )
			{
				Warning( "CWorkerVMPIFileSystem::HandleFileSystemPacket: truncated multicast address packet (%d bytes).\n", nPacketBytes );
				return true;
			}

			g_MulticastIP = *((CIPAddr*)&pBuf->data[2]);
			g_bReceivedMulticastIP = true;
		}
		return true;

		case VMPI_FSPACKETID_FILE_RESPONSE:
		{
			// Fields sit at fixed offsets after the two ID bytes:
			// request ID at 2, response at 6, zero-length flag at 10.
			if ( nPacketBytes < 11 )
			{
				Warning( "CWorkerVMPIFileSystem::HandleFileSystemPacket: truncated file response packet (%d bytes).\n", nPacketBytes );
				return true;
			}

			// memcpy avoids reading ints through misaligned pointers, and the flag is
			// tested as a byte instead of being reinterpreted as a bool.
			CFileResponse res;
			memcpy( &res.m_RequestID, &pBuf->data[2], sizeof( res.m_RequestID ) );
			memcpy( &res.m_Response, &pBuf->data[6], sizeof( res.m_Response ) );
			res.m_bZeroLength = ( pBuf->data[10] != 0 );

			CCriticalSectionLock csLock( &g_FileResponsesCS );
			csLock.Lock();
			g_FileResponses.AddToTail( res );
		}
		return true;

		case VMPI_FSPACKETID_FILE_CHUNK:
		{
			// Everything after the two ID bytes is the chunk; the length check at the top
			// guarantees this is never negative.
			int nDataBytes = nPacketBytes - 2;

			// CFileChunkPacket already contains one byte of m_Data, hence the -1.
			CFileChunkPacket *pPacket = (CFileChunkPacket*)malloc( sizeof( CFileChunkPacket ) + nDataBytes - 1 );
			if ( !pPacket )
			{
				Error( "CWorkerVMPIFileSystem::HandleFileSystemPacket: out of memory queuing a %d-byte file chunk.\n", nDataBytes );
				return true;
			}

			memcpy( pPacket->m_Data, &pBuf->data[2], nDataBytes );
			pPacket->m_Len = nDataBytes;

			// The chunk queue is protected by the same critical section as the responses.
			CCriticalSectionLock csLock( &g_FileResponsesCS );
			csLock.Lock();
			g_FileChunkPackets.AddToTail( pPacket );
		}
		return true;

		default:
			return false;
	}
}
// Loads a module by forwarding pFileName to Sys_LoadModule() and returning its result.
// pPathID and bValidatedDllOnly are ignored.
CSysModule* CWorkerVMPIFileSystem::LoadModule( const char *pFileName, const char *pPathID, bool bValidatedDllOnly )
{
return Sys_LoadModule( pFileName );
}
// Unloads a module by forwarding it to Sys_UnloadModule().
void CWorkerVMPIFileSystem::UnloadModule( CSysModule *pModule )
{
Sys_UnloadModule( pModule );
}
|