16 #include <boost/asio/connect.hpp> 17 #include <boost/asio/read.hpp> 18 #include <boost/asio/write.hpp> 19 #include <boost/asio/post.hpp> 20 #include <boost/asio/bind_executor.hpp> 28 boost::asio::ip::tcp::endpoint ep,
31 _socket(_pool.io_context()),
33 _strand(_pool.io_context()),
34 _connection_timer(_pool.io_context()),
51 boost::asio::ip::address ip_address = boost::asio::ip::address::from_string(ip);
52 _endpoint = boost::asio::ip::tcp::endpoint(ip_address, port);
65 std::weak_ptr<Secondary> weak = shared_from_this();
66 boost::asio::post(
_strand, [weak]() {
67 auto self = weak.lock();
74 if (self->_socket.is_open()) {
75 self->_socket.close();
78 auto handle_connect = [weak](boost::system::error_code ec) {
79 auto self = weak.lock();
82 log_error(
"secondary server: connection failed:", ec.message());
94 self->_socket.set_option(boost::asio::ip::tcp::no_delay(
true));
96 log_info(
"secondary server: connected to ", self->_endpoint);
101 self->_socket.async_connect(self->_endpoint, boost::asio::bind_executor(self->_strand, handle_connect));
107 std::weak_ptr<Secondary> weak = shared_from_this();
108 boost::asio::post(
_strand, [weak]() {
109 auto self = weak.lock();
112 if (self->_socket.is_open()) {
113 self->_socket.close();
119 std::weak_ptr<Secondary> weak = shared_from_this();
122 auto self = weak.lock();
134 void Secondary::Write(std::shared_ptr<const carla::streaming::detail::tcp::Message> message) {
137 std::weak_ptr<Secondary> weak = shared_from_this();
138 boost::asio::post(
_strand, [=]() {
139 auto self = weak.lock();
141 if (!self->_socket.is_open()) {
145 auto handle_sent = [weak, message](
const boost::system::error_code &ec,
size_t DEBUG_ONLY(bytes)) {
146 auto self = weak.lock();
149 log_error(
"error sending data: ", ec.message());
154 boost::asio::async_write(
156 message->GetBufferSequence(),
157 boost::asio::bind_executor(self->_strand, handle_sent));
167 std::weak_ptr<Secondary> weak = shared_from_this();
168 boost::asio::post(
_strand, [=]() {
169 auto self = weak.lock();
171 if (!self->_socket.is_open()) {
175 auto handle_sent = [weak, message](
const boost::system::error_code &ec,
size_t DEBUG_ONLY(bytes)) {
176 auto self = weak.lock();
179 log_error(
"error sending data: ", ec.message());
184 boost::asio::async_write(
186 message->GetBufferSequence(),
187 boost::asio::bind_executor(self->_strand, handle_sent));
192 std::weak_ptr<Secondary> weak = shared_from_this();
193 boost::asio::post(
_strand, [=]() {
194 auto self = weak.lock();
196 if (!self->_socket.is_open()) {
200 auto handle_sent = [weak](
const boost::system::error_code &ec,
size_t DEBUG_ONLY(bytes)) {
201 auto self = weak.lock();
204 log_error(
"error sending data: ", ec.message());
210 int this_size = text.size();
211 boost::asio::async_write(
213 boost::asio::buffer(&this_size,
sizeof(this_size)),
214 boost::asio::bind_executor(self->_strand, handle_sent));
217 boost::asio::async_write(
219 boost::asio::buffer(text.c_str(), text.size()),
220 boost::asio::bind_executor(self->_strand, handle_sent));
225 std::weak_ptr<Secondary> weak = shared_from_this();
226 boost::asio::post(
_strand, [weak]() {
227 auto self = weak.lock();
233 auto message = std::make_shared<IncomingMessage>(
self->_buffer_pool->Pop());
235 auto handle_read_data = [weak, message](boost::system::error_code ec,
size_t DEBUG_ONLY(bytes)) {
236 auto self = weak.lock();
243 self->GetCommander().process_command(message->pop());
247 log_error(
"secondary server: failed to read data: ", ec.message());
252 auto handle_read_header = [weak, message, handle_read_data](
253 boost::system::error_code ec,
255 auto self = weak.lock();
257 if (!ec && (message->size() > 0u)) {
264 boost::asio::async_read(
267 boost::asio::bind_executor(self->_strand, handle_read_data));
268 }
else if (!self->_done) {
269 log_error(
"secondary server: failed to read header: ", ec.message());
277 boost::asio::async_read(
279 message->size_as_buffer(),
280 boost::asio::bind_executor(self->_strand, handle_read_header));
#define DEBUG_ASSERT_NE(lhs, rhs)
boost::asio::io_context::strand _strand
SecondaryCommands _commander
static void log_error(Args &&... args)
This file contains definitions of common data structures used in traffic manager. ...
boost::asio::ip::tcp::endpoint _endpoint
boost::asio::deadline_timer _connection_timer
static time_duration seconds(size_t timeout)
#define DEBUG_ASSERT_EQ(lhs, rhs)
static std::shared_ptr< BufferView > CreateFrom(Buffer &&buffer)
std::function< void(MultiGPUCommand, carla::Buffer)> callback_type
void AsyncRun(size_t worker_threads)
#define DEBUG_ASSERT(predicate)
uint32_t message_size_type
Secondary(boost::asio::ip::tcp::endpoint ep, SecondaryCommands::callback_type callback)
static auto MakeMessage(Buffers... buffers)
void Write(std::shared_ptr< const carla::streaming::detail::tcp::Message > message)
void set_secondary(std::shared_ptr< Secondary > secondary)
static void log_info(Args &&... args)
void Stop()
Stop the ThreadPool and join all its threads.
std::shared_ptr< BufferPool > _buffer_pool
boost::asio::ip::tcp::socket _socket
void AsyncRun(size_t worker_threads)
Launch threads to run tasks asynchronously.
void set_callback(callback_type callback)