CARLA
ServerSession.cpp
Go to the documentation of this file.
1 // Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
9 
10 #include "carla/Debug.h"
11 #include "carla/Logging.h"
12 
13 #include <boost/asio/read.hpp>
14 #include <boost/asio/write.hpp>
15 #include <boost/asio/bind_executor.hpp>
16 #include <boost/asio/post.hpp>
17 
18 #include <atomic>
19 #include <thread>
20 
21 namespace carla {
22 namespace streaming {
23 namespace detail {
24 namespace tcp {
25 
// File-local counter used to number sessions: the constructor below reads it
// to build a display name and post-increments it to obtain _session_id.
// Atomic, so the increment itself is safe if sessions are created from
// several threads.
26  static std::atomic_size_t SESSION_COUNTER{0u};
27 
29  boost::asio::io_context &io_context,
30  const time_duration timeout,
31  Server &server)
// Label "tcp server session <n>" built from the current counter value.
// NOTE(review): this read and the post-increment used for _session_id below
// are two separate accesses to SESSION_COUNTER, so under concurrent
// construction the number in the label may differ from _session_id.
33  std::string("tcp server session ") + std::to_string(SESSION_COUNTER)),
34  _server(server),
// Take the current counter value as this session's id and advance the counter.
35  _session_id(SESSION_COUNTER++),
// The socket, the deadline timer and the strand are all created on the same
// io_context that was passed in.
36  _socket(io_context),
37  _timeout(timeout),
38  _deadline(io_context),
39  _strand(io_context) {}
40 
42  callback_function_type on_opened,
43  callback_function_type on_closed) {
// Session start-up: stores on_closed, disables Nagle on the socket, starts the
// deadline timer, then (on the strand) asynchronously reads the stream id and,
// on success, hands this session to on_opened. Both callbacks must be
// non-empty (asserted in debug builds).
44  DEBUG_ASSERT(on_opened && on_closed);
45  _on_closed = std::move(on_closed);
46 
47  // This forces not using Nagle's algorithm.
48  // Improves the sync mode velocity on Linux by a factor of ~3.
49  const boost::asio::ip::tcp::no_delay option(true);
50  _socket.set_option(option);
51 
// The timer loop is started before any read is issued; the deadline itself is
// armed further down, right before the async_read.
52  StartTimer();
53  auto self = shared_from_this(); // To keep myself alive.
54  boost::asio::post(_strand, [=]() {
55 
// Completion handler for the stream-id read. It holds `self`, so the session
// stays alive until the read completes.
// NOTE(review): on_opened is captured by copy by the enclosing non-mutable
// [=] lambda and is therefore const here; std::move on it performs a copy,
// not a move.
56  auto handle_query = [this, self, callback=std::move(on_opened)](
57  const boost::system::error_code &ec,
58  size_t DEBUG_ONLY(bytes_received)) {
59  if (!ec) {
60  DEBUG_ASSERT_EQ(bytes_received, sizeof(_stream_id));
61  log_debug("session", _session_id, "for stream", _stream_id, " started");
// The user callback is posted to the strand's io_context, not to the strand
// itself, so it is not serialized with this session's strand handlers.
62  boost::asio::post(_strand.context(), [=]() { callback(self); });
63  } else {
64  log_error("session", _session_id, ": error retrieving stream id :", ec.message());
// With a non-empty ec, CloseNow skips the socket shutdown/close and only
// cancels the timer and fires on_closed.
65  CloseNow(ec);
66  }
67  };
68 
69  // Read the stream id.
// Arm the deadline for this read, then read exactly sizeof(_stream_id) bytes
// into _stream_id; the handler is bound to the strand.
70  _deadline.expires_from_now(_timeout);
71  boost::asio::async_read(
72  _socket,
73  boost::asio::buffer(&_stream_id, sizeof(_stream_id)),
74  boost::asio::bind_executor(_strand, handle_query));
75  });
76  }
77 
78  void ServerSession::Write(std::shared_ptr<const Message> message) {
79  DEBUG_ASSERT(message != nullptr);
80  DEBUG_ASSERT(!message->empty());
81  auto self = shared_from_this();
82  boost::asio::post(_strand, [=]() {
83  if (!_socket.is_open()) {
84  return;
85  }
86  if (_is_writing) {
87  if (_server.IsSynchronousMode()) {
88  // wait until previous message has been sent
89  while (_is_writing) {
90  std::this_thread::yield();
91  }
92  } else {
93  // ignore this message
94  log_debug("session", _session_id, ": connection too slow: message discarded");
95  return;
96  }
97  }
98  _is_writing = true;
99 
100  auto handle_sent = [this, self, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
101  _is_writing = false;
102  if (ec) {
103  log_info("session", _session_id, ": error sending data :", ec.message());
104  CloseNow(ec);
105  } else {
106  DEBUG_ONLY(log_debug("session", _session_id, ": successfully sent", bytes, "bytes"));
107  DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type) + message->size());
108  }
109  };
110 
111  log_debug("session", _session_id, ": sending message of", message->size(), "bytes");
112 
113  _deadline.expires_from_now(_timeout);
114  boost::asio::async_write(
115  _socket,
116  message->GetBufferSequence(),
117  handle_sent);
118  });
119  }
120 
// Deferred close: posts a call to CloseNow() (with its default, empty error
// code) onto the session strand. The lambda owns a shared_ptr to this session,
// so the object is still alive when the posted job runs.
122  boost::asio::post(_strand, [self=shared_from_this()]() { self->CloseNow(); });
123  }
124 
// Deadline check: if the timer's expiry time is not in the future, the session
// is considered timed out and a close is posted.
126  if (_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
127  log_debug("session", _session_id, "timed out");
128  Close();
129  } else {
// Otherwise wait on the timer. The handler holds a shared_ptr to the session
// so it stays alive while the wait is pending.
130  _deadline.async_wait([this, self=shared_from_this()](boost::system::error_code ec) {
131  if (!ec) {
// The wait completed normally: re-evaluate the (possibly updated) deadline.
132  StartTimer();
133  } else {
// The wait ended with an error; the timer loop is NOT re-armed on this path.
// NOTE(review): Boost.Asio documents that changing a timer's expiry (e.g. the
// expires_from_now calls in Open/Write) and cancel() both complete a pending
// wait with operation_aborted, which lands here - confirm that stopping the
// timeout watchdog after the first re-arm is intended.
134  log_debug("session", _session_id, "timed out error:", ec.message());
135  }
136  });
137  }
138  }
139 
140  void ServerSession::CloseNow(boost::system::error_code ec) {
141  _deadline.cancel();
142  if (!ec)
143  {
144  if (_socket.is_open()) {
145  boost::system::error_code ec2;
146  _socket.shutdown(boost::asio::socket_base::shutdown_both, ec2);
147  _socket.close();
148  }
149  }
150  _on_closed(shared_from_this());
151  log_debug("session", _session_id, "closed");
152  }
153 
154 } // namespace tcp
155 } // namespace detail
156 } // namespace streaming
157 } // namespace carla
static void log_error(Args &&... args)
Definition: Logging.h:110
void CloseNow(boost::system::error_code ec=boost::system::error_code())
This file contains definitions of common data structures used in traffic manager. ...
Definition: Carla.cpp:133
void Close()
Post a job to close the session.
static void log_debug(Args &&...)
Definition: Logging.h:75
#define DEBUG_ASSERT_EQ(lhs, rhs)
Definition: Debug.h:76
#define DEBUG_ASSERT(predicate)
Definition: Debug.h:66
#define LIBCARLA_INITIALIZE_LIFETIME_PROFILER(display_name)
uint32_t message_size_type
Definition: Types.h:20
std::function< void(std::shared_ptr< ServerSession >)> callback_function_type
Definition: ServerSession.h:48
void Write(std::shared_ptr< const Message > message)
Writes some data to the socket.
static void log_info(Args &&... args)
Definition: Logging.h:82
static std::atomic_size_t SESSION_COUNTER
Positive time duration up to milliseconds resolution.
Definition: Time.h:19
#define DEBUG_ONLY(code)
Definition: Debug.h:55
void Open(callback_function_type on_opened, callback_function_type on_closed)
Starts the session and calls on_opened after successfully reading the stream id, and on_closed once the session is closed.
ServerSession(boost::asio::io_context &io_context, time_duration timeout, Server &server)
boost::asio::io_context::strand _strand