#include #include #include #include "constants.h" #include //------------------------------------------------------------------------------ uint32 Packet::get_index() const { return index; } //------------------------------------------------------------------------------ uint32 Packet::get_thread_id() const { return thread_id & ~PACKET_FLAG_COMPRESSED; } //------------------------------------------------------------------------------ uint32 Packet::is_compressed() const { return !!(thread_id & PACKET_FLAG_COMPRESSED); } //------------------------------------------------------------------------------ Buffer& Packet::get_payload() { return payload; } //------------------------------------------------------------------------------ void Packet::decompress() { if (!is_compressed()) return; thread_id &= ~PACKET_FLAG_COMPRESSED; if (int32 ret_code = decompress(payload); ret_code != 0) throw Exception::StreamError("Lz4 decompress failure", position, ret_code); } //------------------------------------------------------------------------------ int32 Packet::decompress(Buffer& payload) { struct Lz4Block { uint16 decoded_size; char data[]; }; const auto& lz4_block = *(Lz4Block*)(payload.get_pointer()); uint32 encoded_size = payload.get_size() - sizeof(uint16); Allocator& allocator = Allocator::get_from(payload); MutableBuffer buffer = allocator.create_buffer(lz4_block.decoded_size); int lz4_ret = LZ4_decompress_safe( lz4_block.data, (char*)(buffer.get_pointer()), encoded_size, lz4_block.decoded_size ); if (lz4_ret != lz4_block.decoded_size) return lz4_ret; payload = std::move(buffer); return 0; } //------------------------------------------------------------------------------ Bundle Transport::read_packets(const Bundle& bundle) { Bundle ret; try { ret = _read_packets(bundle); } catch (const DataStream::Eof&) {} return ret; } //------------------------------------------------------------------------------ Transport::Result Transport::read_packets(Packets packets, const Buffer& buffer) { BufferStream stream = buffer.create_stream(); uint32 count = 0; uint32 position = 0; uint32 available = stream.get_remaining(); switch (_state) { while (available >= 4) { case State::HEADER: _size = stream.read(); _thread_id = stream.read(); if (_size < 4) throw Exception::StreamError("Unexpected size", position, _size); available -= 4; if (available < _size) { _state = State::PAYLOAD; return { count, position }; } case State::PAYLOAD: Buffer payload; uint32 payload_size = _size - sizeof(uint16) - sizeof(uint16); payload = stream.read_buf(payload_size); Packet& packet = packets[count]; packet.thread_id = _thread_id; packet.position = position; packet.index = _packet_count; packet.payload = std::move(payload); ++_packet_count; position += _size; ++count; if (count == packets.size()) break; available -= _size; } } _state = State::HEADER; return { count, position }; } //------------------------------------------------------------------------------ Transport::Transport(DataStream&& stream) : _stream(std::move(stream)) { } //------------------------------------------------------------------------------ Bundle Transport::_read_packets(const Bundle& bundle) { int32 count = 0; for (int32 available = _stream.get_available();;) { uint32 size = _stream.read(); if (size < 4) throw Exception::StreamError("Unexpected size", _stream.tell() - 2, size); uint32 thread_id = _stream.read(); uint64 position = _stream.tell(); Buffer payload; uint32 payload_size = size - sizeof(uint16) - sizeof(uint16); payload = _stream.read(payload_size); Packet& packet = bundle[count]; packet.thread_id = thread_id; packet.position = uint32(position); packet.index = _packet_count; packet.payload = std::move(payload); ++_packet_count; ++count; available -= size; if (available <= 4) break; if (count == bundle.size()) break; } for (Packet& packet : bundle.subspan(count)) packet = Packet(); auto ret = bundle.subspan(0, count); return ret; }