15 #include <boost/asio/connect.hpp> 16 #include <boost/asio/read.hpp> 17 #include <boost/asio/write.hpp> 18 #include <boost/asio/post.hpp> 19 #include <boost/asio/bind_executor.hpp> 40 return boost::asio::buffer(&
_size,
sizeof(
_size));
43 boost::asio::mutable_buffer
buffer() {
69 boost::asio::io_context &io_context,
73 std::string(
"tcp client ") +
std::to_string(token.get_stream_id())),
75 _callback(
std::move(callback)),
78 _connection_timer(io_context),
81 throw_exception(std::invalid_argument(
"invalid token, only TCP tokens supported"));
88 auto self = shared_from_this();
89 boost::asio::post(
_strand, [
this,
self]() {
94 using boost::system::error_code;
104 auto handle_connect = [
this,
self, ep](error_code ec) {
111 _socket.set_option(boost::asio::ip::tcp::no_delay(
true));
112 log_debug(
"streaming client: connected to", ep);
116 boost::asio::async_write(
119 boost::asio::bind_executor(
_strand, [=](error_code ec,
size_t DEBUG_ONLY(bytes)) {
130 log_debug(
"streaming client: failed to send stream id:", ec.message());
135 log_info(
"streaming client: connection failed:", ec.message());
140 log_debug(
"streaming client: connecting to", ep);
141 _socket.async_connect(ep, boost::asio::bind_executor(
_strand, handle_connect));
147 auto self = shared_from_this();
148 boost::asio::post(
_strand, [
this,
self]() {
157 auto self = shared_from_this();
167 auto self = shared_from_this();
168 boost::asio::post(
_strand, [
this,
self]() {
175 auto message = std::make_shared<IncomingMessage>(
_buffer_pool->Pop());
177 auto handle_read_data = [
this,
self, message](boost::system::error_code ec,
size_t DEBUG_ONLY(bytes)) {
178 DEBUG_ONLY(
log_debug(
"streaming client: Client::ReadData.handle_read_data", bytes,
"bytes"));
185 boost::asio::post(
_strand, [
self, message]() {
self->_callback(message->pop()); });
189 log_debug(
"streaming client: failed to read data:", ec.message());
194 auto handle_read_header = [
this,
self, message, handle_read_data](
195 boost::system::error_code ec,
197 DEBUG_ONLY(
log_debug(
"streaming client: Client::ReadData.handle_read_header", bytes,
"bytes"));
198 if (!ec && (message->size() > 0u)) {
205 boost::asio::async_read(
208 boost::asio::bind_executor(
_strand, handle_read_data));
210 log_debug(
"streaming client: failed to read header:", ec.message());
218 boost::asio::async_read(
220 message->size_as_buffer(),
221 boost::asio::bind_executor(
_strand, handle_read_header));
boost::asio::ip::tcp::endpoint to_tcp_endpoint() const
#define DEBUG_ASSERT_NE(lhs, rhs)
std::shared_ptr< BufferPool > _buffer_pool
boost::asio::deadline_timer _connection_timer
void throw_exception(const std::exception &e)
This file contains definitions of common data structures used in traffic manager. ...
static time_duration seconds(size_t timeout)
boost::asio::mutable_buffer size_as_buffer()
Helper for reading incoming TCP messages.
static void log_debug(Args &&...)
#define DEBUG_ASSERT_EQ(lhs, rhs)
#define DEBUG_ASSERT(predicate)
#define LIBCARLA_INITIALIZE_LIFETIME_PROFILER(display_name)
uint32_t message_size_type
void reset(size_type size)
Reset the size of this buffer.
bool protocol_is_tcp() const
boost::asio::io_context::strand _strand
boost::asio::mutable_buffer buffer()
boost::asio::ip::tcp::socket _socket
Serializes a stream endpoint.
boost::asio::const_buffer buffer() const noexcept
Make a boost::asio::buffer from this buffer.
Client(boost::asio::io_context &io_context, const token_type &token, callback_function_type callback)
carla::streaming::detail::stream_id_type stream_id
static void log_info(Args &&... args)
std::function< void(Buffer)> callback_function_type
const auto & get_stream_id() const
IncomingMessage(Buffer &&buffer)