CARLA
rpc/Server.h
Go to the documentation of this file.
1 // Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
7 #pragma once
8 
9 #include "carla/MoveHandler.h"
10 #include "carla/Time.h"
11 #include "carla/rpc/Metadata.h"
12 #include "carla/rpc/Response.h"
13 
14 #include <boost/asio/io_context.hpp>
15 #include <boost/asio/post.hpp>
16 
17 #include <rpc/server.h>
18 
19 #include <future>
20 
21 namespace carla {
22 namespace rpc {
23 
24  // ===========================================================================
25  // -- Server -----------------------------------------------------------------
26  // ===========================================================================
27 
28  /// An RPC server in which functions can be bind to run synchronously or
29  /// asynchronously.
30  ///
31  /// Use `AsyncRun` to start the worker threads, and use `SyncRunFor` to
32  /// run a slice of work in the caller's thread.
33  ///
34  /// Functions that are bind using `BindAsync` will run asynchronously in the
35  /// worker threads. Functions that are bind using `BindSync` will run within
36  /// `SyncRunFor` function.
37  class Server {
38  public:
39 
40  template <typename... Args>
41  explicit Server(Args &&... args);
42 
43  template <typename FunctorT>
44  void BindSync(const std::string &name, FunctorT &&functor);
45 
46  template <typename FunctorT>
47  void BindAsync(const std::string &name, FunctorT &&functor);
48 
49  void AsyncRun(size_t worker_threads) {
50  _server.async_run(worker_threads);
51  }
52 
53  void SyncRunFor(time_duration duration) {
54  #ifdef LIBCARLA_INCLUDED_FROM_UE4
55  TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
56  #endif // LIBCARLA_INCLUDED_FROM_UE4
57  _sync_io_context.reset();
58  _sync_io_context.run_for(duration.to_chrono());
59  }
60 
61  /// @warning does not stop the game thread.
62  void Stop() {
63  _server.stop();
64  }
65 
66  private:
67 
68  boost::asio::io_context _sync_io_context;
69 
70  ::rpc::server _server;
71  };
72 
73  // ===========================================================================
74  // -- Server implementation --------------------------------------------------
75  // ===========================================================================
76 
77 namespace detail {
78 
79  template <typename T>
80  struct FunctionWrapper : FunctionWrapper<decltype(&T::operator())> {};
81 
82  template <typename C, typename R, typename... Args>
83  struct FunctionWrapper<R (C::*)(Args...)> : FunctionWrapper<R (*)(Args...)> {};
84 
85  template <typename C, typename R, typename... Args>
86  struct FunctionWrapper<R (C::*)(Args...) const> : FunctionWrapper<R (*)(Args...)> {};
87 
88  template<class T>
89  struct FunctionWrapper<T &> : public FunctionWrapper<T> {};
90 
91  template<class T>
92  struct FunctionWrapper<T &&> : public FunctionWrapper<T> {};
93 
94  template <typename R, typename... Args>
95  struct FunctionWrapper<R (*)(Args...)> {
96 
97  /// Wraps @a functor into a function type with equivalent signature. The
98  /// wrap function returned. When called, posts @a functor into the
99  /// io_context; if the client called this method synchronously, waits for
100  /// the posted task to finish, otherwise returns immediately.
101  ///
102  /// This way, no matter from which thread the wrap function is called, the
103  /// @a functor provided is always called from the context of the io_context.
104  /// I.e., we can use the io_context to run tasks on a specific thread (e.g.
105  /// game thread).
106  template <typename FuncT>
107  static auto WrapSyncCall(boost::asio::io_context &io, FuncT &&functor) {
108  return [&io, functor=std::forward<FuncT>(functor)](Metadata metadata, Args... args) -> R {
109  auto task = std::packaged_task<R()>([functor=std::move(functor), args...]() {
110  return functor(args...);
111  });
112  if (metadata.IsResponseIgnored()) {
113  // Post task and ignore result.
114  boost::asio::post(io, MoveHandler(task));
115  return R();
116  } else {
117  // Post task and wait for result.
118  auto result = task.get_future();
119  boost::asio::post(io, MoveHandler(task));
120  return result.get();
121  }
122  };
123  }
124 
125  /// Wraps @a functor into a function type with equivalent signature that
126  /// handles the metadata sent by the client. If the client called this
127  /// method asynchronously, the result is ignored.
128  template <typename FuncT>
129  static auto WrapAsyncCall(FuncT &&functor) {
130  return [functor=std::forward<FuncT>(functor)](::carla::rpc::Metadata metadata, Args... args) -> R {
131  if (metadata.IsResponseIgnored()) {
132  functor(args...);
133  return R();
134  } else {
135  return functor(args...);
136  }
137  };
138  }
139  };
140 
141 } // namespace detail
142 
143  template <typename ... Args>
144  inline Server::Server(Args && ... args)
145  : _server(std::forward<Args>(args) ...) {
146  _server.suppress_exceptions(true);
147  }
148 
149  template <typename FunctorT>
150  inline void Server::BindSync(const std::string &name, FunctorT &&functor) {
151  using Wrapper = detail::FunctionWrapper<FunctorT>;
152  _server.bind(
153  name,
154  Wrapper::WrapSyncCall(_sync_io_context, std::forward<FunctorT>(functor)));
155  }
156 
157  template <typename FunctorT>
158  inline void Server::BindAsync(const std::string &name, FunctorT &&functor) {
159  using Wrapper = detail::FunctionWrapper<FunctorT>;
160  _server.bind(
161  name,
162  Wrapper::WrapAsyncCall(std::forward<FunctorT>(functor)));
163  }
164 
165 } // namespace rpc
166 } // namespace carla
::rpc::server _server
Definition: rpc/Server.h:70
void AsyncRun(size_t worker_threads)
Definition: rpc/Server.h:49
static auto WrapAsyncCall(FuncT &&functor)
Wraps functor into a function type with equivalent signature that handles the metadata sent by the cl...
Definition: rpc/Server.h:129
Server(Args &&... args)
Definition: rpc/Server.h:144
void BindSync(const std::string &name, FunctorT &&functor)
Definition: rpc/Server.h:150
carla::rpc::Response< T > R
Definition: CarlaServer.cpp:74
This file contains definitions of common data structures used in traffic manager. ...
Definition: Carla.cpp:133
An RPC server in which functions can be bind to run synchronously or asynchronously.
Definition: rpc/Server.h:37
void BindAsync(const std::string &name, FunctorT &&functor)
Definition: rpc/Server.h:158
bool IsResponseIgnored() const
Definition: Metadata.h:28
boost::asio::io_context _sync_io_context
Definition: rpc/Server.h:68
auto MoveHandler(FunctorT &&func)
Hack to trick asio into accepting move-only handlers, if the handler were actually copied it would re...
Definition: MoveHandler.h:33
static auto WrapSyncCall(boost::asio::io_context &io, FuncT &&functor)
Wraps functor into a function type with equivalent signature.
Definition: rpc/Server.h:107
Positive time duration up to milliseconds resolution.
Definition: Time.h:19
void SyncRunFor(time_duration duration)
Definition: rpc/Server.h:53
constexpr auto to_chrono() const
Definition: Time.h:50
Metadata of an RPC function call.
Definition: Metadata.h:15