CARLA
CarlaEngine.cpp
Go to the documentation of this file.
1 // Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
7 #include "Carla.h"
9 
17 
18 #include "Runtime/Core/Public/Misc/App.h"
19 #include "PhysicsEngine/PhysicsSettings.h"
21 
23 #include <carla/Logging.h>
28 #include <carla/ros2/ROS2.h>
30 #include <carla/streaming/Server.h>
32 
33 #include <thread>
34 
35 // =============================================================================
36 // -- Static local methods -----------------------------------------------------
37 // =============================================================================
38 
39 // init static variables
40 uint64_t FCarlaEngine::FrameCounter = 0;
41 
// Body of FCarlaEngine_GetNumberOfThreadsForRPCServer() (the signature sits on
// listing line 42, which is not visible in this listing).
// Takes the reported hardware concurrency, raises it to at least 4, then
// subtracts 2, so the result is never below 2 even when
// hardware_concurrency() reports a small value.
43 {
44  return std::max(std::thread::hardware_concurrency(), 4u) - 2u;
45 }
46 
// Returns FApp's fixed delta time when FApp reports benchmarking mode;
// otherwise returns an unset TOptional<double>.
47 static TOptional<double> FCarlaEngine_GetFixedDeltaSeconds()
48 {
49  return FApp::IsBenchmarking() ? FApp::GetFixedDeltaTime() : TOptional<double>{};
50 }
51 
// Applies an optional fixed delta time to FApp.
// Benchmarking is switched on exactly when FixedDeltaSeconds holds a value,
// and the fixed delta time is set to that value, or to 0.0 when it is unset.
52 static void FCarlaEngine_SetFixedDeltaSeconds(TOptional<double> FixedDeltaSeconds)
53 {
54  FApp::SetBenchmarking(FixedDeltaSeconds.IsSet());
55  FApp::SetFixedDeltaTime(FixedDeltaSeconds.Get(0.0));
56 }
57 
58 // =============================================================================
59 // -- FCarlaEngine -------------------------------------------------------------
60 // =============================================================================
61 
// NOTE(review): the signature (listing line 62) is not visible in this listing;
// presumably this is the FCarlaEngine destructor -- confirm.
// Teardown only happens if the engine was started (bIsRunning): ROS2 is shut
// down when it is compiled in and enabled, and the pre/post tick handlers are
// removed from the world delegates.
63 {
64  if (bIsRunning)
65  {
66  #if defined(WITH_ROS2)
67  auto ROS2 = carla::ros2::ROS2::GetInstance();
68  if (ROS2->IsEnabled())
69  ROS2->Shutdown();
70  #endif
71  FWorldDelegates::OnWorldTickStart.Remove(OnPreTickHandle);
72  FWorldDelegates::OnWorldPostActorTick.Remove(OnPostTickHandle);
// NOTE(review): listing line 73 is missing from this view.
74  }
75 }
76 
// Body of NotifyInitGame(const UCarlaSettings &Settings); the signature sits on
// listing line 77, which is not visible in this listing.
// On the first call (bIsRunning == false) it starts the server on the
// configured ports, hands the resulting stream to WorldObserver, registers the
// tick callbacks and then configures this instance either as a secondary
// server (Settings.PrimaryIP non-empty) or as the primary server.
// On every call it enables ROS2 when requested (WITH_ROS2 builds only) and
// sets bMapChanged to true.
// NOTE(review): several listing lines are missing inside this body (e.g. 88,
// 94, 97-98, 100, 210 and every `case` label of the switch below).
78 {
79  TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
80  if (!bIsRunning)
81  {
82  const auto StreamingPort = Settings.StreamingPort;
83  const auto SecondaryPort = Settings.SecondaryPort;
84  const auto PrimaryIP = Settings.PrimaryIP;
85  const auto PrimaryPort = Settings.PrimaryPort;
86 
87  auto BroadcastStream = Server.Start(Settings.RPCPort, StreamingPort, SecondaryPort);
89 
90  WorldObserver.SetStream(BroadcastStream);
91 
// register the tick callbacks; the member-function arguments are on listing
// lines that are not visible here
92  OnPreTickHandle = FWorldDelegates::OnWorldTickStart.AddRaw(
93  this,
95  OnPostTickHandle = FWorldDelegates::OnWorldPostActorTick.AddRaw(
96  this,
99  this,
101 
102  bIsRunning = true;
103 
104  // check to convert this as secondary server
105  if (!PrimaryIP.empty())
106  {
107  // we are secondary server, connecting to primary server
108  bIsPrimaryServer = false;
109 
110  // define the commands executor (when a command comes from the primary server)
// [=] inside a member function also captures `this`; the members used below
// (Server, Secondary, FramesToProcess, ...) are reached through it.
111  auto CommandExecutor = [=](carla::multigpu::MultiGPUCommand Id, carla::Buffer Data) {
// minimal streambuf exposing an existing memory range as a read-only get area
112  struct CarlaStreamBuffer : public std::streambuf
113  {
114  CarlaStreamBuffer(char *buf, std::size_t size) { setg(buf, buf, buf + size); }
115  };
// NOTE(review): the `case` labels are not visible in this listing; the
// branches are described below from their bodies only.
116  switch (Id) {
// frame-data branch: deserialize the received frame and queue it
118  {
119  if(GetCurrentEpisode())
120  {
121  TRACE_CPUPROFILER_EVENT_SCOPE_STR("MultiGPUCommand::SEND_FRAME");
122  // convert frame data from buffer to istream
123  CarlaStreamBuffer TempStream((char *) Data.data(), Data.size());
124  std::istream InStream(&TempStream);
125  GetCurrentEpisode()->GetFrameData().Read(InStream);
126  {
127  TRACE_CPUPROFILER_EVENT_SCOPE_STR("FramesToProcess.emplace_back");
128  std::lock_guard<std::mutex> Lock(FrameToProcessMutex);
129  FramesToProcess.emplace_back(GetCurrentEpisode()->GetFrameData());
130  }
131  }
132  // forces a tick
133  Server.Tick();
134  break;
135  }
// level-loading branch: the payload is interpreted as a path
// NOTE(review): the FString is built from Data.data() as a C string, so this
// relies on the payload being NUL-terminated; CurrentEpisode is also
// dereferenced without the null check used in the frame branch -- confirm.
137  {
138  FString FinalPath((char *) Data.data());
139  UGameplayStatics::OpenLevel(CurrentEpisode->GetWorld(), *FinalPath, true);
140  break;
141  }
// token branch: replies with the raw bytes of `token`
// NOTE(review): `token` is declared on listing line 147, which is not visible.
143  {
144  // get the sensor id
145  auto sensor_id = *(reinterpret_cast<carla::streaming::detail::stream_id_type *>(Data.data()));
146  // query dispatcher
148  carla::Buffer buf(reinterpret_cast<unsigned char *>(&token), (size_t) sizeof(token));
149  carla::log_info("responding with a token for port ", token.get_port());
150  Secondary->Write(std::move(buf));
151  break;
152  }
// liveness branch: replies with a fixed text message (size() excludes the NUL)
154  {
155  std::string msg("Yes, I'm alive");
156  carla::Buffer buf((unsigned char *) msg.c_str(), (size_t) msg.size());
157  carla::log_info("responding is alive command");
158  Secondary->Write(std::move(buf));
159  break;
160  }
// ENABLE_ROS branch (per the log message); the dispatcher call itself is on
// listing line 166, which is not visible here
162  {
163  // get the sensor id
164  auto sensor_id = *(reinterpret_cast<carla::streaming::detail::stream_id_type *>(Data.data()));
165  // query dispatcher
167  // return a 'true'
168  bool res = true;
169  carla::Buffer buf(reinterpret_cast<unsigned char *>(&res), (size_t) sizeof(bool));
170  carla::log_info("responding ENABLE_ROS with a true");
171  Secondary->Write(std::move(buf));
172  break;
173  }
// DISABLE_ROS branch (per the log message); the dispatcher call itself is on
// listing line 179, which is not visible here
175  {
176  // get the sensor id
177  auto sensor_id = *(reinterpret_cast<carla::streaming::detail::stream_id_type *>(Data.data()));
178  // query dispatcher
180  // return a 'true'
181  bool res = true;
182  carla::Buffer buf(reinterpret_cast<unsigned char *>(&res), (size_t) sizeof(bool));
183  carla::log_info("responding DISABLE_ROS with a true");
184  Secondary->Write(std::move(buf));
185  break;
186  }
// IS_ENABLED_ROS branch (per the log message): replies with the streaming
// server's answer for this sensor id
188  {
189  // get the sensor id
190  auto sensor_id = *(reinterpret_cast<carla::streaming::detail::stream_id_type *>(Data.data()));
191  // query dispatcher
192  bool res = Server.GetStreamingServer().IsEnabledForROS(sensor_id);
193  carla::Buffer buf(reinterpret_cast<unsigned char *>(&res), (size_t) sizeof(bool));
194  carla::log_info("responding IS_ENABLED_ROS with: ", res);
195  Secondary->Write(std::move(buf));
196  break;
197  }
198  }
199  };
200 
201  Secondary = std::make_shared<carla::multigpu::Secondary>(PrimaryIP, PrimaryPort, CommandExecutor);
202  Secondary->Connect();
203  // set this server in synchronous mode
204  bSynchronousMode = true;
205  }
206  else
207  {
208  // we are primary server, starting server
209  bIsPrimaryServer = true;
// NOTE(review): the assignment of SecondaryServer is on listing line 210,
// which is not visible here.
211  SecondaryServer->SetNewConnectionCallback([this]()
212  {
213  this->bNewConnection = true;
214  UE_LOG(LogCarla, Log, TEXT("New secondary connection detected"));
215  });
216  }
217  }
218 
219  // create ROS2 manager
220  #if defined(WITH_ROS2)
221  if (Settings.ROS2)
222  {
223  auto ROS2 = carla::ros2::ROS2::GetInstance();
224  ROS2->Enable(true);
225  }
226  #endif
227 
228  bMapChanged = true;
229 }
230 
// NOTE(review): the signature (listing line 231) is not visible in this
// listing; presumably FCarlaEngine::NotifyBeginEpisode(UCarlaEpisode &Episode)
// -- confirm.
// Makes Episode the current episode, resets the frame counter to GFrameNumber,
// links the recorder and the episode to each other when a recorder exists, and
// forwards the notification to Server.
// NOTE(review): listing lines 234, 242-243, 249, 252, 261 and 266 are missing
// from this view, so the large-map and secondary-server branches appear empty.
232 {
233  TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
235  CurrentEpisode = &Episode;
236 
237  // Reset map settings
238  UWorld* World = CurrentEpisode->GetWorld();
239  ALargeMapManager* LargeMapManager = UCarlaStatics::GetLargeMapManager(World);
240  if (LargeMapManager)
241  {
244  }
245 
246  if (!bIsPrimaryServer)
247  {
248  // set this secondary server with no-rendering mode
250  }
251 
253 
254  ResetFrameCounter(GFrameNumber);
255 
256  // make connection between Episode and Recorder
257  if (Recorder)
258  {
259  Recorder->SetEpisode(&Episode);
260  Episode.SetRecorder(Recorder);
262  }
263 
264  Server.NotifyBeginEpisode(Episode);
265 
267 }
268 
// NOTE(review): the signature (listing line 269) and listing line 271 are not
// visible in this listing; presumably FCarlaEngine::NotifyEndEpisode() -- confirm.
// The visible part clears the current-episode pointer.
270 {
272  CurrentEpisode = nullptr;
273 }
274 
// World pre-tick callback. Does nothing unless TickType is LEVELTICK_All.
//  - Primary server: pumps the server with Server.RunSome(1u); the loop
//    condition is on listing line 297, which is not visible here.
//  - Secondary server: keeps calling Server.RunSome(1u) until FramesToProcess
//    is non-empty, so the tick does not proceed before a frame is queued.
// Afterwards, when an episode is active, its timers are advanced by
// DeltaSeconds and, on a secondary server, the oldest queued frame is played
// back and removed.
// NOTE(review): listing lines 286-289, 297 and 310 are missing from this view
// (the synchronous-mode setup, the primary loop condition and the
// frame-counter update).
275 void FCarlaEngine::OnPreTick(UWorld *, ELevelTick TickType, float DeltaSeconds)
276 {
277  TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
278  if (TickType == ELevelTick::LEVELTICK_All)
279  {
280 
281  if (bIsPrimaryServer)
282  {
283  if (CurrentEpisode && !bSynchronousMode && SecondaryServer->HasClientsConnected())
284  {
285  // set synchronous mode
290  }
291 
292  // process RPC commands
293  do
294  {
295  Server.RunSome(1u);
296  }
298  }
299  else
300  {
301  // process frame data
302  do
303  {
304  Server.RunSome(1u);
305  }
// NOTE(review): FramesToProcess.size() is read here (and again below) without
// holding FrameToProcessMutex -- confirm that every writer runs on this thread.
306  while (!FramesToProcess.size());
307  }
308 
309  // update frame counter
311 
312  if (CurrentEpisode)
313  {
314  CurrentEpisode->TickTimers(DeltaSeconds);
315 
316  if (!bIsPrimaryServer)
317  {
318  if (FramesToProcess.size())
319  {
320  TRACE_CPUPROFILER_EVENT_SCOPE_STR("FramesToProcess.PlayFrameData");
321  std::lock_guard<std::mutex> Lock(FrameToProcessMutex);
322  FramesToProcess.front().PlayFrameData(CurrentEpisode, MappedId);
323  FramesToProcess.erase(FramesToProcess.begin()); // remove first element
324  }
325  }
326  }
327  }
328 }
329 
330 
// World post-actor-tick callback.
//  1) When an episode is active: on a primary server with secondary clients
//     connected, the episode's frame data is serialized and sent through
//     SecondaryServer's commander; then the episode recorder, if any, is ticked.
//  2) For LEVELTICK_All ticks with an active episode: the world snapshot is
//     broadcast (together with the map-changed flag and the light-update state)
//     and the tick is forwarded to the sensor manager.
// NOTE(review): listing lines 340, 349 and 376 are missing from this view.
331 void FCarlaEngine::OnPostTick(UWorld *World, ELevelTick TickType, float DeltaSeconds)
332 {
333  TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
334  // tick the recorder/replayer system
335  if (GetCurrentEpisode())
336  {
337  if (bIsPrimaryServer)
338  {
339  if (SecondaryServer->HasClientsConnected()) {
341  bNewConnection = false;
342  std::ostringstream OutStream;
343  GetCurrentEpisode()->GetFrameData().Write(OutStream);
344 
345  // send frame data to secondary
346  std::string Tmp(OutStream.str());
// NOTE(review): std::move on a raw pointer has no effect; the Buffer is built
// from Tmp's bytes and size -- presumably it copies them, confirm.
347  SecondaryServer->GetCommander().SendFrameData(carla::Buffer(std::move((unsigned char *) Tmp.c_str()), (size_t) Tmp.size()));
348 
350  }
351  }
352 
353  auto* EpisodeRecorder = GetCurrentEpisode()->GetRecorder();
354  if (EpisodeRecorder)
355  {
356  EpisodeRecorder->Ticking(DeltaSeconds);
357  }
358  }
359 
360  if ((TickType == ELevelTick::LEVELTICK_All) && (CurrentEpisode != nullptr))
361  {
362  // Look for lightsubsystem
363  bool LightUpdatePending = false;
364  if (World)
365  {
366  UCarlaLightSubsystem* CarlaLightSubsystem = World->GetSubsystem<UCarlaLightSubsystem>();
367  if (CarlaLightSubsystem)
368  {
369  LightUpdatePending = CarlaLightSubsystem->IsUpdatePending();
370  }
371  }
372 
373  // send the worldsnapshot
374  WorldObserver.BroadcastTick(*CurrentEpisode, DeltaSeconds, bMapChanged, LightUpdatePending);
375  CurrentEpisode->GetSensorManager().PostPhysTick(World, TickType, DeltaSeconds);
377  }
378 }
379 
// NOTE(review): the signature (listing line 380) is not visible in this
// listing; presumably
// FCarlaEngine::OnEpisodeSettingsChanged(const FEpisodeSettings &Settings)
// -- confirm.
// Stores a copy of the new settings, applies the no-rendering flag to the game
// viewport when one exists, copies the substepping parameters into the global
// physics settings and updates the streaming distances of the large-map
// manager when the world has one.
// NOTE(review): listing lines 384 and 391 are missing from this view.
381 {
382  CurrentSettings = FEpisodeSettings(Settings);
383 
385 
386  if (GEngine && GEngine->GameViewport)
387  {
388  GEngine->GameViewport->bDisableWorldRendering = Settings.bNoRenderingMode;
389  }
390 
392 
393  // Setting parameters for physics substepping
394  UPhysicsSettings* PhysSett = UPhysicsSettings::Get();
395  PhysSett->bSubstepping = Settings.bSubstepping;
396  PhysSett->MaxSubstepDeltaTime = Settings.MaxSubstepDeltaTime;
397  PhysSett->MaxSubsteps = Settings.MaxSubsteps;
398 
// NOTE(review): CurrentEpisode is dereferenced without a null check here --
// confirm this cannot run while no episode is active.
399  UWorld* World = CurrentEpisode->GetWorld();
400  ALargeMapManager* LargeMapManager = UCarlaStatics::GetLargeMapManager(World);
401  if (LargeMapManager)
402  {
403  LargeMapManager->SetLayerStreamingDistance(Settings.TileStreamingDistance);
404  LargeMapManager->SetActorStreamingDistance(Settings.ActorActiveDistance);
405  }
406 }
407 
// NOTE(review): the signature (listing line 408) is not visible in this
// listing; presumably FCarlaEngine::ResetSimulationState() -- confirm.
// Clears the map-changed flag that OnPostTick passes to
// WorldObserver.BroadcastTick.
409 {
410  bMapChanged = false;
411 }
void CheckPlayAfterMapLoaded(void)
void PostPhysTick(UWorld *World, ELevelTick TickType, float DeltaSeconds)
carla::streaming::Server & GetStreamingServer()
UCarlaEpisode * GetCurrentEpisode()
Definition: CarlaEngine.h:55
void DisableForROS(stream_id sensor_id)
bool bMapChanged
Definition: CarlaEngine.h:110
void OnEpisodeSettingsChanged(const FEpisodeSettings &Settings)
uint32 SecondaryPort
setting for the secondary servers port.
Definition: CarlaSettings.h:89
void BroadcastTick(const UCarlaEpisode &Episode, float DeltaSeconds, bool MapChange, bool PendingLightUpdate)
Send a message to every connected client with the info about the given Episode.
FDelegateHandle OnEpisodeSettingsChangeHandle
Definition: CarlaEngine.h:126
FSensorManager & GetSensorManager()
Definition: CarlaEpisode.h:328
std::shared_ptr< carla::multigpu::Router > SecondaryServer
Definition: CarlaEngine.h:133
void SetEpisode(UCarlaEpisode *ThisEpisode)
FCarlaServer Server
Definition: CarlaEngine.h:112
static void FCarlaEngine_SetFixedDeltaSeconds(TOptional< double > FixedDeltaSeconds)
Definition: CarlaEngine.cpp:52
std::vector< FFrameData > FramesToProcess
Definition: CarlaEngine.h:136
uint32 StreamingPort
setting for the streaming port.
Definition: CarlaSettings.h:86
static uint32 FCarlaEngine_GetNumberOfThreadsForRPCServer()
Definition: CarlaEngine.cpp:42
FDataMultiStream Start(uint16_t RPCPort, uint16_t StreamingPort, uint16_t SecondaryPort)
static FOnEpisodeSettingsChange OnEpisodeSettingsChange
static uint64_t UpdateFrameCounter()
Definition: CarlaEngine.h:70
FDelegateHandle OnPostTickHandle
Definition: CarlaEngine.h:124
void SetStream(FDataMultiStream InStream)
Replace the Stream associated with this sensor.
Definition: WorldObserver.h:22
FWorldObserver WorldObserver
Definition: CarlaEngine.h:114
bool IsEnabledForROS(stream_id sensor_id)
CarlaReplayer * GetReplayer(void)
uint32_t stream_id_type
Definition: Types.h:18
static T Get(carla::rpc::Response< T > &response)
void Write(std::ostream &OutStream)
Definition: FrameData.cpp:207
void NotifyEndEpisode()
static void ResetFrameCounter(uint64_t Value=0)
Definition: CarlaEngine.h:81
void NotifyInitGame(const UCarlaSettings &Settings)
Definition: CarlaEngine.cpp:77
void OnPostTick(UWorld *World, ELevelTick TickType, float DeltaSeconds)
token_type GetToken(stream_id sensor_id)
static uint64_t FrameCounter
Definition: CarlaEngine.h:35
void RunSome(uint32 Milliseconds)
void EnableForROS(stream_id sensor_id)
A simulation episode.
Definition: CarlaEpisode.h:38
std::mutex FrameToProcessMutex
Definition: CarlaEngine.h:137
Serializes a stream endpoint.
Definition: detail/Token.h:61
void NotifyBeginEpisode(UCarlaEpisode &Episode)
ACarlaRecorder * Recorder
Definition: CarlaEngine.h:120
bool bNewConnection
Definition: CarlaEngine.h:129
void NotifyEndEpisode()
std::shared_ptr< carla::multigpu::Router > GetSecondaryServer()
std::string PrimaryIP
setting for the IP and Port of the primary server to connect.
Definition: CarlaSettings.h:92
std::unordered_map< uint32_t, uint32_t > MappedId
Definition: CarlaEngine.h:131
FEpisodeSettings CurrentSettings
Definition: CarlaEngine.h:118
void GetFrameData(UCarlaEpisode *ThisEpisode, bool bAdditionalData=false, bool bIncludeActorsAgain=false)
Definition: FrameData.cpp:23
void SetActorStreamingDistance(float Distance)
Global settings for CARLA.
Definition: CarlaSettings.h:21
UCarlaEpisode * CurrentEpisode
Definition: CarlaEngine.h:116
uint32 PrimaryPort
Definition: CarlaSettings.h:93
void ResetSimulationState()
bool TickCueReceived()
static void log_info(Args &&... args)
Definition: Logging.h:82
bool bIsPrimaryServer
Definition: CarlaEpisode.h:330
ACarlaRecorder * GetRecorder() const
Definition: CarlaEpisode.h:305
void Clear()
Definition: FrameData.cpp:185
void SetLayerStreamingDistance(float Distance)
void Ticking(float DeltaSeconds)
void SetRecorder(ACarlaRecorder *Rec)
Definition: CarlaEpisode.h:310
FFrameData & GetFrameData()
Definition: CarlaEpisode.h:326
void OnPreTick(UWorld *World, ELevelTick TickType, float DeltaSeconds)
float GetActorStreamingDistance() const
float GetLayerStreamingDistance() const
static TOptional< double > FCarlaEngine_GetFixedDeltaSeconds()
Definition: CarlaEngine.cpp:47
bool bSynchronousMode
Definition: CarlaEngine.h:108
A piece of raw data.
Definition: carla/Buffer.h:42
static ALargeMapManager * GetLargeMapManager(const UObject *WorldContextObject)
Definition: CarlaStatics.h:100
uint32 RPCPort
World port to listen for client connections.
Definition: CarlaSettings.h:83
bool bIsPrimaryServer
Definition: CarlaEngine.h:128
void Read(std::istream &InStream)
Definition: FrameData.cpp:224
FEpisodeSettings EpisodeSettings
Definition: CarlaEpisode.h:382
static std::shared_ptr< ROS2 > GetInstance()
Definition: ROS2.h:51
std::shared_ptr< carla::multigpu::Secondary > Secondary
Definition: CarlaEngine.h:134
void AsyncRun(uint32 NumberOfWorkerThreads)
FDelegateHandle OnPreTickHandle
Definition: CarlaEngine.h:122
void TickTimers(float DeltaSeconds)
Definition: CarlaEpisode.h:358
void ApplySettings(const FEpisodeSettings &Settings)
void NotifyBeginEpisode(UCarlaEpisode &Episode)