CARLA
primary.cpp
Go to the documentation of this file.
1 // Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
8 
9 #include "carla/Debug.h"
10 #include "carla/Logging.h"
13 
14 #include <boost/asio/read.hpp>
15 #include <boost/asio/write.hpp>
16 #include <boost/asio/bind_executor.hpp>
17 #include <boost/asio/post.hpp>
18 
19 #include <atomic>
20 #include <thread>
21 
22 namespace carla {
23 namespace multigpu {
24 
// Process-wide, monotonically increasing counter; each new Primary session
// reads it for its display name and post-increments it for its session id.
static std::atomic<size_t> SESSION_COUNTER{0u};
26 
28  boost::asio::io_context &io_context,
29  const time_duration timeout,
30  Listener &server)
32  std::string("tcp multigpu server session ") + std::to_string(SESSION_COUNTER)),
33  _server(server),
34  _session_id(SESSION_COUNTER++),
35  _socket(io_context),
36  _timeout(timeout),
37  _deadline(io_context),
38  _strand(io_context),
39  _buffer_pool(std::make_shared<BufferPool>()) {}
40 
42  if (_socket.is_open()) {
43  boost::system::error_code ec;
44  _socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
45  _socket.close();
46  }
47  }
48 
53  DEBUG_ASSERT(on_opened && on_closed);
54 
55  // This forces not using Nagle's algorithm.
56  // Improves the sync mode velocity on Linux by a factor of ~3.
57  const boost::asio::ip::tcp::no_delay option(true);
58  _socket.set_option(option);
59 
60  // callbacks
61  _on_closed = std::move(on_closed);
62  _on_response = std::move(on_response);
63  on_opened(shared_from_this());
64 
65  ReadData();
66  }
67 
68  void Primary::Write(std::shared_ptr<const carla::streaming::detail::tcp::Message> message) {
69  DEBUG_ASSERT(message != nullptr);
70  DEBUG_ASSERT(!message->empty());
71  std::weak_ptr<Primary> weak = shared_from_this();
72  boost::asio::post(_strand, [=]() {
73  auto self = weak.lock();
74  if (!self) return;
75  if (!self->_socket.is_open()) {
76  return;
77  }
78 
79  auto handle_sent = [weak, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
80  auto self = weak.lock();
81  if (!self) return;
82  if (ec) {
83  log_error("session ", self->_session_id, ": error sending data: ", ec.message());
84  self->CloseNow(ec);
85  } else {
86  // DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type) + message->size());
87  }
88  };
89 
90  self->_deadline.expires_from_now(self->_timeout);
91  boost::asio::async_write(
92  self->_socket,
93  message->GetBufferSequence(),
94  boost::asio::bind_executor(self->_strand, handle_sent));
95  });
96  }
97 
98  void Primary::Write(std::string text) {
99  std::weak_ptr<Primary> weak = shared_from_this();
100  boost::asio::post(_strand, [=]() {
101  auto self = weak.lock();
102  if (!self) return;
103  if (!self->_socket.is_open()) {
104  return;
105  }
106 
107  // sent first size buffer
108  self->_deadline.expires_from_now(self->_timeout);
109  int this_size = text.size();
110  boost::asio::async_write(
111  self->_socket,
112  boost::asio::buffer(&this_size, sizeof(this_size)),
113  boost::asio::bind_executor(self->_strand, [](const boost::system::error_code &, size_t){ }));
114  // send characters
115  boost::asio::async_write(
116  self->_socket,
117  boost::asio::buffer(text.c_str(), text.size()),
118  boost::asio::bind_executor(self->_strand, [](const boost::system::error_code &, size_t){ }));
119  });
120  }
121 
123  std::weak_ptr<Primary> weak = shared_from_this();
124  boost::asio::post(_strand, [weak]() {
125  auto self = weak.lock();
126  if (!self) return;
127 
128  auto message = std::make_shared<IncomingMessage>(self->_buffer_pool->Pop());
129 
130  auto handle_read_data = [weak, message](boost::system::error_code ec, size_t DEBUG_ONLY(bytes)) {
131  auto self = weak.lock();
132  if (!self) return;
133  if (!ec) {
134  DEBUG_ASSERT_EQ(bytes, message->size());
135  DEBUG_ASSERT_NE(bytes, 0u);
136  // Move the buffer to the callback function and start reading the next
137  // piece of data.
138  self->_on_response(self, message->pop());
139  std::cout << "Getting data on listener\n";
140  self->ReadData();
141  } else {
142  // As usual, if anything fails start over from the very top.
143  log_error("primary server: failed to read data: ", ec.message());
144  }
145  };
146 
147  auto handle_read_header = [weak, message, handle_read_data](
148  boost::system::error_code ec,
149  size_t DEBUG_ONLY(bytes)) {
150  auto self = weak.lock();
151  if (!self) return;
152  if (!ec && (message->size() > 0u)) {
153  // Now that we know the size of the coming buffer, we can allocate our
154  // buffer and start putting data into it.
155  boost::asio::async_read(
156  self->_socket,
157  message->buffer(),
158  boost::asio::bind_executor(self->_strand, handle_read_data));
159  } else {
160  if (ec) {
161  log_error("Primary server: failed to read header: ", ec.message());
162  }
163  // Connect();
164  self->Close();
165  }
166  };
167 
168  // Read the size of the buffer that is coming.
169  boost::asio::async_read(
170  self->_socket,
171  message->size_as_buffer(),
172  boost::asio::bind_executor(self->_strand, handle_read_header));
173  });
174  }
175 
176  void Primary::Close() {
177  std::weak_ptr<Primary> weak = shared_from_this();
178  boost::asio::post(_strand, [weak]() {
179  auto self = weak.lock();
180  if (!self) return;
181  self->CloseNow();
182  });
183  }
184 
186  if (_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
187  log_debug("session ", _session_id, " time out");
188  Close();
189  } else {
190  std::weak_ptr<Primary> weak = shared_from_this();
191  _deadline.async_wait([weak](boost::system::error_code ec) {
192  auto self = weak.lock();
193  if (!self) return;
194  if (!ec) {
195  self->StartTimer();
196  } else {
197  log_error("session ", self->_session_id, " timed out error: ", ec.message());
198  }
199  });
200  }
201  }
202 
203  void Primary::CloseNow(boost::system::error_code ec) {
204  _deadline.cancel();
205  if (!ec)
206  {
207  if (_socket.is_open()) {
208  boost::system::error_code ec2;
209  _socket.shutdown(boost::asio::socket_base::shutdown_both, ec2);
210  _socket.close();
211  }
212  }
213  _on_closed(shared_from_this());
214  log_debug("session", _session_id, "closed");
215  }
216 
217 } // namespace multigpu
218 } // namespace carla
std::function< void(std::shared_ptr< Primary >, carla::Buffer)> callback_function_type_response
Definition: listener.h:33
#define DEBUG_ASSERT_NE(lhs, rhs)
Definition: Debug.h:77
boost::asio::io_context::strand _strand
Definition: primary.h:97
const size_t _session_id
Definition: primary.h:89
std::function< void(std::shared_ptr< Primary >)> callback_function_type
Definition: listener.h:32
static void log_error(Args &&... args)
Definition: Logging.h:110
This file contains definitions of common data structures used in traffic manager. ...
Definition: Carla.cpp:133
void Close()
Post a job to close the session.
Definition: primary.cpp:176
static void log_debug(Args &&...)
Definition: Logging.h:75
#define DEBUG_ASSERT_EQ(lhs, rhs)
Definition: Debug.h:76
void CloseNow(boost::system::error_code ec=boost::system::error_code())
Definition: primary.cpp:203
void ReadData()
read data
Definition: primary.cpp:122
#define DEBUG_ASSERT(predicate)
Definition: Debug.h:66
#define LIBCARLA_INITIALIZE_LIFETIME_PROFILER(display_name)
Primary(boost::asio::io_context &io_context, time_duration timeout, Listener &server)
Definition: primary.cpp:27
boost::asio::deadline_timer _deadline
Definition: primary.h:95
socket_type _socket
Definition: primary.h:91
static std::atomic_size_t SESSION_COUNTER
Definition: primary.cpp:25
A pool of Buffer.
Definition: BufferPool.h:30
Listener::callback_function_type _on_closed
Definition: primary.h:99
Positive time duration up to milliseconds resolution.
Definition: Time.h:19
Listener::callback_function_type_response _on_response
Definition: primary.h:101
void Write(std::shared_ptr< const carla::streaming::detail::tcp::Message > message)
Writes some data to the socket.
Definition: primary.cpp:68
#define DEBUG_ONLY(code)
Definition: Debug.h:55
void Open(Listener::callback_function_type on_opened, Listener::callback_function_type on_closed, Listener::callback_function_type_response on_response)
Starts the session and calls on_opened after successfully reading the stream id, and on_closed once the session is closed.
Definition: primary.cpp:49