CARLA
streaming/detail/tcp/Client.cpp
Go to the documentation of this file.
1 // Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
8 
9 #include "carla/BufferPool.h"
10 #include "carla/Debug.h"
11 #include "carla/Exception.h"
12 #include "carla/Logging.h"
13 #include "carla/Time.h"
14 
15 #include <boost/asio/connect.hpp>
16 #include <boost/asio/read.hpp>
17 #include <boost/asio/write.hpp>
18 #include <boost/asio/post.hpp>
19 #include <boost/asio/bind_executor.hpp>
20 
21 #include <exception>
22 
23 namespace carla {
24 namespace streaming {
25 namespace detail {
26 namespace tcp {
27 
28  // ===========================================================================
29  // -- IncomingMessage --------------------------------------------------------
30  // ===========================================================================
31 
32  /// Helper for reading incoming TCP messages. Allocates the whole message in
33  /// a single buffer.
// NOTE(review): the line that opens this class and the declarations of the
// `_size` and `_message` members are not visible in this listing (the
// embedded numbering skips 34, 59 and 61) -- confirm against the original
// file before relying on the notes below.
35  public:
36 
 // Takes ownership of `buffer` by moving it into `_message`.
37  explicit IncomingMessage(Buffer &&buffer) : _message(std::move(buffer)) {}
38 
 // Exposes the storage of `_size` as an Asio mutable buffer, so that the size
 // of the incoming message can be read from the socket straight into it.
39  boost::asio::mutable_buffer size_as_buffer() {
40  return boost::asio::buffer(&_size, sizeof(_size));
41  }
42 
 // Returns the Asio buffer of `_message`, into which the message body is
 // read. In debug builds it asserts that a non-zero size is already set.
 // NOTE(review): the embedded numbering skips 45; if a statement lived on
 // that line it is not visible here -- confirm.
43  boost::asio::mutable_buffer buffer() {
44  DEBUG_ASSERT(_size > 0u);
46  return _message.buffer();
47  }
48 
 // Returns the current value of `_size`.
49  auto size() const {
50  return _size;
51  }
52 
 // Moves `_message` out to the caller; this object's `_message` is left in a
 // moved-from state afterwards.
53  auto pop() {
54  return std::move(_message);
55  }
56 
57  private:
58 
60 
62  };
63 
64  // ===========================================================================
65  // -- Client -----------------------------------------------------------------
66  // ===========================================================================
67 
// Parameter list, member initializers and body of the Client constructor.
// NOTE(review): the line naming the constructor and the start of the
// initializer list are not visible in this listing (the embedded numbering
// skips 68 and 72) -- confirm against the original file.
//
// io_context: used to construct the socket, the strand and the connection
//             timer.
// token:      copied into `_token`; it must satisfy protocol_is_tcp().
// callback:   moved into `_callback`; ReadData's handler invokes it with each
//             buffer popped from a received message.
//
// Calls throw_exception with std::invalid_argument when the token is not a
// TCP token.
69  boost::asio::io_context &io_context,
70  const token_type &token,
71  callback_function_type callback)
 // The string built here is "tcp client " followed by the stream id; the
 // expression receiving it is on a line not visible in this listing.
73  std::string("tcp client ") + std::to_string(token.get_stream_id())),
74  _token(token),
75  _callback(std::move(callback)),
76  _socket(io_context),
77  _strand(io_context),
78  _connection_timer(io_context),
79  _buffer_pool(std::make_shared<BufferPool>()) {
 // Only TCP tokens are accepted; anything else is reported through
 // throw_exception.
80  if (!_token.protocol_is_tcp()) {
81  throw_exception(std::invalid_argument("invalid token, only TCP tokens supported"));
82  }
83  }
84 
 // Defaulted destructor, defined here in the source file.
85  Client::~Client() = default;
86 
87  void Client::Connect() {
88  auto self = shared_from_this();
89  boost::asio::post(_strand, [this, self]() {
90  if (_done) {
91  return;
92  }
93 
94  using boost::system::error_code;
95 
96  if (_socket.is_open()) {
97  _socket.close();
98  }
99 
102  const auto ep = _token.to_tcp_endpoint();
103 
104  auto handle_connect = [this, self, ep](error_code ec) {
105  if (!ec) {
106  if (_done) {
107  return;
108  }
109  // This forces not using Nagle's algorithm.
110  // Improves the sync mode velocity on Linux by a factor of ~3.
111  _socket.set_option(boost::asio::ip::tcp::no_delay(true));
112  log_debug("streaming client: connected to", ep);
113  // Send the stream id to subscribe to the stream.
114  const auto &stream_id = _token.get_stream_id();
115  log_debug("streaming client: sending stream id", stream_id);
116  boost::asio::async_write(
117  _socket,
118  boost::asio::buffer(&stream_id, sizeof(stream_id)),
119  boost::asio::bind_executor(_strand, [=](error_code ec, size_t DEBUG_ONLY(bytes)) {
120  // Ensures to stop the execution once the connection has been stopped.
121  if (_done) {
122  return;
123  }
124  if (!ec) {
125  DEBUG_ASSERT_EQ(bytes, sizeof(stream_id));
126  // If succeeded start reading data.
127  ReadData();
128  } else {
129  // Else try again.
130  log_debug("streaming client: failed to send stream id:", ec.message());
131  Connect();
132  }
133  }));
134  } else {
135  log_info("streaming client: connection failed:", ec.message());
136  Reconnect();
137  }
138  };
139 
140  log_debug("streaming client: connecting to", ep);
141  _socket.async_connect(ep, boost::asio::bind_executor(_strand, handle_connect));
142  });
143  }
144 
145  void Client::Stop() {
146  _connection_timer.cancel();
147  auto self = shared_from_this();
148  boost::asio::post(_strand, [this, self]() {
149  _done = true;
150  if (_socket.is_open()) {
151  _socket.close();
152  }
153  });
154  }
155 
// Body of the delayed-retry helper: arms the connection timer for one second
// and calls Connect() when the wait completes without an error.
// NOTE(review): the signature line is not visible in this listing (the
// embedded numbering skips 156); presumably this is Client::Reconnect, which
// Connect's handler calls after a failed connection attempt -- confirm.
 // `self` is captured by the wait handler so this object stays alive while
 // the timer is pending.
157  auto self = shared_from_this();
158  _connection_timer.expires_from_now(time_duration::seconds(1u));
 // NOTE(review): unlike the other handlers in this file, this one is not
 // wrapped with bind_executor(_strand, ...); Connect() itself posts its work
 // to the strand.
159  _connection_timer.async_wait([this, self](boost::system::error_code ec) {
 // A non-zero error code (for instance after the timer is cancelled) skips
 // the retry.
160  if (!ec) {
161  Connect();
162  }
163  });
164  }
165 
// Body of the read loop: reads one size header, then a message body of that
// size, hands the resulting buffer to `_callback` and starts over. Any
// failure restarts the connection through Connect().
// NOTE(review): the signature line is not visible in this listing (the
// embedded numbering skips 166); presumably this is Client::ReadData, which
// Connect's write handler calls -- confirm.
167  auto self = shared_from_this();
168  boost::asio::post(_strand, [this, self]() {
169  if (_done) {
170  return;
171  }
172 
173  // log_debug("streaming client: Client::ReadData");
174 
 // The message is built around a buffer taken from the pool and is shared
 // between both handlers below through a shared_ptr.
175  auto message = std::make_shared<IncomingMessage>(_buffer_pool->Pop());
176 
 // Second stage: runs when the read of the message body completes.
177  auto handle_read_data = [this, self, message](boost::system::error_code ec, size_t DEBUG_ONLY(bytes)) {
178  DEBUG_ONLY(log_debug("streaming client: Client::ReadData.handle_read_data", bytes, "bytes"));
179  if (!ec) {
180  DEBUG_ASSERT_EQ(bytes, message->size());
181  DEBUG_ASSERT_NE(bytes, 0u);
182  // Move the buffer to the callback function and start reading the next
183  // piece of data.
184  // log_debug("streaming client: success reading data, calling the callback");
 // The callback is posted to the strand rather than invoked inline, and the
 // next read is requested right after.
185  boost::asio::post(_strand, [self, message]() { self->_callback(message->pop()); });
186  ReadData();
187  } else {
188  // As usual, if anything fails start over from the very top.
189  log_debug("streaming client: failed to read data:", ec.message());
190  Connect();
191  }
192  };
193 
 // First stage: runs when the read of the size header completes. It holds a
 // copy of handle_read_data to chain the read of the body.
194  auto handle_read_header = [this, self, message, handle_read_data](
195  boost::system::error_code ec,
196  size_t DEBUG_ONLY(bytes)) {
197  DEBUG_ONLY(log_debug("streaming client: Client::ReadData.handle_read_header", bytes, "bytes"));
 // A header announcing a size of zero is handled like a read error.
198  if (!ec && (message->size() > 0u)) {
199  DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type));
200  if (_done) {
201  return;
202  }
203  // Now that we know the size of the coming buffer, we can allocate our
204  // buffer and start putting data into it.
205  boost::asio::async_read(
206  _socket,
207  message->buffer(),
208  boost::asio::bind_executor(_strand, handle_read_data));
 // On failure the connection is restarted, unless the client is done.
209  } else if (!_done) {
210  log_debug("streaming client: failed to read header:", ec.message());
211  DEBUG_ONLY(log_debug("size = ", message->size()));
212  DEBUG_ONLY(log_debug("bytes = ", bytes));
213  Connect();
214  }
215  };
216 
217  // Read the size of the buffer that is coming.
218  boost::asio::async_read(
219  _socket,
220  message->size_as_buffer(),
221  boost::asio::bind_executor(_strand, handle_read_header));
222  });
223  }
224 
225 } // namespace tcp
226 } // namespace detail
227 } // namespace streaming
228 } // namespace carla
boost::asio::ip::tcp::endpoint to_tcp_endpoint() const
Definition: detail/Token.h:167
#define DEBUG_ASSERT_NE(lhs, rhs)
Definition: Debug.h:77
void throw_exception(const std::exception &e)
Definition: Carla.cpp:135
This file contains definitions of common data structures used in traffic manager. ...
Definition: Carla.cpp:133
static time_duration seconds(size_t timeout)
Definition: Time.h:22
Helper for reading incoming TCP messages.
static void log_debug(Args &&...)
Definition: Logging.h:75
#define DEBUG_ASSERT_EQ(lhs, rhs)
Definition: Debug.h:76
#define DEBUG_ASSERT(predicate)
Definition: Debug.h:66
#define LIBCARLA_INITIALIZE_LIFETIME_PROFILER(display_name)
uint32_t message_size_type
Definition: Types.h:20
void reset(size_type size)
Reset the size of this buffer.
Definition: carla/Buffer.h:252
Serializes a stream endpoint.
Definition: detail/Token.h:61
boost::asio::const_buffer buffer() const noexcept
Make a boost::asio::buffer from this buffer.
Definition: carla/Buffer.h:176
Client(boost::asio::io_context &io_context, const token_type &token, callback_function_type callback)
carla::streaming::detail::stream_id_type stream_id
static void log_info(Args &&... args)
Definition: Logging.h:82
A pool of Buffer.
Definition: BufferPool.h:30
std::function< void(Buffer)> callback_function_type
const auto & get_stream_id() const
Definition: detail/Token.h:116
A piece of raw data.
Definition: carla/Buffer.h:42
#define DEBUG_ONLY(code)
Definition: Debug.h:55