CARLA
router.cpp
Go to the documentation of this file.
1 // Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
8 
11 
12 namespace carla {
13 namespace multigpu {
14 
16  _next(0) { }
17 
19  Stop();
20 }
21 
22 void Router::Stop() {
23  ClearSessions();
24  _listener->Stop();
25  _listener.reset();
26  _pool.Stop();
27 }
28 
29 Router::Router(uint16_t port) :
30  _next(0) {
31 
32  _endpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string("0.0.0.0"), port);
33  _listener = std::make_shared<carla::multigpu::Listener>(_pool.io_context(), _endpoint);
34 }
35 
37  // prepare server
38  std::weak_ptr<Router> weak = shared_from_this();
39 
40  carla::multigpu::Listener::callback_function_type on_open = [=](std::shared_ptr<carla::multigpu::Primary> session) {
41  auto self = weak.lock();
42  if (!self) return;
43  self->ConnectSession(session);
44  };
45 
46  carla::multigpu::Listener::callback_function_type on_close = [=](std::shared_ptr<carla::multigpu::Primary> session) {
47  auto self = weak.lock();
48  if (!self) return;
49  self->DisconnectSession(session);
50  };
51 
53  [=](std::shared_ptr<carla::multigpu::Primary> session, carla::Buffer buffer) {
54  auto self = weak.lock();
55  if (!self) return;
56  std::lock_guard<std::mutex> lock(self->_mutex);
57  auto prom =self-> _promises.find(session.get());
58  if (prom != self->_promises.end()) {
59  log_info("Got data from secondary (with promise): ", buffer.size());
60  prom->second->set_value({session, std::move(buffer)});
61  self->_promises.erase(prom);
62  } else {
63  log_info("Got data from secondary (without promise): ", buffer.size());
64  }
65  };
66 
67  _commander.set_router(shared_from_this());
68 
69  _listener->Listen(on_open, on_close, on_response);
70  log_info("Listening at ", _endpoint);
71 }
72 
73 void Router::SetNewConnectionCallback(std::function<void(void)> func)
74 {
75  _callback = func;
76 }
77 
78 void Router::AsyncRun(size_t worker_threads) {
79  _pool.AsyncRun(worker_threads);
80 }
81 
82 boost::asio::ip::tcp::endpoint Router::GetLocalEndpoint() const {
83  return _endpoint;
84 }
85 
86 void Router::ConnectSession(std::shared_ptr<Primary> session) {
87  DEBUG_ASSERT(session != nullptr);
88  std::lock_guard<std::mutex> lock(_mutex);
89  _sessions.emplace_back(std::move(session));
90  log_info("Connected secondary servers:", _sessions.size());
91  // run external callback for new connections
92  if (_callback)
93  _callback();
94 }
95 
96 void Router::DisconnectSession(std::shared_ptr<Primary> session) {
97  DEBUG_ASSERT(session != nullptr);
98  std::lock_guard<std::mutex> lock(_mutex);
99  if (_sessions.size() == 0) return;
100  _sessions.erase(
101  std::remove(_sessions.begin(), _sessions.end(), session),
102  _sessions.end());
103  log_info("Connected secondary servers:", _sessions.size());
104 }
105 
107  std::lock_guard<std::mutex> lock(_mutex);
108  _sessions.clear();
109  log_info("Disconnecting all secondary servers");
110 }
111 
113  // define the command header
114  CommandHeader header;
115  header.id = id;
116  header.size = buffer.size();
117  Buffer buf_header((uint8_t *) &header, sizeof(header));
118 
119  auto view_header = carla::BufferView::CreateFrom(std::move(buf_header));
120  auto view_data = carla::BufferView::CreateFrom(std::move(buffer));
121  auto message = Primary::MakeMessage(view_header, view_data);
122 
123  // write to multiple servers
124  std::lock_guard<std::mutex> lock(_mutex);
125  for (auto &s : _sessions) {
126  if (s != nullptr) {
127  s->Write(message);
128  }
129  }
130 }
131 
132 std::future<SessionInfo> Router::WriteToNext(MultiGPUCommand id, Buffer &&buffer) {
133  // define the command header
134  CommandHeader header;
135  header.id = id;
136  header.size = buffer.size();
137  Buffer buf_header((uint8_t *) &header, sizeof(header));
138 
139  auto view_header = carla::BufferView::CreateFrom(std::move(buf_header));
140  auto view_data = carla::BufferView::CreateFrom(std::move(buffer));
141  auto message = Primary::MakeMessage(view_header, view_data);
142 
143  // create the promise for the posible answer
144  auto response = std::make_shared<std::promise<SessionInfo>>();
145 
146  // write to the next server only
147  std::lock_guard<std::mutex> lock(_mutex);
148  if (_next >= _sessions.size()) {
149  _next = 0;
150  }
151  if (_next < _sessions.size()) {
152  // std::cout << "Sending to session " << _next << std::endl;
153  auto s = _sessions[_next];
154  if (s != nullptr) {
155  _promises[s.get()] = response;
156  std::cout << "Updated promise into map: " << _promises.size() << std::endl;
157  s->Write(message);
158  }
159  }
160  ++_next;
161  return response->get_future();
162 }
163 
164 std::future<SessionInfo> Router::WriteToOne(std::weak_ptr<Primary> server, MultiGPUCommand id, Buffer &&buffer) {
165  // define the command header
166  CommandHeader header;
167  header.id = id;
168  header.size = buffer.size();
169  Buffer buf_header((uint8_t *) &header, sizeof(header));
170 
171  auto view_header = carla::BufferView::CreateFrom(std::move(buf_header));
172  auto view_data = carla::BufferView::CreateFrom(std::move(buffer));
173  auto message = Primary::MakeMessage(view_header, view_data);
174 
175  // create the promise for the posible answer
176  auto response = std::make_shared<std::promise<SessionInfo>>();
177 
178  // write to the specific server only
179  std::lock_guard<std::mutex> lock(_mutex);
180  auto s = server.lock();
181  if (s) {
182  _promises[s.get()] = response;
183  s->Write(message);
184  }
185  return response->get_future();
186 }
187 
188 std::weak_ptr<Primary> Router::GetNextServer() {
189  std::lock_guard<std::mutex> lock(_mutex);
190  if (_next >= _sessions.size()) {
191  _next = 0;
192  }
193  if (_next < _sessions.size()) {
194  return std::weak_ptr<Primary>(_sessions[_next]);
195  } else {
196  return std::weak_ptr<Primary>();
197  }
198 }
199 
200 } // namespace multigpu
201 } // namespace carla
std::function< void(std::shared_ptr< Primary >, carla::Buffer)> callback_function_type_response
Definition: listener.h:33
ThreadPool _pool
Definition: router.h:71
boost::asio::ip::tcp::endpoint _endpoint
Definition: router.h:72
std::future< SessionInfo > WriteToOne(std::weak_ptr< Primary > server, MultiGPUCommand id, Buffer &&buffer)
Definition: router.cpp:164
std::function< void(std::shared_ptr< Primary >)> callback_function_type
Definition: listener.h:32
static auto MakeMessage(Buffers... buffers)
Definition: primary.h:54
std::function< void(void)> _callback
Definition: router.h:78
This file contains definitions of common data structures used in traffic manager. ...
Definition: Carla.cpp:133
boost::asio::ip::tcp::endpoint GetLocalEndpoint() const
Definition: router.cpp:82
static std::shared_ptr< BufferView > CreateFrom(Buffer &&buffer)
Definition: BufferView.h:56
std::weak_ptr< Primary > GetNextServer()
Definition: router.cpp:188
std::unordered_map< Primary *, std::shared_ptr< std::promise< SessionInfo > > > _promises
Definition: router.h:76
#define DEBUG_ASSERT(predicate)
Definition: Debug.h:66
std::shared_ptr< Listener > _listener
Definition: router.h:74
void ConnectSession(std::shared_ptr< Primary > session)
Definition: router.cpp:86
std::vector< std::shared_ptr< Primary > > _sessions
Definition: router.h:73
std::future< SessionInfo > WriteToNext(MultiGPUCommand id, Buffer &&buffer)
Definition: router.cpp:132
void AsyncRun(size_t worker_threads)
Definition: router.cpp:78
void DisconnectSession(std::shared_ptr< Primary > session)
Definition: router.cpp:96
void SetNewConnectionCallback(std::function< void(void)>)
Definition: router.cpp:73
static void log_info(Args &&... args)
Definition: Logging.h:82
void Stop()
Stop the ThreadPool and join all its threads.
Definition: ThreadPool.h:76
PrimaryCommands _commander
Definition: router.h:77
void Write(MultiGPUCommand id, Buffer &&buffer)
Definition: router.cpp:112
A piece of raw data.
Definition: carla/Buffer.h:42
std::mutex _mutex
Definition: router.h:70
void AsyncRun(size_t worker_threads)
Launch threads to run tasks asynchronously.
Definition: ThreadPool.h:51
auto & io_context()
Return the underlying io_context.
Definition: ThreadPool.h:35
void set_router(std::shared_ptr< Router > router)