CARLA
Dispatcher.cpp
Go to the documentation of this file.
1 // Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
8 
9 #include "carla/Exception.h"
10 #include "carla/Logging.h"
12 
13 #include <exception>
14 
15 namespace carla {
16 namespace streaming {
17 namespace detail {
18 
20  // Disconnect all the sessions from their streams, this should kill any
21  // session remaining since at this point the io_context should be already
22  // stopped.
23  for (auto &pair : _stream_map) {
24 #ifndef LIBCARLA_NO_EXCEPTIONS
25  try {
26 #endif // LIBCARLA_NO_EXCEPTIONS
27  auto stream_state = pair.second;
28  stream_state->ClearSessions();
29 #ifndef LIBCARLA_NO_EXCEPTIONS
30  } catch (const std::exception &e) {
31  log_error("failed to clear sessions:", e.what());
32  }
33 #endif // LIBCARLA_NO_EXCEPTIONS
34  }
35  }
36 
38  std::lock_guard<std::mutex> lock(_mutex);
39  ++_cached_token._token.stream_id; // id zero only happens in overflow.
40  log_debug("New stream:", _cached_token._token.stream_id);
41  std::shared_ptr<MultiStreamState> ptr;
42  auto search = _stream_map.find(_cached_token.get_stream_id());
43  if (search == _stream_map.end()) {
44  // creating new stream
45  ptr = std::make_shared<MultiStreamState>(_cached_token);
46  auto result = _stream_map.emplace(std::make_pair(_cached_token.get_stream_id(), ptr));
47  if (!result.second) {
48  throw_exception(std::runtime_error("failed to create stream!"));
49  }
50  log_debug("Stream created");
51  return carla::streaming::Stream(ptr);
52  } else {
53  // reusing existing stream
54  log_debug("Stream reused");
55  ptr = search->second;
56  return carla::streaming::Stream(ptr);
57  }
58  }
59 
61  std::lock_guard<std::mutex> lock(_mutex);
62  log_debug("Calling CloseStream for ", id);
63  auto search = _stream_map.find(id);
64  if (search != _stream_map.end()) {
65  auto stream_state = search->second;
66  if (stream_state) {
67  log_debug("Disconnecting all sessions (stream ", id, ")");
68  stream_state->ClearSessions();
69  }
70  _stream_map.erase(search);
71  }
72  }
73 
74  bool Dispatcher::RegisterSession(std::shared_ptr<Session> session) {
75  DEBUG_ASSERT(session != nullptr);
76  std::lock_guard<std::mutex> lock(_mutex);
77  auto search = _stream_map.find(session->get_stream_id());
78  if (search != _stream_map.end()) {
79  auto stream_state = search->second;
80  if (stream_state) {
81  log_debug("Connecting session (stream ", session->get_stream_id(), ")");
82  stream_state->ConnectSession(std::move(session));
83  log_debug("Current streams: ", _stream_map.size());
84  return true;
85  }
86  }
87  log_error("Invalid session: no stream available with id", session->get_stream_id());
88  return false;
89  }
90 
91  void Dispatcher::DeregisterSession(std::shared_ptr<Session> session) {
92  DEBUG_ASSERT(session != nullptr);
93  std::lock_guard<std::mutex> lock(_mutex);
94  log_debug("Calling DeregisterSession for ", session->get_stream_id());
95  auto search = _stream_map.find(session->get_stream_id());
96  if (search != _stream_map.end()) {
97  auto stream_state = search->second;
98  if (stream_state) {
99  log_debug("Disconnecting session (stream ", session->get_stream_id(), ")");
100  stream_state->DisconnectSession(session);
101  log_debug("Current streams: ", _stream_map.size());
102  }
103  }
104  }
105 
107  std::lock_guard<std::mutex> lock(_mutex);
108  log_debug("Searching sensor id: ", sensor_id);
109  auto search = _stream_map.find(sensor_id);
110  if (search != _stream_map.end()) {
111  log_debug("Found sensor id: ", sensor_id);
112  auto stream_state = search->second;
113  stream_state->ForceActive();
114  log_debug("Getting token from stream ", sensor_id, " on port ", stream_state->token().get_port());
115  return stream_state->token();
116  } else {
117  log_debug("Not Found sensor id, creating sensor stream: ", sensor_id);
118  token_type temp_token(_cached_token);
119  temp_token.set_stream_id(sensor_id);
120  auto ptr = std::make_shared<MultiStreamState>(temp_token);
121  auto result = _stream_map.emplace(std::make_pair(temp_token.get_stream_id(), ptr));
122  ptr->ForceActive();
123  if (!result.second) {
124  log_debug("Failed to create multistream for stream ", sensor_id, " on port ", temp_token.get_port());
125  }
126  log_debug("Created token from stream ", sensor_id, " on port ", temp_token.get_port());
127  return temp_token;
128  }
129  return token_type();
130  }
131 
132 } // namespace detail
133 } // namespace streaming
134 } // namespace carla
void throw_exception(const std::exception &e)
Definition: Carla.cpp:135
static void log_error(Args &&... args)
Definition: Logging.h:110
This file contains definitions of common data structures used in traffic manager. ...
Definition: Carla.cpp:133
uint32_t stream_id_type
Definition: Types.h:18
static void log_debug(Args &&...)
Definition: Logging.h:75
token_type GetToken(stream_id_type sensor_id)
Definition: Dispatcher.cpp:106
#define DEBUG_ASSERT(predicate)
Definition: Debug.h:66
void CloseStream(carla::streaming::detail::stream_id_type id)
Definition: Dispatcher.cpp:60
Serializes a stream endpoint.
Definition: detail/Token.h:61
void DeregisterSession(std::shared_ptr< Session > session)
Definition: Dispatcher.cpp:91
bool RegisterSession(std::shared_ptr< Session > session)
Definition: Dispatcher.cpp:74
detail::Stream< detail::MultiStreamState > Stream
A stream represents an unidirectional channel for sending data from server to client.
Definition: Stream.h:19
const auto & get_stream_id() const
Definition: detail/Token.h:116
carla::streaming::Stream MakeStream()
Definition: Dispatcher.cpp:37
carla::streaming::detail::token_type token_type
void set_stream_id(stream_id_type id)
Definition: detail/Token.h:120