1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
// Copyright Epic Games, Inc. All Rights Reserved.
#include <zenutil/chunkrequests.h>
#include <zencore/blake3.h>
#include <zencore/iobuffer.h>
#include <zencore/sharedbuffer.h>
#include <zencore/stream.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <gsl/gsl-lite.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
namespace {
struct RequestHeader
{
enum
{
kMagic = 0xAAAA'77AC
};
uint32_t Magic;
uint32_t ChunkCount;
uint32_t Reserved1;
uint32_t Reserved2;
};
struct ResponseHeader
{
uint32_t Magic = 0xbada'b00f;
uint32_t ChunkCount;
uint32_t Reserved1 = 0;
uint32_t Reserved2 = 0;
};
struct ResponseChunkEntry
{
uint32_t CorrelationId;
uint32_t Flags = 0;
uint64_t ChunkSize;
};
} // namespace
IoBuffer
BuildChunkBatchRequest(const std::vector<RequestChunkEntry>& Entries)
{
RequestHeader RequestHdr;
RequestHdr.Magic = (uint32_t)RequestHeader::kMagic;
RequestHdr.ChunkCount = gsl::narrow<uint32_t>(Entries.size());
UniqueBuffer Buffer = UniqueBuffer::Alloc(sizeof(RequestHeader) + sizeof(RequestChunkEntry) * RequestHdr.ChunkCount);
MutableMemoryView WriteBuffer = Buffer.GetMutableView();
WriteBuffer = WriteBuffer.CopyFrom(MemoryView(&RequestHdr, sizeof(RequestHeader)));
WriteBuffer.CopyFrom(MemoryView(Entries.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount));
return Buffer.MoveToShared().AsIoBuffer();
}
std::optional<std::vector<RequestChunkEntry>>
ParseChunkBatchRequest(const IoBuffer& Payload)
{
if (Payload.Size() <= sizeof(RequestHeader))
{
return {};
}
BinaryReader Reader(Payload);
RequestHeader RequestHdr;
Reader.Read(&RequestHdr, sizeof RequestHdr);
if (RequestHdr.Magic != RequestHeader::kMagic)
{
return {};
}
std::vector<RequestChunkEntry> RequestedChunks;
RequestedChunks.resize(RequestHdr.ChunkCount);
Reader.Read(RequestedChunks.data(), sizeof(RequestChunkEntry) * RequestHdr.ChunkCount);
return RequestedChunks;
}
std::vector<IoBuffer>
BuildChunkBatchResponse(const std::vector<RequestChunkEntry>& Requests, std::span<IoBuffer> Chunks)
{
ZEN_ASSERT(Requests.size() == Chunks.size());
size_t ChunkCount = Requests.size();
std::vector<IoBuffer> OutBlobs;
OutBlobs.reserve(1 + ChunkCount);
OutBlobs.emplace_back(sizeof(ResponseHeader) + ChunkCount * sizeof(ResponseChunkEntry));
uint8_t* ResponsePtr = reinterpret_cast<uint8_t*>(OutBlobs[0].MutableData());
ResponseHeader ResponseHdr;
ResponseHdr.ChunkCount = gsl::narrow<uint32_t>(Requests.size());
memcpy(ResponsePtr, &ResponseHdr, sizeof(ResponseHdr));
ResponsePtr += sizeof(ResponseHdr);
for (uint32_t ChunkIndex = 0; ChunkIndex < ChunkCount; ++ChunkIndex)
{
const IoBuffer& FoundChunk(Chunks[ChunkIndex]);
ResponseChunkEntry ResponseChunk;
ResponseChunk.CorrelationId = Requests[ChunkIndex].CorrelationId;
if (FoundChunk)
{
ResponseChunk.ChunkSize = FoundChunk.Size();
}
else
{
ResponseChunk.ChunkSize = uint64_t(-1);
}
memcpy(ResponsePtr, &ResponseChunk, sizeof(ResponseChunk));
ResponsePtr += sizeof(ResponseChunk);
}
OutBlobs.insert(OutBlobs.end(), Chunks.begin(), Chunks.end());
auto It = std::remove_if(OutBlobs.begin() + 1, OutBlobs.end(), [](const IoBuffer& B) { return B.GetSize() == 0; });
OutBlobs.erase(It, OutBlobs.end());
return OutBlobs;
}
std::vector<IoBuffer>
ParseChunkBatchResponse(const IoBuffer& Buffer)
{
MemoryView View = Buffer.GetView();
const ResponseHeader* Header = (const ResponseHeader*)View.GetData();
if (Header->Magic != 0xbada'b00f)
{
return {};
}
View.MidInline(sizeof(ResponseHeader));
const ResponseChunkEntry* Entries = (const ResponseChunkEntry*)View.GetData();
View.MidInline(sizeof(ResponseChunkEntry) * Header->ChunkCount);
std::vector<IoBuffer> Result(Header->ChunkCount);
for (uint32_t Index = 0; Index < Header->ChunkCount; Index++)
{
const ResponseChunkEntry& Entry = Entries[Index];
if (Result.size() < Entry.CorrelationId + 1)
{
Result.resize(Entry.CorrelationId + 1);
}
if (Entry.ChunkSize != uint64_t(-1))
{
Result[Entry.CorrelationId] = IoBuffer(IoBuffer::Wrap, View.GetData(), Entry.ChunkSize);
View.MidInline(Entry.ChunkSize);
}
}
return Result;
}
} // namespace zen
|