CARLA
streaming/detail/tcp/Client.cpp
Go to the documentation of this file.
1 // Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
8 
9 #include "carla/BufferPool.h"
10 #include "carla/Debug.h"
11 #include "carla/Exception.h"
12 #include "carla/Logging.h"
13 #include "carla/Time.h"
14 
15 #include <boost/asio/connect.hpp>
16 #include <boost/asio/read.hpp>
17 #include <boost/asio/write.hpp>
18 #include <boost/asio/post.hpp>
19 #include <boost/asio/bind_executor.hpp>
20 
21 #include <exception>
22 
23 namespace carla {
24 namespace streaming {
25 namespace detail {
26 namespace tcp {
27 
28  // ===========================================================================
29  // -- IncomingMessage --------------------------------------------------------
30  // ===========================================================================
31 
32  /// Helper for reading incoming TCP messages. Allocates the whole message in
33  /// a single buffer.
35  public:
36 
37  explicit IncomingMessage(Buffer &&buffer) : _message(std::move(buffer)) {}
38 
39  boost::asio::mutable_buffer size_as_buffer() {
40  return boost::asio::buffer(&_size, sizeof(_size));
41  }
42 
43  boost::asio::mutable_buffer buffer() {
44  DEBUG_ASSERT(_size > 0u);
46  return _message.buffer();
47  }
48 
49  auto size() const {
50  return _size;
51  }
52 
53  auto pop() {
54  return std::move(_message);
55  }
56 
57  private:
58 
60 
62  };
63 
64  // ===========================================================================
65  // -- Client -----------------------------------------------------------------
66  // ===========================================================================
67 
69  boost::asio::io_context &io_context,
70  const token_type &token,
71  callback_function_type callback)
73  std::string("tcp client ") + std::to_string(token.get_stream_id())),
74  _token(token),
75  _callback(std::move(callback)),
76  _socket(io_context),
77  _strand(io_context),
78  _connection_timer(io_context),
79  _buffer_pool(std::make_shared<BufferPool>()) {
80  if (!_token.protocol_is_tcp()) {
81  throw_exception(std::invalid_argument("invalid token, only TCP tokens supported"));
82  }
83  }
84 
85  Client::~Client() = default;
86 
87  void Client::Connect() {
88  auto self = shared_from_this();
89  boost::asio::post(_strand, [this, self]() {
90  if (_done) {
91  return;
92  }
93 
94  using boost::system::error_code;
95 
96  if (_socket.is_open()) {
97  _socket.close();
98  }
99 
102  const auto ep = _token.to_tcp_endpoint();
103 
104  auto handle_connect = [this, self, ep](error_code ec) {
105  if (!ec) {
106  if (_done) {
107  return;
108  }
109  log_debug("streaming client: connected to", ep);
110  // Send the stream id to subscribe to the stream.
111  const auto &stream_id = _token.get_stream_id();
112  log_debug("streaming client: sending stream id", stream_id);
113  boost::asio::async_write(
114  _socket,
115  boost::asio::buffer(&stream_id, sizeof(stream_id)),
116  boost::asio::bind_executor(_strand, [=](error_code ec, size_t DEBUG_ONLY(bytes)) {
117  // Ensures to stop the execution once the connection has been stopped.
118  if (_done) {
119  return;
120  }
121  if (!ec) {
122  DEBUG_ASSERT_EQ(bytes, sizeof(stream_id));
123  // If succeeded start reading data.
124  ReadData();
125  } else {
126  // Else try again.
127  log_info("streaming client: failed to send stream id:", ec.message());
128  Connect();
129  }
130  }));
131  } else {
132  log_info("streaming client: connection failed:", ec.message());
133  Reconnect();
134  }
135  };
136 
137  log_debug("streaming client: connecting to", ep);
138  _socket.async_connect(ep, boost::asio::bind_executor(_strand, handle_connect));
139  });
140  }
141 
142  void Client::Stop() {
143  _connection_timer.cancel();
144  auto self = shared_from_this();
145  boost::asio::post(_strand, [this, self]() {
146  _done = true;
147  if (_socket.is_open()) {
148  _socket.close();
149  }
150  });
151  }
152 
154  auto self = shared_from_this();
155  _connection_timer.expires_from_now(time_duration::seconds(1u));
156  _connection_timer.async_wait([this, self](boost::system::error_code ec) {
157  if (!ec) {
158  Connect();
159  }
160  });
161  }
162 
164  auto self = shared_from_this();
165  boost::asio::post(_strand, [this, self]() {
166  if (_done) {
167  return;
168  }
169 
170  // log_debug("streaming client: Client::ReadData");
171 
172  auto message = std::make_shared<IncomingMessage>(_buffer_pool->Pop());
173 
174  auto handle_read_data = [this, self, message](boost::system::error_code ec, size_t DEBUG_ONLY(bytes)) {
175  DEBUG_ONLY(log_debug("streaming client: Client::ReadData.handle_read_data", bytes, "bytes"));
176  if (!ec) {
177  DEBUG_ASSERT_EQ(bytes, message->size());
178  DEBUG_ASSERT_NE(bytes, 0u);
179  // Move the buffer to the callback function and start reading the next
180  // piece of data.
181  // log_debug("streaming client: success reading data, calling the callback");
182  boost::asio::post(_strand, [self, message]() { self->_callback(message->pop()); });
183  ReadData();
184  } else {
185  // As usual, if anything fails start over from the very top.
186  log_info("streaming client: failed to read data:", ec.message());
187  Connect();
188  }
189  };
190 
191  auto handle_read_header = [this, self, message, handle_read_data](
192  boost::system::error_code ec,
193  size_t DEBUG_ONLY(bytes)) {
194  DEBUG_ONLY(log_debug("streaming client: Client::ReadData.handle_read_header", bytes, "bytes"));
195  if (!ec && (message->size() > 0u)) {
196  DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type));
197  if (_done) {
198  return;
199  }
200  // Now that we know the size of the coming buffer, we can allocate our
201  // buffer and start putting data into it.
202  boost::asio::async_read(
203  _socket,
204  message->buffer(),
205  boost::asio::bind_executor(_strand, handle_read_data));
206  } else {
207  log_info("streaming client: failed to read header:", ec.message());
208  DEBUG_ONLY(log_debug("size = ", message->size()));
209  DEBUG_ONLY(log_debug("bytes = ", bytes));
210  Connect();
211  }
212  };
213 
214  // Read the size of the buffer that is coming.
215  boost::asio::async_read(
216  _socket,
217  message->size_as_buffer(),
218  boost::asio::bind_executor(_strand, handle_read_header));
219  });
220  }
221 
222 } // namespace tcp
223 } // namespace detail
224 } // namespace streaming
225 } // namespace carla
boost::asio::ip::tcp::endpoint to_tcp_endpoint() const
Definition: detail/Token.h:158
#define DEBUG_ASSERT_NE(lhs, rhs)
Definition: Debug.h:77
void throw_exception(const std::exception &e)
Definition: Carla.cpp:101
This file contains definitions of common data structures used in traffic manager. ...
Definition: Carla.cpp:99
static time_duration seconds(size_t timeout)
Definition: Time.h:22
Helper for reading incoming TCP messages.
static void log_debug(Args &&...)
Definition: Logging.h:75
#define DEBUG_ASSERT_EQ(lhs, rhs)
Definition: Debug.h:76
#define DEBUG_ASSERT(predicate)
Definition: Debug.h:66
#define LIBCARLA_INITIALIZE_LIFETIME_PROFILER(display_name)
uint32_t message_size_type
Definition: Types.h:20
void reset(size_type size)
Reset the size of this buffer.
Definition: carla/Buffer.h:249
Serializes a stream endpoint.
Definition: detail/Token.h:61
boost::asio::const_buffer buffer() const noexcept
Make a boost::asio::buffer from this buffer.
Definition: carla/Buffer.h:173
Client(boost::asio::io_context &io_context, const token_type &token, callback_function_type callback)
static void log_info(Args &&... args)
Definition: Logging.h:82
A pool of Buffer.
Definition: BufferPool.h:30
std::function< void(Buffer)> callback_function_type
const auto & get_stream_id() const
Definition: detail/Token.h:111
A piece of raw data.
Definition: carla/Buffer.h:39
#define DEBUG_ONLY(code)
Definition: Debug.h:55