CARLA
Episode.cpp
Go to the documentation of this file.
1 // Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
8 
9 #include "carla/Logging.h"
14 
15 #include <exception>
16 
17 namespace carla {
18 namespace client {
19 namespace detail {
20 
21 using namespace std::chrono_literals;
22 
23  static auto &CastData(const sensor::SensorData &data) {
24  using target_t = const sensor::data::RawEpisodeState;
25  return static_cast<target_t &>(data);
26  }
27 
28  template <typename RangeT>
29  static auto GetActorsById_Impl(Client &client, CachedActorList &actors, const RangeT &actor_ids) {
30  auto missing_ids = actors.GetMissingIds(actor_ids);
31  if (!missing_ids.empty()) {
32  actors.InsertRange(client.GetActorsById(missing_ids));
33  }
34  return actors.GetActorsById(actor_ids);
35  }
36 
37  Episode::Episode(Client &client, std::weak_ptr<Simulator> simulator)
38  : Episode(client, client.GetEpisodeInfo(), simulator) {}
39 
40  Episode::Episode(Client &client, const rpc::EpisodeInfo &info, std::weak_ptr<Simulator> simulator)
41  : _client(client),
42  _state(std::make_shared<EpisodeState>(info.id)),
43  _simulator(simulator),
44  _token(info.token) {}
45 
47  try {
49  } catch (const std::exception &e) {
50  log_error("exception trying to disconnect from episode:", e.what());
51  }
52  }
53 
54  void Episode::Listen() {
55  std::weak_ptr<Episode> weak = shared_from_this();
56  _client.SubscribeToStream(_token, [weak](auto buffer) {
57  auto self = weak.lock();
58  if (self != nullptr) {
59 
60  auto data = sensor::Deserializer::Deserialize(std::move(buffer));
61  auto next = std::make_shared<const EpisodeState>(CastData(*data));
62  auto prev = self->GetState();
63 
64  // TODO: Update how the map change is detected
65  bool HasMapChanged = next->HasMapChanged();
66  bool UpdateLights = next->IsLightUpdatePending();
67 
68  /// Check for pending exceptions (Mainly TM server closed)
69  if(self->_pending_exceptions) {
70 
71  /// Mark pending exception false
72  self->_pending_exceptions = false;
73 
74  /// Create exception for the error message
75  auto exception(self->_pending_exceptions_msg);
76  // Notify waiting threads that exception occurred
77  self->_snapshot.SetException(std::runtime_error(exception));
78  }
79  /// Sensor case: inconsistent data
80  else {
81  bool episode_changed = (next->GetEpisodeId() != prev->GetEpisodeId());
82 
83  do {
84  if (prev->GetFrame() >= next->GetFrame() && !episode_changed) {
85  self->_on_tick_callbacks.Call(next);
86  return;
87  }
88  } while (!self->_state.compare_exchange(&prev, next));
89 
90  if(UpdateLights || HasMapChanged) {
91  self->_on_light_update_callbacks.Call(next);
92  }
93 
94  if(HasMapChanged) {
95  self->_should_update_map = true;
96  }
97 
98  /// Episode change
99  if(episode_changed) {
100  self->OnEpisodeChanged();
101  }
102 
103  // Notify waiting threads and do the callbacks.
104  self->_snapshot.SetValue(next);
105 
106  // Call user callbacks.
107  self->_on_tick_callbacks.Call(next);
108  }
109  }
110  });
111  }
112 
113  boost::optional<rpc::Actor> Episode::GetActorById(ActorId id) {
114  auto actor = _actors.GetActorById(id);
115  if (!actor.has_value()) {
116  auto actor_list = _client.GetActorsById({id});
117  if (!actor_list.empty()) {
118  actor = std::move(actor_list.front());
119  _actors.Insert(*actor);
120  }
121  }
122  return actor;
123  }
124 
125  std::vector<rpc::Actor> Episode::GetActorsById(const std::vector<ActorId> &actor_ids) {
126  return GetActorsById_Impl(_client, _actors, actor_ids);
127  }
128 
129  std::vector<rpc::Actor> Episode::GetActors() {
130  return GetActorsById_Impl(_client, _actors, GetState()->GetActorIds());
131  }
132 
134  _actors.Clear();
135  _on_tick_callbacks.Clear();
136  _walker_navigation.reset();
138  }
139 
142  }
143 
145  if(_should_update_map) {
146  _should_update_map = false;
147  return true;
148  }
149  return false;
150  }
151 
152  std::shared_ptr<WalkerNavigation> Episode::CreateNavigationIfMissing() {
153  std::shared_ptr<WalkerNavigation> nav;
154  do {
155  nav = _walker_navigation.load();
156  if (nav == nullptr) {
157  auto new_nav = std::make_shared<WalkerNavigation>(_simulator);
158  _walker_navigation.compare_exchange(&nav, new_nav);
159  }
160  } while (nav == nullptr);
161  return nav;
162  }
163 
164 } // namespace detail
165 } // namespace client
166 } // namespace carla
AtomicSharedPtr< const EpisodeState > _state
Definition: Episode.h:114
static SharedPtr< SensorData > Deserialize(Buffer &&buffer)
Holds the current episode, and the current episode state.
Definition: Episode.h:34
CachedActorList _actors
Definition: Episode.h:118
std::vector< rpc::Actor > GetActors()
Definition: Episode.cpp:129
static void log_error(Args &&... args)
Definition: Logging.h:110
This file contains definitions of common data structures used in traffic manager. ...
Definition: Carla.cpp:133
std::shared_ptr< WalkerNavigation > CreateNavigationIfMissing()
Definition: Episode.cpp:152
Represents the state of all the actors of an episode at a given frame.
Definition: EpisodeState.h:27
void SubscribeToStream(const streaming::Token &token, std::function< void(Buffer)> callback)
Keeps a list of actor descriptions to avoid requesting each time the descriptions to the server...
std::vector< rpc::Actor > GetActorsById(const RangeT &range) const
Retrieve the actors matching the ids in range.
boost::optional< rpc::Actor > GetActorById(ActorId id) const
Retrieve the actor matching id, or empty optional if actor is not cached.
static auto & CastData(const sensor::SensorData &data)
Definition: Episode.cpp:23
void Insert(rpc::Actor actor)
Inserts an actor into the list.
Base class for all the objects containing data generated by a sensor.
Definition: SensorData.h:20
void InsertRange(RangeT range)
Inserts a range containing actors.
const streaming::Token _token
Definition: Episode.h:130
boost::optional< rpc::Actor > GetActorById(ActorId id)
Definition: Episode.cpp:113
carla::ActorId ActorId
std::vector< rpc::Actor > GetActorsById(const std::vector< ActorId > &ids)
std::vector< rpc::Actor > GetActorsById(const std::vector< ActorId > &actor_ids)
Definition: Episode.cpp:125
std::shared_ptr< const EpisodeState > GetState() const
Definition: Episode.h:49
Provides communication with the rpc and streaming servers of a CARLA simulator.
std::vector< ActorId > GetMissingIds(const RangeT &range) const
Return the actor ids present in range that haven't been added to this list.
State of the episode at a given frame.
Episode(Client &client, std::weak_ptr< Simulator > simulator)
Definition: Episode.cpp:37
void UnSubscribeFromStream(const streaming::Token &token)
CallbackList< WorldSnapshot > _on_tick_callbacks
Definition: Episode.h:120
std::weak_ptr< Simulator > _simulator
Definition: Episode.h:136
static auto GetActorsById_Impl(Client &client, CachedActorList &actors, const RangeT &actor_ids)
Definition: Episode.cpp:29
AtomicSharedPtr< WalkerNavigation > _walker_navigation
Definition: Episode.h:128