CARLA
Dispatcher.cpp
Go to the documentation of this file.
1 // Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
8 
9 #include "carla/Exception.h"
10 #include "carla/Logging.h"
12 
13 #include <exception>
14 
15 namespace carla {
16 namespace streaming {
17 namespace detail {
18 
19  template <typename StreamStateT, typename StreamMapT>
20  static auto MakeStreamState(const token_type &cached_token, StreamMapT &stream_map) {
21  auto ptr = std::make_shared<StreamStateT>(cached_token);
22  auto result = stream_map.emplace(std::make_pair(cached_token.get_stream_id(), ptr));
23  if (!result.second) {
24  throw_exception(std::runtime_error("failed to create stream!"));
25  }
26  return ptr;
27  }
28 
30  // Disconnect all the sessions from their streams, this should kill any
31  // session remaining since at this point the io_context should be already
32  // stopped.
33  for (auto &pair : _stream_map) {
34 #ifndef LIBCARLA_NO_EXCEPTIONS
35  try {
36 #endif // LIBCARLA_NO_EXCEPTIONS
37  auto stream_state = pair.second.lock();
38  if (stream_state != nullptr) {
39  stream_state->ClearSessions();
40  }
41 #ifndef LIBCARLA_NO_EXCEPTIONS
42  } catch (const std::exception &e) {
43  log_error("failed to clear sessions:", e.what());
44  }
45 #endif // LIBCARLA_NO_EXCEPTIONS
46  }
47  }
48 
50  std::lock_guard<std::mutex> lock(_mutex);
51  ++_cached_token._token.stream_id; // id zero only happens in overflow.
52  log_info("Created new stream:", _cached_token._token.stream_id);
53  return MakeStreamState<MultiStreamState>(_cached_token, _stream_map);
54  }
55 
56  bool Dispatcher::RegisterSession(std::shared_ptr<Session> session) {
57  DEBUG_ASSERT(session != nullptr);
58  std::lock_guard<std::mutex> lock(_mutex);
59  auto search = _stream_map.find(session->get_stream_id());
60  if (search != _stream_map.end()) {
61  auto stream_state = search->second.lock();
62  if (stream_state != nullptr) {
63  log_info("Connecting session (stream ", session->get_stream_id(), ")");
64  stream_state->ConnectSession(std::move(session));
65  return true;
66  }
67  }
68  log_error("Invalid session: no stream available with id", session->get_stream_id());
69  return false;
70  }
71 
72  void Dispatcher::DeregisterSession(std::shared_ptr<Session> session) {
73  DEBUG_ASSERT(session != nullptr);
74  std::lock_guard<std::mutex> lock(_mutex);
76  auto search = _stream_map.find(session->get_stream_id());
77  if (search != _stream_map.end()) {
78  auto stream_state = search->second.lock();
79  if (stream_state != nullptr) {
80  log_info("Disconnecting session (stream ", session->get_stream_id(), ")");
81  stream_state->DisconnectSession(session);
82  }
83  }
84  }
85 
87  for (auto it = _stream_map.begin(); it != _stream_map.end(); ) {
88  if (it->second.expired()) {
89  it = _stream_map.erase(it);
90  } else {
91  ++it;
92  }
93  }
94  }
95 
96 } // namespace detail
97 } // namespace streaming
98 } // namespace carla
static auto MakeStreamState(const token_type &cached_token, StreamMapT &stream_map)
Definition: Dispatcher.cpp:20
std::unordered_map< stream_id_type, std::weak_ptr< StreamStateBase > > _stream_map
Definition: Dispatcher.h:52
void throw_exception(const std::exception &e)
Definition: Carla.cpp:101
static void log_error(Args &&... args)
Definition: Logging.h:110
This file contains definitions of common data structures used in traffic manager. ...
Definition: Carla.cpp:99
#define DEBUG_ASSERT(predicate)
Definition: Debug.h:66
Serializes a stream endpoint.
Definition: detail/Token.h:61
void DeregisterSession(std::shared_ptr< Session > session)
Definition: Dispatcher.cpp:72
bool RegisterSession(std::shared_ptr< Session > session)
Definition: Dispatcher.cpp:56
static void log_info(Args &&... args)
Definition: Logging.h:82
const auto & get_stream_id() const
Definition: detail/Token.h:111
carla::streaming::Stream MakeStream()
Definition: Dispatcher.cpp:49