blob: ab0ca381a1748ea58c53fb10dc47e3c391d43e81 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
// Copyright Epic Games, Inc. All Rights Reserved.
#pragma once
#include <zenhttp/websocket.h>
#include "httpsys_iocontext.h"
#include <zencore/thread.h>
#if ZEN_WITH_HTTPSYS
# define _WINSOCKAPI_
# include <zencore/windows.h>
# include <http.h>
# include <atomic>
# include <deque>
# include <vector>
namespace zen {
/**
* WebSocket connection over an http.sys opaque-mode connection
*
* After the 101 Switching Protocols response is sent with
* HTTP_SEND_RESPONSE_FLAG_OPAQUE, http.sys stops parsing HTTP on the
* connection. Raw bytes are exchanged via HttpReceiveRequestEntityBody /
* HttpSendResponseEntityBody using the original RequestId.
*
* All I/O is performed asynchronously via the same IOCP threadpool used
* for normal http.sys traffic, eliminating per-connection threads.
*
* Lifetime is managed through intrusive reference counting (RefCounted).
* A self-reference (m_SelfRef) is held from Start() until all outstanding
* async operations have drained, preventing premature destruction.
*/
class WsHttpSysConnection : public WebSocketConnection
{
public:
/**
* Construct a connection bound to an upgraded http.sys request.
*
* @param RequestQueueHandle  http.sys request queue handle
* @param RequestId           id of the original (upgraded) HTTP request
* @param Handler             WebSocket event handler; held by reference, so
*                            it must outlive this connection
* @param Iocp                thread pool I/O object used for async completions
*
* NOTE(review): the arguments are presumably stored in the identically named
* m_* members below - the constructor body is not visible in this header.
*/
WsHttpSysConnection(HANDLE RequestQueueHandle, HTTP_REQUEST_ID RequestId, IWebSocketHandler& Handler, PTP_IO Iocp);
~WsHttpSysConnection() override;
/**
* Start the async read loop. Must be called once after construction
* and after the 101 response has been sent.
*/
void Start();
/**
* Shut down the connection. Cancels pending I/O; IOCP completions
* will fire with ERROR_OPERATION_ABORTED and drain naturally.
*/
void Shutdown();
// WebSocketConnection interface
// NOTE(review): the send/close overrides presumably build a WebSocket frame
// and hand it to EnqueueWrite() - confirm against the implementation file.
void SendText(std::string_view Text) override;
void SendBinary(std::span<const uint8_t> Data) override;
void Close(uint16_t Code, std::string_view Reason) override;
bool IsOpen() const override;
// Called from IoCompletionCallback via tagged dispatch
// IoResult is the completion status of the finished operation and
// NumberOfBytesTransferred the byte count reported for it.
void OnReadCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred);
void OnWriteCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred);
private:
// Internal helpers. Their bodies are not part of this header; the notes
// below are inferred from names and signatures - TODO confirm.
// Read path: presumably posts the next async receive, then consumes
// whatever has been received so far.
void IssueAsyncRead();
void ProcessReceivedData();
// Write path: EnqueueWrite takes ownership of an already-built frame
// (passed by value); FlushWriteQueue presumably starts the next send.
void EnqueueWrite(std::vector<uint8_t> Frame);
void FlushWriteQueue();
// Close/teardown helpers.
void DoClose(uint16_t Code, std::string_view Reason);
void Disconnect();
// Presumably drops m_SelfRef once no async operations remain outstanding
// (see m_OutstandingOps) - TODO confirm.
void MaybeReleaseSelfRef();
// http.sys request queue handle and the id of the upgraded request
HANDLE m_RequestQueueHandle;
HTTP_REQUEST_ID m_RequestId;
// Non-owning reference to the handler; must stay valid for the lifetime
// of this object
IWebSocketHandler& m_Handler;
// Thread pool I/O object supplied at construction
PTP_IO m_Iocp;
// Tagged OVERLAPPED contexts for concurrent read and write
// (one per direction, so a read and a write can be pending simultaneously)
HttpSysIoContext m_ReadIoContext{};
HttpSysIoContext m_WriteIoContext{};
// Read state
// NOTE(review): m_ReadBuffer looks like the target of a single async read
// and m_Accumulated the bytes gathered across reads - confirm.
std::vector<uint8_t> m_ReadBuffer;
std::vector<uint8_t> m_Accumulated;
// Write state
// NOTE(review): m_WriteLock presumably guards the queue and m_IsWriting;
// verify in the implementation file.
RwLock m_WriteLock;
// Frames waiting to be sent
std::deque<std::vector<uint8_t>> m_WriteQueue;
// NOTE(review): presumably the frame currently being sent, with
// m_WriteChunk describing it to http.sys - confirm.
std::vector<uint8_t> m_CurrentWriteBuffer;
HTTP_DATA_CHUNK m_WriteChunk{};
// Starts false; presumably true while an async write is pending
bool m_IsWriting = false;
// Lifetime management
// Counter of outstanding async operations, starts at zero
std::atomic<int32_t> m_OutstandingOps{0};
// Self-reference that keeps this object alive while I/O is outstanding
Ref<WsHttpSysConnection> m_SelfRef;
// Starts false; presumably set by Shutdown() - TODO confirm
std::atomic<bool> m_ShutdownRequested{false};
// Starts true; presumably the value reported by IsOpen() - TODO confirm
std::atomic<bool> m_IsOpen{true};
// Starts false; not atomic, unlike the flags above.
// NOTE(review): presumably records that a close frame was already sent -
// confirm which lock/thread protects it.
bool m_CloseSent = false;
};
} // namespace zen
#endif // ZEN_WITH_HTTPSYS
|