CARLA
CarlaEngine.cpp
Go to the documentation of this file.
1 // Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
7 #include "Carla.h"
9 
16 
17 #include "Runtime/Core/Public/Misc/App.h"
18 #include "PhysicsEngine/PhysicsSettings.h"
20 
22 #include <carla/Logging.h>
28 #include <carla/streaming/Server.h>
30 
31 #include <thread>
32 
33 // =============================================================================
34 // -- Static local methods -----------------------------------------------------
35 // =============================================================================
36 
37 // Definition of the static frame counter member of FCarlaEngine; it starts at zero.
38 uint64_t FCarlaEngine::FrameCounter = 0;
39 
41 {
42  return std::max(std::thread::hardware_concurrency(), 4u) - 2u;
43 }
44 
45 static TOptional<double> FCarlaEngine_GetFixedDeltaSeconds()
46 {
47  return FApp::IsBenchmarking() ? FApp::GetFixedDeltaTime() : TOptional<double>{};
48 }
49 
50 static void FCarlaEngine_SetFixedDeltaSeconds(TOptional<double> FixedDeltaSeconds)
51 {
52  FApp::SetBenchmarking(FixedDeltaSeconds.IsSet());
53  FApp::SetFixedDeltaTime(FixedDeltaSeconds.Get(0.0));
54 }
55 
56 // =============================================================================
57 // -- FCarlaEngine -------------------------------------------------------------
58 // =============================================================================
59 
// NOTE(review): the line that opens this definition (listing line 60) is
// missing from this listing. Presumably this is the FCarlaEngine destructor —
// confirm against the original file.
// While the engine is marked as running, the pre-tick and post-tick handles
// are removed from the world tick delegates.
61 {
62  if (bIsRunning)
63  {
64  FWorldDelegates::OnWorldTickStart.Remove(OnPreTickHandle);
65  FWorldDelegates::OnWorldPostActorTick.Remove(OnPostTickHandle);
  // NOTE(review): listing line 66 is absent here; one statement may be missing.
67  }
68 }
69 
// Body of NotifyInitGame(const UCarlaSettings &Settings). The signature line
// (listing line 70) is missing from this listing, as are several interior
// lines (81, 87, 90-91, 93, 110, 129, 132, 137, 144, 169).
//
// On the first call (bIsRunning is false) this:
//   - starts Server on the RPC, streaming and secondary ports and passes the
//     returned stream to WorldObserver,
//   - registers callbacks on the world tick delegates,
//   - marks the engine as running,
//   - if Settings.PrimaryIP is non-empty, becomes a secondary server: builds
//     a command executor, connects to the primary and sets bSynchronousMode;
//     otherwise marks itself as the primary server.
// bMapChanged is set to true on every call, running or not.
71 {
72  TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
73  if (!bIsRunning)
74  {
75  const auto StreamingPort = Settings.StreamingPort;
76  const auto SecondaryPort = Settings.SecondaryPort;
77  const auto PrimaryIP = Settings.PrimaryIP;
78  const auto PrimaryPort = Settings.PrimaryPort;
79 
80  auto BroadcastStream = Server.Start(Settings.RPCPort, StreamingPort, SecondaryPort);
82 
83  WorldObserver.SetStream(BroadcastStream);
84 
  // NOTE(review): the callback arguments of the AddRaw calls below (and the
  // call that the third `this,` belongs to) are missing from this listing.
85  OnPreTickHandle = FWorldDelegates::OnWorldTickStart.AddRaw(
86  this,
88  OnPostTickHandle = FWorldDelegates::OnWorldPostActorTick.AddRaw(
89  this,
92  this,
94 
95  bIsRunning = true;
96 
97  // check to convert this as secondary server
98  if (!PrimaryIP.empty())
99  {
100  // we are secondary server, connecting to primary server
101  bIsPrimaryServer = false;
102 
103  // define the commands executor (when a command comes from the primary server)
  // The lambda is handed to carla::multigpu::Secondary further down. With
  // [=] inside a member function it captures `this` by pointer (it calls
  // GetCurrentEpisode(), Server.Tick() and Secondary->Write()).
  // NOTE(review): confirm the callback cannot run after this object is destroyed.
104  auto CommandExecutor = [=](carla::multigpu::MultiGPUCommand Id, carla::Buffer Data) {
  // Minimal streambuf whose get area is set to the caller-supplied buffer, so
  // a std::istream can read from it.
105  struct CarlaStreamBuffer : public std::streambuf
106  {
107  CarlaStreamBuffer(char *buf, std::size_t size) { setg(buf, buf, buf + size); }
108  };
109  switch (Id) {
  // NOTE(review): the case label (listing line 110) is missing; the profiler
  // tag below suggests MultiGPUCommand::SEND_FRAME — confirm.
111  {
112  if(GetCurrentEpisode())
113  {
114  TRACE_CPUPROFILER_EVENT_SCOPE_STR("MultiGPUCommand::SEND_FRAME");
115  // convert frame data from buffer to istream
116  CarlaStreamBuffer TempStream((char *) Data.data(), Data.size());
117  std::istream InStream(&TempStream);
118  GetCurrentEpisode()->GetFrameData().Read(InStream);
119  {
120  TRACE_CPUPROFILER_EVENT_SCOPE_STR("FramesToProcess.emplace_back");
  // The queue is appended to while holding FrameToProcessMutex.
121  std::lock_guard<std::mutex> Lock(FrameToProcessMutex);
122  FramesToProcess.emplace_back(GetCurrentEpisode()->GetFrameData());
123  }
124  }
125  // forces a tick
126  Server.Tick();
127  break;
128  }
  // NOTE(review): case label (listing line 129) missing; this case only breaks.
130  break;
131 
  // NOTE(review): case label (listing line 132) missing.
133  {
134  // get the sensor id
  // NOTE(review): the id is read straight from Data.data() with no visible
  // check that Data.size() >= sizeof(stream_id_type) and no alignment
  // guarantee — confirm the sender always provides enough bytes.
135  auto sensor_id = *(reinterpret_cast<carla::streaming::detail::stream_id_type *>(Data.data()));
136  // query dispatcher
  // NOTE(review): the statement that produces `token` (listing line 137) is missing.
138  carla::Buffer buf(reinterpret_cast<unsigned char *>(&token), (size_t) sizeof(token));
139  carla::log_info("responding with a token for port ", token.get_port());
140  Secondary->Write(std::move(buf));
141  break;
142  }
143 
  // NOTE(review): case label (listing line 144) missing.
145  {
146  std::string msg("Yes, I'm alive");
147  carla::Buffer buf((unsigned char *) msg.c_str(), (size_t) msg.size());
148  carla::log_info("responding is alive command");
149  Secondary->Write(std::move(buf));
150  break;
151  }
152  }
153  };
154 
155  Secondary = std::make_shared<carla::multigpu::Secondary>(
156  PrimaryIP,
157  PrimaryPort,
158  CommandExecutor
159  );
160 
161  Secondary->Connect();
162  // set this server in synchronous mode
163  bSynchronousMode = true;
164  }
165  else
166  {
167  // we are primary server, starting server
168  bIsPrimaryServer = true;
  // NOTE(review): listing line 169 is absent here; one statement may be missing.
170  }
171  }
172 
173  bMapChanged = true;
174 }
175 
// NOTE(review): the signature line (listing line 176) is missing from this
// listing. The body uses a parameter named `Episode` and forwards it to
// Server.NotifyBeginEpisode, so this is presumably
// FCarlaEngine::NotifyBeginEpisode(UCarlaEpisode &Episode) — confirm.
// Listing lines 179, 187-188, 194, 197, 206 and 211 are also absent.
//
// Visible behaviour: stores the episode as CurrentEpisode, looks up the large
// map manager for its world, resets the frame counter to GFrameNumber, links
// Recorder and Episode to each other when a recorder exists, and notifies
// Server that the episode began.
177 {
178  TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
180  CurrentEpisode = &Episode;
181 
182  // Reset map settings
183  UWorld* World = CurrentEpisode->GetWorld();
184  ALargeMapManager* LargeMapManager = UCarlaStatics::GetLargeMapManager(World);
185  if (LargeMapManager)
186  {
  // NOTE(review): the statements of this branch (listing lines 187-188) are missing.
189  }
190 
191  if (!bIsPrimaryServer)
192  {
193  // set this secondary server with no-rendering mode
  // NOTE(review): the statement of this branch (listing line 194) is missing.
195  }
196 
198 
199  ResetFrameCounter(GFrameNumber);
200 
201  // make connection between Episode and Recorder
202  if (Recorder)
203  {
204  Recorder->SetEpisode(&Episode);
205  Episode.SetRecorder(Recorder);
207  }
208 
209  Server.NotifyBeginEpisode(Episode);
210 
212 }
213 
// NOTE(review): the signature line (listing line 214) and listing line 216
// are missing from this listing; presumably this is
// FCarlaEngine::NotifyEndEpisode() — confirm.
// Visible behaviour: clears the CurrentEpisode pointer.
215 {
217  CurrentEpisode = nullptr;
218 }
219 
// Pre-tick callback. Only acts on ELevelTick::LEVELTICK_All ticks.
//
// On a primary server it runs Server.RunSome(1u) in a do-loop to process RPC
// commands. On a secondary server it keeps calling Server.RunSome(1u) until
// FramesToProcess is non-empty, i.e. it waits for at least one queued frame.
// Afterwards it ticks the current episode's timers (when an episode exists)
// and plays back and removes the oldest queued frame.
//
// @param TickType      Kind of level tick being started.
// @param DeltaSeconds  Passed on to CurrentEpisode->TickTimers.
//
// NOTE(review): listing lines 231-234, 242, 255 and 261 are missing from this
// listing (including the `while (...)` that closes the first do-loop and the
// condition guarding the last block), so the code below is not complete.
220 void FCarlaEngine::OnPreTick(UWorld *, ELevelTick TickType, float DeltaSeconds)
221 {
222  TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
223  if (TickType == ELevelTick::LEVELTICK_All)
224  {
225 
226  if (bIsPrimaryServer)
227  {
228  if (CurrentEpisode && !bSynchronousMode && SecondaryServer->HasClientsConnected())
229  {
230  // set synchronous mode
  // NOTE(review): the statements of this branch (listing lines 231-234) are missing.
235  }
236 
237  // process RPC commands
238  do
239  {
240  Server.RunSome(1u);
241  }
  // NOTE(review): the loop condition (listing line 242) is missing.
243  }
244  else
245  {
246  // process frame data
247  do
248  {
249  Server.RunSome(1u);
250  }
  // NOTE(review): FramesToProcess.size() is read here without holding
  // FrameToProcessMutex, while the erase below and the emplace_back elsewhere
  // in this file both take that mutex. If the producer runs on another thread
  // this is an unsynchronized read — confirm and lock if so.
251  while (!FramesToProcess.size());
252  }
253 
254  // update frame counter
  // NOTE(review): the statement this comment refers to (listing line 255) is missing.
256 
257  if (CurrentEpisode != nullptr)
258  {
259  CurrentEpisode->TickTimers(DeltaSeconds);
260  }
  // NOTE(review): the condition that opens this block (listing line 261) is missing.
262  {
  // NOTE(review): same unlocked size() read as above; the lock is only taken
  // inside the branch.
263  if (FramesToProcess.size())
264  {
265  TRACE_CPUPROFILER_EVENT_SCOPE_STR("FramesToProcess.PlayFrameData");
266  std::lock_guard<std::mutex> Lock(FrameToProcessMutex);
267  FramesToProcess.front().PlayFrameData(GetCurrentEpisode(), MappedId);
  // FramesToProcess is declared as a std::vector, so erasing the first
  // element shifts every remaining element.
268  FramesToProcess.erase(FramesToProcess.begin()); // remove first element
269  }
270  }
271  }
272 }
273 
274 
// Post-tick callback.
//
// When an episode is active:
//   - on a primary server with secondary clients connected, serializes the
//     episode's frame data into a string stream and sends the bytes to the
//     secondaries through SecondaryServer's commander;
//   - ticks the episode recorder, if the episode has one.
// On LEVELTICK_All ticks with an active episode it additionally queries the
// light subsystem for a pending update, broadcasts the world snapshot through
// WorldObserver and forwards the tick to the episode's sensor manager.
//
// @param World         World being ticked; may be null (checked before use).
// @param TickType      Kind of level tick that just finished.
// @param DeltaSeconds  Forwarded to the recorder, WorldObserver and sensor manager.
//
// NOTE(review): listing lines 284, 292 and 319 are missing from this listing,
// so statements may be absent at those positions.
275 void FCarlaEngine::OnPostTick(UWorld *World, ELevelTick TickType, float DeltaSeconds)
276 {
277  TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
278  // tick the recorder/replayer system
279  if (GetCurrentEpisode())
280  {
281  if (bIsPrimaryServer)
282  {
283  if (SecondaryServer->HasClientsConnected()) {
  // NOTE(review): std::ostringstream is used here but <sstream> is not among
  // the includes visible in this listing (the include list has gaps) — confirm.
285  std::ostringstream OutStream;
286  GetCurrentEpisode()->GetFrameData().Write(OutStream);
287 
288  // send frame data to secondary
289  std::string Tmp(OutStream.str());
  // NOTE(review): std::move applied to a raw pointer has no effect, and the
  // C-style cast also strips const from c_str(). Both could be replaced by a
  // plain named cast once the carla::Buffer constructor signature is confirmed.
290  SecondaryServer->GetCommander().SendFrameData(carla::Buffer(std::move((unsigned char *) Tmp.c_str()), (size_t) Tmp.size()));
291 
293  }
294  }
295 
296  auto* EpisodeRecorder = GetCurrentEpisode()->GetRecorder();
297  if (EpisodeRecorder)
298  {
299  EpisodeRecorder->Ticking(DeltaSeconds);
300  }
301  }
302 
303  if ((TickType == ELevelTick::LEVELTICK_All) && (CurrentEpisode != nullptr))
304  {
305  // Look for lightsubsystem
  // Defaults to "no update pending" when there is no world or no light subsystem.
306  bool LightUpdatePending = false;
307  if (World)
308  {
309  UCarlaLightSubsystem* CarlaLightSubsystem = World->GetSubsystem<UCarlaLightSubsystem>();
310  if (CarlaLightSubsystem)
311  {
312  LightUpdatePending = CarlaLightSubsystem->IsUpdatePending();
313  }
314  }
315 
316  // send the worldsnapshot
317  WorldObserver.BroadcastTick(*CurrentEpisode, DeltaSeconds, bMapChanged, LightUpdatePending);
318  CurrentEpisode->GetSensorManager().PostPhysTick(World, TickType, DeltaSeconds);
320  }
321 }
322 
// NOTE(review): the signature line (listing line 323) is missing from this
// listing. The body reads a parameter named `Settings`, so this is presumably
// FCarlaEngine::OnEpisodeSettingsChanged(const FEpisodeSettings &Settings) —
// confirm. Listing lines 327 and 334 are also absent.
//
// Visible behaviour: stores a copy of the settings in CurrentSettings, toggles
// world rendering on the game viewport according to bNoRenderingMode, copies
// the substepping parameters into the physics settings object, and forwards
// the tile/actor streaming distances to the large map manager when one exists.
324 {
325  CurrentSettings = FEpisodeSettings(Settings);
326 
328 
329  if (GEngine && GEngine->GameViewport)
330  {
331  GEngine->GameViewport->bDisableWorldRendering = Settings.bNoRenderingMode;
332  }
333 
335 
336  // Setting parameters for physics substepping
337  UPhysicsSettings* PhysSett = UPhysicsSettings::Get();
338  PhysSett->bSubstepping = Settings.bSubstepping;
339  PhysSett->MaxSubstepDeltaTime = Settings.MaxSubstepDeltaTime;
340  PhysSett->MaxSubsteps = Settings.MaxSubsteps;
341 
  // NOTE(review): CurrentEpisode is dereferenced without a null check here,
  // whereas OnPreTick/OnPostTick test it against nullptr before use — confirm
  // this cannot be reached while no episode is active.
342  UWorld* World = CurrentEpisode->GetWorld();
343  ALargeMapManager* LargeMapManager = UCarlaStatics::GetLargeMapManager(World);
344  if (LargeMapManager)
345  {
346  LargeMapManager->SetLayerStreamingDistance(Settings.TileStreamingDistance);
347  LargeMapManager->SetActorStreamingDistance(Settings.ActorActiveDistance);
348  }
349 }
350 
// NOTE(review): the signature line (listing line 351) is missing from this
// listing; presumably this is FCarlaEngine::ResetSimulationState() — confirm.
// Visible behaviour: clears the bMapChanged flag.
352 {
353  bMapChanged = false;
354 }
void CheckPlayAfterMapLoaded(void)
void PostPhysTick(UWorld *World, ELevelTick TickType, float DeltaSeconds)
carla::streaming::Server & GetStreamingServer()
UCarlaEpisode * GetCurrentEpisode()
Definition: CarlaEngine.h:54
bool bMapChanged
Definition: CarlaEngine.h:94
void OnEpisodeSettingsChanged(const FEpisodeSettings &Settings)
uint32 SecondaryPort
setting for the secondary servers port.
Definition: CarlaSettings.h:89
void BroadcastTick(const UCarlaEpisode &Episode, float DeltaSeconds, bool MapChange, bool PendingLightUpdate)
Send a message to every connected client with the info about the given Episode.
FDelegateHandle OnEpisodeSettingsChangeHandle
Definition: CarlaEngine.h:110
FSensorManager & GetSensorManager()
Definition: CarlaEpisode.h:319
carla::streaming::detail::token_type GetToken(carla::streaming::detail::stream_id_type sensor_id)
std::shared_ptr< carla::multigpu::Router > SecondaryServer
Definition: CarlaEngine.h:116
void SetEpisode(UCarlaEpisode *ThisEpisode)
FCarlaServer Server
Definition: CarlaEngine.h:96
static void FCarlaEngine_SetFixedDeltaSeconds(TOptional< double > FixedDeltaSeconds)
Definition: CarlaEngine.cpp:50
std::vector< FFrameData > FramesToProcess
Definition: CarlaEngine.h:119
uint32 StreamingPort
setting for the streaming port.
Definition: CarlaSettings.h:86
static uint32 FCarlaEngine_GetNumberOfThreadsForRPCServer()
Definition: CarlaEngine.cpp:40
FDataMultiStream Start(uint16_t RPCPort, uint16_t StreamingPort, uint16_t SecondaryPort)
static FOnEpisodeSettingsChange OnEpisodeSettingsChange
static uint64_t UpdateFrameCounter()
Definition: CarlaEngine.h:69
FDelegateHandle OnPostTickHandle
Definition: CarlaEngine.h:108
void SetStream(FDataMultiStream InStream)
Replace the Stream associated with this sensor.
Definition: WorldObserver.h:22
FWorldObserver WorldObserver
Definition: CarlaEngine.h:98
CarlaReplayer * GetReplayer(void)
uint32_t stream_id_type
Definition: Types.h:18
static T Get(carla::rpc::Response< T > &response)
void Write(std::ostream &OutStream)
Definition: FrameData.cpp:173
void NotifyEndEpisode()
static void ResetFrameCounter(uint64_t Value=0)
Definition: CarlaEngine.h:75
void NotifyInitGame(const UCarlaSettings &Settings)
Definition: CarlaEngine.cpp:70
void OnPostTick(UWorld *World, ELevelTick TickType, float DeltaSeconds)
static uint64_t FrameCounter
Definition: CarlaEngine.h:34
bool bIsRunning
Definition: CarlaEngine.h:90
void RunSome(uint32 Milliseconds)
A simulation episode.
Definition: CarlaEpisode.h:37
std::mutex FrameToProcessMutex
Definition: CarlaEngine.h:120
Serializes a stream endpoint.
Definition: detail/Token.h:61
void NotifyBeginEpisode(UCarlaEpisode &Episode)
ACarlaRecorder * Recorder
Definition: CarlaEngine.h:104
void GetFrameData(UCarlaEpisode *ThisEpisode, bool bAdditionalData=false)
Definition: FrameData.cpp:14
void NotifyEndEpisode()
std::shared_ptr< carla::multigpu::Router > GetSecondaryServer()
std::string PrimaryIP
setting for the IP and Port of the primary server to connect.
Definition: CarlaSettings.h:92
std::unordered_map< uint32_t, uint32_t > MappedId
Definition: CarlaEngine.h:114
FEpisodeSettings CurrentSettings
Definition: CarlaEngine.h:102
void SetActorStreamingDistance(float Distance)
Global settings for CARLA.
Definition: CarlaSettings.h:21
UCarlaEpisode * CurrentEpisode
Definition: CarlaEngine.h:100
uint32 PrimaryPort
Definition: CarlaSettings.h:93
void ResetSimulationState()
bool TickCueReceived()
static void log_info(Args &&... args)
Definition: Logging.h:82
bool bIsPrimaryServer
Definition: CarlaEpisode.h:321
ACarlaRecorder * GetRecorder() const
Definition: CarlaEpisode.h:296
void Clear()
Definition: FrameData.cpp:153
void SetLayerStreamingDistance(float Distance)
void Ticking(float DeltaSeconds)
void SetRecorder(ACarlaRecorder *Rec)
Definition: CarlaEpisode.h:301
FFrameData & GetFrameData()
Definition: CarlaEpisode.h:317
void OnPreTick(UWorld *World, ELevelTick TickType, float DeltaSeconds)
float GetActorStreamingDistance() const
float GetLayerStreamingDistance() const
static TOptional< double > FCarlaEngine_GetFixedDeltaSeconds()
Definition: CarlaEngine.cpp:45
bool bSynchronousMode
Definition: CarlaEngine.h:92
A piece of raw data.
Definition: carla/Buffer.h:41
static ALargeMapManager * GetLargeMapManager(const UObject *WorldContextObject)
Definition: CarlaStatics.h:100
uint32 RPCPort
World port to listen for client connections.
Definition: CarlaSettings.h:83
bool bIsPrimaryServer
Definition: CarlaEngine.h:112
void Read(std::istream &InStream)
Definition: FrameData.cpp:187
FEpisodeSettings EpisodeSettings
Definition: CarlaEpisode.h:363
std::shared_ptr< carla::multigpu::Secondary > Secondary
Definition: CarlaEngine.h:117
void AsyncRun(uint32 NumberOfWorkerThreads)
FDelegateHandle OnPreTickHandle
Definition: CarlaEngine.h:106
void TickTimers(float DeltaSeconds)
Definition: CarlaEpisode.h:345
void ApplySettings(const FEpisodeSettings &Settings)
void NotifyBeginEpisode(UCarlaEpisode &Episode)