CARLA
rpc/Server.h
Go to the documentation of this file.
1 // Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
7 #pragma once
8 
9 #include "carla/MoveHandler.h"
10 #include "carla/Time.h"
11 #include "carla/rpc/Metadata.h"
12 #include "carla/rpc/Response.h"
13 
14 #include <boost/asio/io_context.hpp>
15 #include <boost/asio/post.hpp>
16 
17 #include <rpc/server.h>
18 
19 #include <future>
20 
21 namespace carla {
22 namespace rpc {
23 
24  // ===========================================================================
25  // -- Server -----------------------------------------------------------------
26  // ===========================================================================
27 
28  /// An RPC server in which functions can be bind to run synchronously or
29  /// asynchronously.
30  ///
31  /// Use `AsyncRun` to start the worker threads, and use `SyncRunFor` to
32  /// run a slice of work in the caller's thread.
33  ///
34  /// Functions that are bind using `BindAsync` will run asynchronously in the
35  /// worker threads. Functions that are bind using `BindSync` will run within
36  /// `SyncRunFor` function.
37  class Server {
38  public:
39 
40  template <typename... Args>
41  explicit Server(Args &&... args);
42 
43  template <typename FunctorT>
44  void BindSync(const std::string &name, FunctorT &&functor);
45 
46  template <typename FunctorT>
47  void BindAsync(const std::string &name, FunctorT &&functor);
48 
49  void AsyncRun(size_t worker_threads) {
50  _server.async_run(worker_threads);
51  }
52 
53  void SyncRunFor(time_duration duration) {
54  #ifdef LIBCARLA_INCLUDED_FROM_UE4
56  TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
58  #endif // LIBCARLA_INCLUDED_FROM_UE4
59  _sync_io_context.reset();
60  _sync_io_context.run_for(duration.to_chrono());
61  }
62 
63  /// @warning does not stop the game thread.
64  void Stop() {
65  _server.stop();
66  }
67 
68  private:
69 
70  boost::asio::io_context _sync_io_context;
71 
72  ::rpc::server _server;
73  };
74 
75  // ===========================================================================
76  // -- Server implementation --------------------------------------------------
77  // ===========================================================================
78 
79 namespace detail {
80 
81  template <typename T>
82  struct FunctionWrapper : FunctionWrapper<decltype(&T::operator())> {};
83 
84  template <typename C, typename R, typename... Args>
85  struct FunctionWrapper<R (C::*)(Args...)> : FunctionWrapper<R (*)(Args...)> {};
86 
87  template <typename C, typename R, typename... Args>
88  struct FunctionWrapper<R (C::*)(Args...) const> : FunctionWrapper<R (*)(Args...)> {};
89 
90  template<class T>
91  struct FunctionWrapper<T &> : public FunctionWrapper<T> {};
92 
93  template<class T>
94  struct FunctionWrapper<T &&> : public FunctionWrapper<T> {};
95 
96  template <typename R, typename... Args>
97  struct FunctionWrapper<R (*)(Args...)> {
98 
99  /// Wraps @a functor into a function type with equivalent signature. The
100  /// wrap function returned. When called, posts @a functor into the
101  /// io_context; if the client called this method synchronously, waits for
102  /// the posted task to finish, otherwise returns immediately.
103  ///
104  /// This way, no matter from which thread the wrap function is called, the
105  /// @a functor provided is always called from the context of the io_context.
106  /// I.e., we can use the io_context to run tasks on a specific thread (e.g.
107  /// game thread).
108  template <typename FuncT>
109  static auto WrapSyncCall(boost::asio::io_context &io, FuncT &&functor) {
110  return [&io, functor=std::forward<FuncT>(functor)](Metadata metadata, Args... args) -> R {
111  auto task = std::packaged_task<R()>([functor=std::move(functor), args...]() {
112  return functor(args...);
113  });
114  if (metadata.IsResponseIgnored()) {
115  // Post task and ignore result.
116  boost::asio::post(io, MoveHandler(task));
117  return R();
118  } else {
119  // Post task and wait for result.
120  auto result = task.get_future();
121  boost::asio::post(io, MoveHandler(task));
122  return result.get();
123  }
124  };
125  }
126 
127  /// Wraps @a functor into a function type with equivalent signature that
128  /// handles the metadata sent by the client. If the client called this
129  /// method asynchronously, the result is ignored.
130  template <typename FuncT>
131  static auto WrapAsyncCall(FuncT &&functor) {
132  return [functor=std::forward<FuncT>(functor)](::carla::rpc::Metadata metadata, Args... args) -> R {
133  if (metadata.IsResponseIgnored()) {
134  functor(args...);
135  return R();
136  } else {
137  return functor(args...);
138  }
139  };
140  }
141  };
142 
143 } // namespace detail
144 
145  template <typename ... Args>
146  inline Server::Server(Args && ... args)
147  : _server(std::forward<Args>(args) ...) {
148  _server.suppress_exceptions(true);
149  }
150 
151  template <typename FunctorT>
152  inline void Server::BindSync(const std::string &name, FunctorT &&functor) {
153  using Wrapper = detail::FunctionWrapper<FunctorT>;
154  _server.bind(
155  name,
156  Wrapper::WrapSyncCall(_sync_io_context, std::forward<FunctorT>(functor)));
157  }
158 
159  template <typename FunctorT>
160  inline void Server::BindAsync(const std::string &name, FunctorT &&functor) {
161  using Wrapper = detail::FunctionWrapper<FunctorT>;
162  _server.bind(
163  name,
164  Wrapper::WrapAsyncCall(std::forward<FunctorT>(functor)));
165  }
166 
167 } // namespace rpc
168 } // namespace carla
::rpc::server _server
Definition: rpc/Server.h:72
void AsyncRun(size_t worker_threads)
Definition: rpc/Server.h:49
static auto WrapAsyncCall(FuncT &&functor)
Wraps functor into a function type with equivalent signature that handles the metadata sent by the cl...
Definition: rpc/Server.h:131
Server(Args &&... args)
Definition: rpc/Server.h:146
void BindSync(const std::string &name, FunctorT &&functor)
Definition: rpc/Server.h:152
carla::rpc::Response< T > R
Definition: CarlaServer.cpp:78
This file contains definitions of common data structures used in traffic manager. ...
Definition: Carla.cpp:133
An RPC server in which functions can be bind to run synchronously or asynchronously.
Definition: rpc/Server.h:37
void BindAsync(const std::string &name, FunctorT &&functor)
Definition: rpc/Server.h:160
bool IsResponseIgnored() const
Definition: Metadata.h:28
boost::asio::io_context _sync_io_context
Definition: rpc/Server.h:70
auto MoveHandler(FunctorT &&func)
Hack to trick asio into accepting move-only handlers, if the handler were actually copied it would re...
Definition: MoveHandler.h:33
static auto WrapSyncCall(boost::asio::io_context &io, FuncT &&functor)
Wraps functor into a function type with equivalent signature.
Definition: rpc/Server.h:109
Positive time duration up to milliseconds resolution.
Definition: Time.h:19
void SyncRunFor(time_duration duration)
Definition: rpc/Server.h:53
constexpr auto to_chrono() const
Definition: Time.h:50
Metadata of an RPC function call.
Definition: Metadata.h:15