CARLA
MultiStreamState.h
Go to the documentation of this file.
1 // Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
7 #pragma once
8 
10 #include "carla/Logging.h"
13 
14 #include <mutex>
15 #include <vector>
16 #include <atomic>
17 
18 namespace carla {
19 namespace streaming {
20 namespace detail {
21 
22  /// A stream state that can hold any number of sessions.
23  ///
24  /// @todo Lacking some optimization.
25  class MultiStreamState final : public StreamStateBase {
26  public:
27 
29 
31  StreamStateBase(token),
32  _session(nullptr)
33  {};
34 
35  template <typename... Buffers>
36  void Write(Buffers... buffers) {
37  // try write single stream
38  auto session = _session.load();
39  if (session != nullptr) {
40  auto message = Session::MakeMessage(buffers...);
41  session->Write(std::move(message));
42  log_debug("sensor ", session->get_stream_id()," data sent");
43  // Return here, _session is only valid if we have a
44  // single session.
45  return;
46  }
47 
48  // try write multiple stream
49  std::lock_guard<std::mutex> lock(_mutex);
50  if (_sessions.size() > 0) {
51  auto message = Session::MakeMessage(buffers...);
52  for (auto &s : _sessions) {
53  if (s != nullptr) {
54  s->Write(message);
55  log_debug("sensor ", s->get_stream_id()," data sent ");
56  }
57  }
58  }
59  }
60 
61  void ForceActive() {
62  _force_active = true;
63  }
64 
65  void EnableForROS() {
66  _enabled_for_ros = true;
67  }
68 
69  void DisableForROS() {
70  _enabled_for_ros = false;
71  }
72 
73  bool IsEnabledForROS() {
74  return _enabled_for_ros;
75  }
76 
78  return (_sessions.size() > 0 || _force_active || _enabled_for_ros);
79  }
80 
81  void ConnectSession(std::shared_ptr<Session> session) final {
82  DEBUG_ASSERT(session != nullptr);
83  std::lock_guard<std::mutex> lock(_mutex);
84  _sessions.emplace_back(std::move(session));
85  log_debug("Connecting multistream sessions:", _sessions.size());
86  if (_sessions.size() == 1) {
87  _session.store(_sessions[0]);
88  }
89  else if (_sessions.size() > 1) {
90  _session.store(nullptr);
91  }
92  }
93 
94  void DisconnectSession(std::shared_ptr<Session> session) final {
95  DEBUG_ASSERT(session != nullptr);
96  std::lock_guard<std::mutex> lock(_mutex);
97  log_debug("Calling DisconnectSession for ", session->get_stream_id());
98  if (_sessions.size() == 0) return;
99  if (_sessions.size() == 1) {
100  DEBUG_ASSERT(session == _session.load());
101  _session.store(nullptr);
102  _sessions.clear();
103  _force_active = false;
104  log_debug("Last session disconnected");
105  } else {
106  _sessions.erase(
107  std::remove(_sessions.begin(), _sessions.end(), session),
108  _sessions.end());
109 
110  // set single session if only one
111  if (_sessions.size() == 1)
112  _session.store(_sessions[0]);
113  else
114  _session.store(nullptr);
115  }
116  log_debug("Disconnecting multistream sessions:", _sessions.size());
117  }
118 
119  void ClearSessions() final {
120  std::lock_guard<std::mutex> lock(_mutex);
121  for (auto &s : _sessions) {
122  if (s != nullptr) {
123  s->Close();
124  }
125  }
126  _sessions.clear();
127  _force_active = false;
128  _session.store(nullptr);
129  log_debug("Disconnecting all multistream sessions");
130  }
131 
132  private:
133 
134  std::mutex _mutex;
135 
136  // if there is only one session, then we use atomic
138  // if there are more than one session, we use vector of sessions with mutex
139  std::vector<std::shared_ptr<Session>> _sessions;
140  bool _force_active {false};
141  bool _enabled_for_ros {false};
142  };
143 
144 } // namespace detail
145 } // namespace streaming
146 } // namespace carla
This file contains definitions of common data structures used in traffic manager. ...
Definition: Carla.cpp:133
static void log_debug(Args &&...)
Definition: Logging.h:75
void ConnectSession(std::shared_ptr< Session > session) final
#define DEBUG_ASSERT(predicate)
Definition: Debug.h:66
Serializes a stream endpoint.
Definition: detail/Token.h:61
A stream state that can hold any number of sessions.
void DisconnectSession(std::shared_ptr< Session > session) final
A very simple atomic shared ptr with release-acquire memory order.
std::vector< std::shared_ptr< Session > > _sessions
static auto MakeMessage(Buffers... buffers)
Definition: ServerSession.h:68
Shared state among all the copies of a stream.