32 _endpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(
"0.0.0.0"), port);
38 std::weak_ptr<Router> weak = shared_from_this();
41 auto self = weak.lock();
43 self->ConnectSession(session);
47 auto self = weak.lock();
49 self->DisconnectSession(session);
53 [=](std::shared_ptr<carla::multigpu::Primary> session,
carla::Buffer buffer) {
54 auto self = weak.lock();
56 std::lock_guard<std::mutex> lock(self->_mutex);
57 auto prom =
self->
_promises.find(session.get());
58 if (prom != self->_promises.end()) {
59 log_info(
"Got data from secondary (with promise): ", buffer.size());
60 prom->second->set_value({session, std::move(buffer)});
61 self->_promises.erase(prom);
63 log_info(
"Got data from secondary (without promise): ", buffer.size());
69 _listener->Listen(on_open, on_close, on_response);
88 std::lock_guard<std::mutex> lock(
_mutex);
89 _sessions.emplace_back(std::move(session));
98 std::lock_guard<std::mutex> lock(
_mutex);
107 std::lock_guard<std::mutex> lock(
_mutex);
109 log_info(
"Disconnecting all secondary servers");
116 header.
size = buffer.size();
117 Buffer buf_header((uint8_t *) &header,
sizeof(header));
124 std::lock_guard<std::mutex> lock(
_mutex);
136 header.
size = buffer.size();
137 Buffer buf_header((uint8_t *) &header,
sizeof(header));
144 auto response = std::make_shared<std::promise<SessionInfo>>();
147 std::lock_guard<std::mutex> lock(
_mutex);
156 std::cout <<
"Updated promise into map: " <<
_promises.size() << std::endl;
161 return response->get_future();
168 header.
size = buffer.size();
169 Buffer buf_header((uint8_t *) &header,
sizeof(header));
176 auto response = std::make_shared<std::promise<SessionInfo>>();
179 std::lock_guard<std::mutex> lock(
_mutex);
180 auto s = server.lock();
185 return response->get_future();
189 std::lock_guard<std::mutex> lock(
_mutex);
196 return std::weak_ptr<Primary>();
std::function< void(std::shared_ptr< Primary >, carla::Buffer)> callback_function_type_response
boost::asio::ip::tcp::endpoint _endpoint
std::future< SessionInfo > WriteToOne(std::weak_ptr< Primary > server, MultiGPUCommand id, Buffer &&buffer)
std::function< void(std::shared_ptr< Primary >)> callback_function_type
static auto MakeMessage(Buffers... buffers)
std::function< void(void)> _callback
This file contains definitions of common data structures used in traffic manager. ...
boost::asio::ip::tcp::endpoint GetLocalEndpoint() const
static std::shared_ptr< BufferView > CreateFrom(Buffer &&buffer)
std::weak_ptr< Primary > GetNextServer()
std::unordered_map< Primary *, std::shared_ptr< std::promise< SessionInfo > > > _promises
#define DEBUG_ASSERT(predicate)
std::shared_ptr< Listener > _listener
void ConnectSession(std::shared_ptr< Primary > session)
std::vector< std::shared_ptr< Primary > > _sessions
std::future< SessionInfo > WriteToNext(MultiGPUCommand id, Buffer &&buffer)
void AsyncRun(size_t worker_threads)
void DisconnectSession(std::shared_ptr< Primary > session)
void SetNewConnectionCallback(std::function< void(void)>)
static void log_info(Args &&... args)
void Stop()
Stop the ThreadPool and join all its threads.
PrimaryCommands _commander
void Write(MultiGPUCommand id, Buffer &&buffer)
void AsyncRun(size_t worker_threads)
Launch threads to run tasks asynchronously.
auto & io_context()
Return the underlying io_context.
void set_router(std::shared_ptr< Router > router)