CARLA
secondary.cpp
Go to the documentation of this file.
1 // Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
9 
10 #include "carla/BufferPool.h"
11 #include "carla/Debug.h"
12 #include "carla/Exception.h"
13 #include "carla/Logging.h"
14 #include "carla/Time.h"
15 
16 #include <boost/asio/connect.hpp>
17 #include <boost/asio/read.hpp>
18 #include <boost/asio/write.hpp>
19 #include <boost/asio/post.hpp>
20 #include <boost/asio/bind_executor.hpp>
21 
22 #include <exception>
23 
24 namespace carla {
25 namespace multigpu {
26 
28  boost::asio::ip::tcp::endpoint ep,
30  _pool(),
31  _socket(_pool.io_context()),
32  _endpoint(ep),
33  _strand(_pool.io_context()),
34  _connection_timer(_pool.io_context()),
35  _buffer_pool(std::make_shared<BufferPool>()) {
36 
37  _commander.set_callback(callback);
38  }
39 
40 
42  std::string ip,
43  uint16_t port,
45  _pool(),
46  _socket(_pool.io_context()),
47  _strand(_pool.io_context()),
48  _connection_timer(_pool.io_context()),
49  _buffer_pool(std::make_shared<BufferPool>()) {
50 
51  boost::asio::ip::address ip_address = boost::asio::ip::address::from_string(ip);
52  _endpoint = boost::asio::ip::tcp::endpoint(ip_address, port);
53  _commander.set_callback(callback);
54  }
55 
57  _pool.Stop();
58  }
59 
61  AsyncRun(2u);
62 
63  _commander.set_secondary(shared_from_this());
64 
65  std::weak_ptr<Secondary> weak = shared_from_this();
66  boost::asio::post(_strand, [weak]() {
67  auto self = weak.lock();
68  if (!self) return;
69 
70  if (self->_done) {
71  return;
72  }
73 
74  if (self->_socket.is_open()) {
75  self->_socket.close();
76  }
77 
78  auto handle_connect = [weak](boost::system::error_code ec) {
79  auto self = weak.lock();
80  if (!self) return;
81  if (ec) {
82  log_error("secondary server: connection failed:", ec.message());
83  if (!self->_done)
84  self->Reconnect();
85  return;
86  }
87 
88  if (self->_done) {
89  return;
90  }
91 
92  // This forces not using Nagle's algorithm.
93  // Improves the sync mode velocity on Linux by a factor of ~3.
94  self->_socket.set_option(boost::asio::ip::tcp::no_delay(true));
95 
96  log_info("secondary server: connected to ", self->_endpoint);
97 
98  self->ReadData();
99  };
100 
101  self->_socket.async_connect(self->_endpoint, boost::asio::bind_executor(self->_strand, handle_connect));
102  });
103  }
104 
106  _connection_timer.cancel();
107  std::weak_ptr<Secondary> weak = shared_from_this();
108  boost::asio::post(_strand, [weak]() {
109  auto self = weak.lock();
110  if (!self) return;
111  self->_done = true;
112  if (self->_socket.is_open()) {
113  self->_socket.close();
114  }
115  });
116  }
117 
119  std::weak_ptr<Secondary> weak = shared_from_this();
120  _connection_timer.expires_from_now(time_duration::seconds(1u));
121  _connection_timer.async_wait([weak](boost::system::error_code ec) {
122  auto self = weak.lock();
123  if (!self) return;
124  if (!ec) {
125  self->Connect();
126  }
127  });
128  }
129 
130  void Secondary::AsyncRun(size_t worker_threads) {
131  _pool.AsyncRun(worker_threads);
132  }
133 
134  void Secondary::Write(std::shared_ptr<const carla::streaming::detail::tcp::Message> message) {
135  DEBUG_ASSERT(message != nullptr);
136  DEBUG_ASSERT(!message->empty());
137  std::weak_ptr<Secondary> weak = shared_from_this();
138  boost::asio::post(_strand, [=]() {
139  auto self = weak.lock();
140  if (!self) return;
141  if (!self->_socket.is_open()) {
142  return;
143  }
144 
145  auto handle_sent = [weak, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
146  auto self = weak.lock();
147  if (!self) return;
148  if (ec) {
149  log_error("error sending data: ", ec.message());
150  }
151  };
152 
153  // _deadline.expires_from_now(_timeout);
154  boost::asio::async_write(
155  self->_socket,
156  message->GetBufferSequence(),
157  boost::asio::bind_executor(self->_strand, handle_sent));
158  });
159  }
160 
161  void Secondary::Write(Buffer buffer) {
162  auto view_data = carla::BufferView::CreateFrom(std::move(buffer));
163  auto message = Secondary::MakeMessage(view_data);
164 
165  DEBUG_ASSERT(message != nullptr);
166  DEBUG_ASSERT(!message->empty());
167  std::weak_ptr<Secondary> weak = shared_from_this();
168  boost::asio::post(_strand, [=]() {
169  auto self = weak.lock();
170  if (!self) return;
171  if (!self->_socket.is_open()) {
172  return;
173  }
174 
175  auto handle_sent = [weak, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
176  auto self = weak.lock();
177  if (!self) return;
178  if (ec) {
179  log_error("error sending data: ", ec.message());
180  }
181  };
182 
183  // _deadline.expires_from_now(_timeout);
184  boost::asio::async_write(
185  self->_socket,
186  message->GetBufferSequence(),
187  boost::asio::bind_executor(self->_strand, handle_sent));
188  });
189  }
190 
191  void Secondary::Write(std::string text) {
192  std::weak_ptr<Secondary> weak = shared_from_this();
193  boost::asio::post(_strand, [=]() {
194  auto self = weak.lock();
195  if (!self) return;
196  if (!self->_socket.is_open()) {
197  return;
198  }
199 
200  auto handle_sent = [weak](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
201  auto self = weak.lock();
202  if (!self) return;
203  if (ec) {
204  log_error("error sending data: ", ec.message());
205  }
206  };
207 
208  // _deadline.expires_from_now(_timeout);
209  // sent first size buffer
210  int this_size = text.size();
211  boost::asio::async_write(
212  self->_socket,
213  boost::asio::buffer(&this_size, sizeof(this_size)),
214  boost::asio::bind_executor(self->_strand, handle_sent));
215 
216  // send characters
217  boost::asio::async_write(
218  self->_socket,
219  boost::asio::buffer(text.c_str(), text.size()),
220  boost::asio::bind_executor(self->_strand, handle_sent));
221  });
222  }
223 
225  std::weak_ptr<Secondary> weak = shared_from_this();
226  boost::asio::post(_strand, [weak]() {
227  auto self = weak.lock();
228  if (!self) return;
229  if (self->_done) {
230  return;
231  }
232 
233  auto message = std::make_shared<IncomingMessage>(self->_buffer_pool->Pop());
234 
235  auto handle_read_data = [weak, message](boost::system::error_code ec, size_t DEBUG_ONLY(bytes)) {
236  auto self = weak.lock();
237  if (!self) return;
238  if (!ec) {
239  DEBUG_ASSERT_EQ(bytes, message->size());
240  DEBUG_ASSERT_NE(bytes, 0u);
241  // Move the buffer to the callback function and start reading the next
242  // piece of data.
243  self->GetCommander().process_command(message->pop());
244  self->ReadData();
245  } else {
246  // As usual, if anything fails start over from the very top.
247  log_error("secondary server: failed to read data: ", ec.message());
248  // Connect();
249  }
250  };
251 
252  auto handle_read_header = [weak, message, handle_read_data](
253  boost::system::error_code ec,
254  size_t DEBUG_ONLY(bytes)) {
255  auto self = weak.lock();
256  if (!self) return;
257  if (!ec && (message->size() > 0u)) {
259  if (self->_done) {
260  return;
261  }
262  // Now that we know the size of the coming buffer, we can allocate our
263  // buffer and start putting data into it.
264  boost::asio::async_read(
265  self->_socket,
266  message->buffer(),
267  boost::asio::bind_executor(self->_strand, handle_read_data));
268  } else if (!self->_done) {
269  log_error("secondary server: failed to read header: ", ec.message());
270  // DEBUG_ONLY(printf("size = ", message->size()));
271  // DEBUG_ONLY(printf("bytes = ", bytes));
272  // Connect();
273  }
274  };
275 
276  // Read the size of the buffer that is coming.
277  boost::asio::async_read(
278  self->_socket,
279  message->size_as_buffer(),
280  boost::asio::bind_executor(self->_strand, handle_read_header));
281  });
282  }
283 
284 } // namespace multigpu
285 } // namespace carla
#define DEBUG_ASSERT_NE(lhs, rhs)
Definition: Debug.h:77
boost::asio::io_context::strand _strand
Definition: secondary.h:78
SecondaryCommands _commander
Definition: secondary.h:82
static void log_error(Args &&... args)
Definition: Logging.h:110
This file contains definitions of common data structures used in traffic manager. ...
Definition: Carla.cpp:133
boost::asio::ip::tcp::endpoint _endpoint
Definition: secondary.h:77
boost::asio::deadline_timer _connection_timer
Definition: secondary.h:79
static time_duration seconds(size_t timeout)
Definition: Time.h:22
#define DEBUG_ASSERT_EQ(lhs, rhs)
Definition: Debug.h:76
static std::shared_ptr< BufferView > CreateFrom(Buffer &&buffer)
Definition: BufferView.h:56
std::function< void(MultiGPUCommand, carla::Buffer)> callback_type
void AsyncRun(size_t worker_threads)
Definition: secondary.cpp:130
#define DEBUG_ASSERT(predicate)
Definition: Debug.h:66
uint32_t message_size_type
Definition: Types.h:20
Secondary(boost::asio::ip::tcp::endpoint ep, SecondaryCommands::callback_type callback)
Definition: secondary.cpp:27
static auto MakeMessage(Buffers... buffers)
Definition: secondary.h:62
void Write(std::shared_ptr< const carla::streaming::detail::tcp::Message > message)
Definition: secondary.cpp:134
void set_secondary(std::shared_ptr< Secondary > secondary)
static void log_info(Args &&... args)
Definition: Logging.h:82
A pool of Buffer.
Definition: BufferPool.h:30
void Stop()
Stop the ThreadPool and join all its threads.
Definition: ThreadPool.h:76
std::shared_ptr< BufferPool > _buffer_pool
Definition: secondary.h:80
A piece of raw data.
Definition: carla/Buffer.h:42
boost::asio::ip::tcp::socket _socket
Definition: secondary.h:76
void AsyncRun(size_t worker_threads)
Launch threads to run tasks asynchronously.
Definition: ThreadPool.h:51
void set_callback(callback_type callback)
#define DEBUG_ONLY(code)
Definition: Debug.h:55