CARLA
MultiStreamState.h
Go to the documentation of this file.
1 // Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
7 #pragma once
8 
10 #include "carla/Logging.h"
13 
14 #include <mutex>
15 #include <vector>
16 #include <atomic>
17 
18 namespace carla {
19 namespace streaming {
20 namespace detail {
21 
22  /// A stream state that can hold any number of sessions.
23  ///
24  /// @todo Lacking some optimization.
25  class MultiStreamState final : public StreamStateBase {
26  public:
27 
29 
31  StreamStateBase(token),
32  _session(nullptr)
33  {};
34 
35  template <typename... Buffers>
36  void Write(Buffers &&... buffers) {
37  auto message = Session::MakeMessage(std::move(buffers)...);
38 
39  // try write single stream
40  auto session = _session.load();
41  if (session != nullptr) {
42  session->Write(std::move(message));
43  // Return here, _session is only valid if we have a
44  // single session.
45  return;
46  }
47 
48  // try write multiple stream
49  std::lock_guard<std::mutex> lock(_mutex);
50  for (auto &s : _sessions) {
51  if (s != nullptr) {
52  s->Write(message);
53  }
54  }
55  }
56 
57  private:
58 
59  void ConnectSession(std::shared_ptr<Session> session) final {
60  DEBUG_ASSERT(session != nullptr);
61  std::lock_guard<std::mutex> lock(_mutex);
62  _sessions.emplace_back(std::move(session));
63  log_debug("Connecting multistream sessions:", _sessions.size());
64  if (_sessions.size() == 1) {
65  _session.store(_sessions[0]);
66  }
67  else if (_sessions.size() > 1) {
68  _session.store(nullptr);
69  }
70  }
71 
72  void DisconnectSession(std::shared_ptr<Session> session) final {
73  DEBUG_ASSERT(session != nullptr);
74  std::lock_guard<std::mutex> lock(_mutex);
75  if (_sessions.size() == 0) return;
76  if (_sessions.size() == 1) {
77  DEBUG_ASSERT(session == _session.load());
78  _session.store(nullptr);
79  _sessions.clear();
80  } else {
81  _sessions.erase(
82  std::remove(_sessions.begin(), _sessions.end(), session),
83  _sessions.end());
84 
85  // set single session if only one
86  if (_sessions.size() == 1)
87  _session.store(_sessions[0]);
88  else
89  _session.store(nullptr);
90  }
91  log_debug("Disconnecting multistream sessions:", _sessions.size());
92  }
93 
94  void ClearSessions() final {
95  std::lock_guard<std::mutex> lock(_mutex);
96  _sessions.clear();
97  _session.store(nullptr);
98  log_debug("Disconnecting all multistream sessions");
99  }
100 
101  std::mutex _mutex;
102 
103  // if there is only one session, then we use atomic
105  // if there are more than one session, we use vector of sessions with mutex
106  std::vector<std::shared_ptr<Session>> _sessions;
107  };
108 
109 } // namespace detail
110 } // namespace streaming
111 } // namespace carla
This file contains definitions of common data structures used in traffic manager. ...
Definition: Carla.cpp:99
static void log_debug(Args &&...)
Definition: Logging.h:75
void ConnectSession(std::shared_ptr< Session > session) final
#define DEBUG_ASSERT(predicate)
Definition: Debug.h:66
Serializes a stream endpoint.
Definition: detail/Token.h:61
A stream state that can hold any number of sessions.
static auto MakeMessage(Buffers &&... buffers)
Definition: ServerSession.h:61
void DisconnectSession(std::shared_ptr< Session > session) final
A very simple atomic shared ptr with release-acquire memory order.
std::vector< std::shared_ptr< Session > > _sessions
Shared state among all the copies of a stream.