CARLA
CarlaEngine.cpp
Go to the documentation of this file.
1 // Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
7 #include "Carla.h"
9 
16 
17 #include "Runtime/Core/Public/Misc/App.h"
18 #include "PhysicsEngine/PhysicsSettings.h"
20 
22 #include <carla/Logging.h>
28 #include <carla/streaming/Server.h>
30 
31 #include <thread>
32 
33 // =============================================================================
34 // -- Static local methods -----------------------------------------------------
35 // =============================================================================
36 
37 // init static frame counter
38 uint64_t FCarlaEngine::FrameCounter = 0;
39 
41 {
42  return std::max(std::thread::hardware_concurrency(), 4u) - 2u;
43 }
44 
45 static TOptional<double> FCarlaEngine_GetFixedDeltaSeconds()
46 {
47  return FApp::IsBenchmarking() ? FApp::GetFixedDeltaTime() : TOptional<double>{};
48 }
49 
50 static void FCarlaEngine_SetFixedDeltaSeconds(TOptional<double> FixedDeltaSeconds)
51 {
52  FApp::SetBenchmarking(FixedDeltaSeconds.IsSet());
53  FApp::SetFixedDeltaTime(FixedDeltaSeconds.Get(0.0));
54 }
55 
56 // =============================================================================
57 // -- FCarlaEngine -------------------------------------------------------------
58 // =============================================================================
59 
61 {
62  if (bIsRunning)
63  {
64  FWorldDelegates::OnWorldTickStart.Remove(OnPreTickHandle);
65  FWorldDelegates::OnWorldPostActorTick.Remove(OnPostTickHandle);
67  }
68 }
69 
71 {
72  TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
73  if (!bIsRunning)
74  {
75  const auto StreamingPort = Settings.StreamingPort;
76  const auto SecondaryPort = Settings.SecondaryPort;
77  const auto PrimaryIP = Settings.PrimaryIP;
78  const auto PrimaryPort = Settings.PrimaryPort;
79 
80  auto BroadcastStream = Server.Start(Settings.RPCPort, StreamingPort, SecondaryPort);
82 
83  WorldObserver.SetStream(BroadcastStream);
84 
85  OnPreTickHandle = FWorldDelegates::OnWorldTickStart.AddRaw(
86  this,
88  OnPostTickHandle = FWorldDelegates::OnWorldPostActorTick.AddRaw(
89  this,
92  this,
94 
95  bIsRunning = true;
96 
97  // check to convert this as secondary server
98  if (!PrimaryIP.empty())
99  {
100  // we are secondary server, connecting to primary server
101  bIsPrimaryServer = false;
102 
103  // define the commands executor (when a command comes from the primary server)
104  auto CommandExecutor = [=](carla::multigpu::MultiGPUCommand Id, carla::Buffer Data) {
105  struct CarlaStreamBuffer : public std::streambuf
106  {
107  CarlaStreamBuffer(char *buf, std::size_t size) { setg(buf, buf, buf + size); }
108  };
109  switch (Id) {
111  {
112  if(GetCurrentEpisode())
113  {
114  TRACE_CPUPROFILER_EVENT_SCOPE_STR("MultiGPUCommand::SEND_FRAME");
115  // convert frame data from buffer to istream
116  CarlaStreamBuffer TempStream((char *) Data.data(), Data.size());
117  std::istream InStream(&TempStream);
118  GetCurrentEpisode()->GetFrameData().Read(InStream);
119  {
120  TRACE_CPUPROFILER_EVENT_SCOPE_STR("FramesToProcess.emplace_back");
121  std::lock_guard<std::mutex> Lock(FrameToProcessMutex);
122  FramesToProcess.emplace_back(GetCurrentEpisode()->GetFrameData());
123  }
124  }
125  // forces a tick
126  Server.Tick();
127  break;
128  }
130  {
131  FString FinalPath((char *) Data.data());
132  UGameplayStatics::OpenLevel(CurrentEpisode->GetWorld(), *FinalPath, true);
133  break;
134  }
136  {
137  // get the sensor id
138  auto sensor_id = *(reinterpret_cast<carla::streaming::detail::stream_id_type *>(Data.data()));
139  // query dispatcher
141  carla::Buffer buf(reinterpret_cast<unsigned char *>(&token), (size_t) sizeof(token));
142  carla::log_info("responding with a token for port ", token.get_port());
143  Secondary->Write(std::move(buf));
144  break;
145  }
146 
148  {
149  std::string msg("Yes, I'm alive");
150  carla::Buffer buf((unsigned char *) msg.c_str(), (size_t) msg.size());
151  carla::log_info("responding is alive command");
152  Secondary->Write(std::move(buf));
153  break;
154  }
155  }
156  };
157 
158  Secondary = std::make_shared<carla::multigpu::Secondary>(
159  PrimaryIP,
160  PrimaryPort,
161  CommandExecutor
162  );
163 
164  Secondary->Connect();
165  // set this server in synchronous mode
166  bSynchronousMode = true;
167  }
168  else
169  {
170  // we are primary server, starting server
171  bIsPrimaryServer = true;
173  SecondaryServer->SetNewConnectionCallback([this]()
174  {
175  this->bNewConnection = true;
176  UE_LOG(LogCarla, Log, TEXT("New secondary connection detected"));
177  });
178  }
179  }
180 
181  bMapChanged = true;
182 }
183 
185 {
186  TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
188  CurrentEpisode = &Episode;
189 
190  // Reset map settings
191  UWorld* World = CurrentEpisode->GetWorld();
192  ALargeMapManager* LargeMapManager = UCarlaStatics::GetLargeMapManager(World);
193  if (LargeMapManager)
194  {
197  }
198 
199  if (!bIsPrimaryServer)
200  {
201  // set this secondary server with no-rendering mode
203  }
204 
206 
207  ResetFrameCounter(GFrameNumber);
208 
209  // make connection between Episode and Recorder
210  if (Recorder)
211  {
212  Recorder->SetEpisode(&Episode);
213  Episode.SetRecorder(Recorder);
215  }
216 
217  Server.NotifyBeginEpisode(Episode);
218 
220 }
221 
223 {
225  CurrentEpisode = nullptr;
226 }
227 
228 void FCarlaEngine::OnPreTick(UWorld *, ELevelTick TickType, float DeltaSeconds)
229 {
230  TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
231  if (TickType == ELevelTick::LEVELTICK_All)
232  {
233 
234  if (bIsPrimaryServer)
235  {
236  if (CurrentEpisode && !bSynchronousMode && SecondaryServer->HasClientsConnected())
237  {
238  // set synchronous mode
243  }
244 
245  // process RPC commands
246  do
247  {
248  Server.RunSome(1u);
249  }
251  }
252  else
253  {
254  // process frame data
255  do
256  {
257  Server.RunSome(1u);
258  }
259  while (!FramesToProcess.size());
260  }
261 
262  // update frame counter
264 
265  if (CurrentEpisode)
266  {
267  CurrentEpisode->TickTimers(DeltaSeconds);
268 
269  if (!bIsPrimaryServer)
270  {
271  if (FramesToProcess.size())
272  {
273  TRACE_CPUPROFILER_EVENT_SCOPE_STR("FramesToProcess.PlayFrameData");
274  std::lock_guard<std::mutex> Lock(FrameToProcessMutex);
275  FramesToProcess.front().PlayFrameData(CurrentEpisode, MappedId);
276  FramesToProcess.erase(FramesToProcess.begin()); // remove first element
277  }
278  }
279  }
280  }
281 }
282 
283 
284 void FCarlaEngine::OnPostTick(UWorld *World, ELevelTick TickType, float DeltaSeconds)
285 {
286  TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
287  // tick the recorder/replayer system
288  if (GetCurrentEpisode())
289  {
290  if (bIsPrimaryServer)
291  {
292  if (SecondaryServer->HasClientsConnected()) {
294  bNewConnection = false;
295  std::ostringstream OutStream;
296  GetCurrentEpisode()->GetFrameData().Write(OutStream);
297 
298  // send frame data to secondary
299  std::string Tmp(OutStream.str());
300  SecondaryServer->GetCommander().SendFrameData(carla::Buffer(std::move((unsigned char *) Tmp.c_str()), (size_t) Tmp.size()));
301 
303  }
304  }
305 
306  auto* EpisodeRecorder = GetCurrentEpisode()->GetRecorder();
307  if (EpisodeRecorder)
308  {
309  EpisodeRecorder->Ticking(DeltaSeconds);
310  }
311  }
312 
313  if ((TickType == ELevelTick::LEVELTICK_All) && (CurrentEpisode != nullptr))
314  {
315  // Look for lightsubsystem
316  bool LightUpdatePending = false;
317  if (World)
318  {
319  UCarlaLightSubsystem* CarlaLightSubsystem = World->GetSubsystem<UCarlaLightSubsystem>();
320  if (CarlaLightSubsystem)
321  {
322  LightUpdatePending = CarlaLightSubsystem->IsUpdatePending();
323  }
324  }
325 
326  // send the worldsnapshot
327  WorldObserver.BroadcastTick(*CurrentEpisode, DeltaSeconds, bMapChanged, LightUpdatePending);
328  CurrentEpisode->GetSensorManager().PostPhysTick(World, TickType, DeltaSeconds);
330  }
331 }
332 
334 {
335  CurrentSettings = FEpisodeSettings(Settings);
336 
338 
339  if (GEngine && GEngine->GameViewport)
340  {
341  GEngine->GameViewport->bDisableWorldRendering = Settings.bNoRenderingMode;
342  }
343 
345 
346  // Setting parameters for physics substepping
347  UPhysicsSettings* PhysSett = UPhysicsSettings::Get();
348  PhysSett->bSubstepping = Settings.bSubstepping;
349  PhysSett->MaxSubstepDeltaTime = Settings.MaxSubstepDeltaTime;
350  PhysSett->MaxSubsteps = Settings.MaxSubsteps;
351 
352  UWorld* World = CurrentEpisode->GetWorld();
353  ALargeMapManager* LargeMapManager = UCarlaStatics::GetLargeMapManager(World);
354  if (LargeMapManager)
355  {
356  LargeMapManager->SetLayerStreamingDistance(Settings.TileStreamingDistance);
357  LargeMapManager->SetActorStreamingDistance(Settings.ActorActiveDistance);
358  }
359 }
360 
362 {
363  bMapChanged = false;
364 }
void CheckPlayAfterMapLoaded(void)
void PostPhysTick(UWorld *World, ELevelTick TickType, float DeltaSeconds)
carla::streaming::Server & GetStreamingServer()
UCarlaEpisode * GetCurrentEpisode()
Definition: CarlaEngine.h:54
bool bMapChanged
Definition: CarlaEngine.h:99
void OnEpisodeSettingsChanged(const FEpisodeSettings &Settings)
uint32 SecondaryPort
setting for the secondary servers port.
Definition: CarlaSettings.h:89
void BroadcastTick(const UCarlaEpisode &Episode, float DeltaSeconds, bool MapChange, bool PendingLightUpdate)
Send a message to every connected client with the info about the given Episode.
FDelegateHandle OnEpisodeSettingsChangeHandle
Definition: CarlaEngine.h:115
FSensorManager & GetSensorManager()
Definition: CarlaEpisode.h:327
carla::streaming::detail::token_type GetToken(carla::streaming::detail::stream_id_type sensor_id)
std::shared_ptr< carla::multigpu::Router > SecondaryServer
Definition: CarlaEngine.h:122
void SetEpisode(UCarlaEpisode *ThisEpisode)
FCarlaServer Server
Definition: CarlaEngine.h:101
static void FCarlaEngine_SetFixedDeltaSeconds(TOptional< double > FixedDeltaSeconds)
Definition: CarlaEngine.cpp:50
std::vector< FFrameData > FramesToProcess
Definition: CarlaEngine.h:125
uint32 StreamingPort
setting for the streaming port.
Definition: CarlaSettings.h:86
static uint32 FCarlaEngine_GetNumberOfThreadsForRPCServer()
Definition: CarlaEngine.cpp:40
FDataMultiStream Start(uint16_t RPCPort, uint16_t StreamingPort, uint16_t SecondaryPort)
static FOnEpisodeSettingsChange OnEpisodeSettingsChange
static uint64_t UpdateFrameCounter()
Definition: CarlaEngine.h:69
FDelegateHandle OnPostTickHandle
Definition: CarlaEngine.h:113
void SetStream(FDataMultiStream InStream)
Replace the Stream associated with this sensor.
Definition: WorldObserver.h:22
FWorldObserver WorldObserver
Definition: CarlaEngine.h:103
CarlaReplayer * GetReplayer(void)
uint32_t stream_id_type
Definition: Types.h:18
static T Get(carla::rpc::Response< T > &response)
void Write(std::ostream &OutStream)
Definition: FrameData.cpp:198
void NotifyEndEpisode()
static void ResetFrameCounter(uint64_t Value=0)
Definition: CarlaEngine.h:75
void NotifyInitGame(const UCarlaSettings &Settings)
Definition: CarlaEngine.cpp:70
void OnPostTick(UWorld *World, ELevelTick TickType, float DeltaSeconds)
static uint64_t FrameCounter
Definition: CarlaEngine.h:34
bool bIsRunning
Definition: CarlaEngine.h:95
void RunSome(uint32 Milliseconds)
A simulation episode.
Definition: CarlaEpisode.h:37
std::mutex FrameToProcessMutex
Definition: CarlaEngine.h:126
Serializes a stream endpoint.
Definition: detail/Token.h:61
void NotifyBeginEpisode(UCarlaEpisode &Episode)
ACarlaRecorder * Recorder
Definition: CarlaEngine.h:109
bool bNewConnection
Definition: CarlaEngine.h:118
void NotifyEndEpisode()
std::shared_ptr< carla::multigpu::Router > GetSecondaryServer()
std::string PrimaryIP
setting for the IP and Port of the primary server to connect.
Definition: CarlaSettings.h:92
std::unordered_map< uint32_t, uint32_t > MappedId
Definition: CarlaEngine.h:120
FEpisodeSettings CurrentSettings
Definition: CarlaEngine.h:107
void GetFrameData(UCarlaEpisode *ThisEpisode, bool bAdditionalData=false, bool bIncludeActorsAgain=false)
Definition: FrameData.cpp:14
void SetActorStreamingDistance(float Distance)
Global settings for CARLA.
Definition: CarlaSettings.h:21
UCarlaEpisode * CurrentEpisode
Definition: CarlaEngine.h:105
uint32 PrimaryPort
Definition: CarlaSettings.h:93
void ResetSimulationState()
bool TickCueReceived()
static void log_info(Args &&... args)
Definition: Logging.h:82
bool bIsPrimaryServer
Definition: CarlaEpisode.h:329
ACarlaRecorder * GetRecorder() const
Definition: CarlaEpisode.h:304
void Clear()
Definition: FrameData.cpp:176
void SetLayerStreamingDistance(float Distance)
void Ticking(float DeltaSeconds)
void SetRecorder(ACarlaRecorder *Rec)
Definition: CarlaEpisode.h:309
FFrameData & GetFrameData()
Definition: CarlaEpisode.h:325
void OnPreTick(UWorld *World, ELevelTick TickType, float DeltaSeconds)
float GetActorStreamingDistance() const
float GetLayerStreamingDistance() const
static TOptional< double > FCarlaEngine_GetFixedDeltaSeconds()
Definition: CarlaEngine.cpp:45
bool bSynchronousMode
Definition: CarlaEngine.h:97
A piece of raw data.
Definition: carla/Buffer.h:41
static ALargeMapManager * GetLargeMapManager(const UObject *WorldContextObject)
Definition: CarlaStatics.h:100
uint32 RPCPort
World port to listen for client connections.
Definition: CarlaSettings.h:83
bool bIsPrimaryServer
Definition: CarlaEngine.h:117
void Read(std::istream &InStream)
Definition: FrameData.cpp:215
FEpisodeSettings EpisodeSettings
Definition: CarlaEpisode.h:371
std::shared_ptr< carla::multigpu::Secondary > Secondary
Definition: CarlaEngine.h:123
void AsyncRun(uint32 NumberOfWorkerThreads)
FDelegateHandle OnPreTickHandle
Definition: CarlaEngine.h:111
void TickTimers(float DeltaSeconds)
Definition: CarlaEpisode.h:353
void ApplySettings(const FEpisodeSettings &Settings)
void NotifyBeginEpisode(UCarlaEpisode &Episode)