CARLA
rpc/Server.h
Go to the documentation of this file.
1 // Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
7 #pragma once
8 
9 #include "carla/MoveHandler.h"
10 #include "carla/Time.h"
11 #include "carla/rpc/Metadata.h"
12 #include "carla/rpc/Response.h"
13 
14 #include <boost/asio/io_context.hpp>
15 #include <boost/asio/post.hpp>
16 
17 #include <rpc/server.h>
18 
19 #include <future>
20 
21 namespace carla {
22 namespace rpc {
23 
24  // ===========================================================================
25  // -- Server -----------------------------------------------------------------
26  // ===========================================================================
27 
28  /// An RPC server in which functions can be bind to run synchronously or
29  /// asynchronously.
30  ///
31  /// Use `AsyncRun` to start the worker threads, and use `SyncRunFor` to
32  /// run a slice of work in the caller's thread.
33  ///
34  /// Functions that are bind using `BindAsync` will run asynchronously in the
35  /// worker threads. Functions that are bind using `BindSync` will run within
36  /// `SyncRunFor` function.
37  class Server {
38  public:
39 
40  template <typename... Args>
41  explicit Server(Args &&... args);
42 
43  template <typename FunctorT>
44  void BindSync(const std::string &name, FunctorT &&functor);
45 
46  template <typename FunctorT>
47  void BindAsync(const std::string &name, FunctorT &&functor);
48 
49  void AsyncRun(size_t worker_threads) {
50  _server.async_run(worker_threads);
51  }
52 
53  void SyncRunFor(time_duration duration) {
54  _sync_io_context.reset();
55  _sync_io_context.run_for(duration.to_chrono());
56  }
57 
58  /// @warning does not stop the game thread.
59  void Stop() {
60  _server.stop();
61  }
62 
63  private:
64 
65  boost::asio::io_context _sync_io_context;
66 
67  ::rpc::server _server;
68  };
69 
70  // ===========================================================================
71  // -- Server implementation --------------------------------------------------
72  // ===========================================================================
73 
74 namespace detail {
75 
76  template <typename T>
77  struct FunctionWrapper : FunctionWrapper<decltype(&T::operator())> {};
78 
79  template <typename C, typename R, typename... Args>
80  struct FunctionWrapper<R (C::*)(Args...)> : FunctionWrapper<R (*)(Args...)> {};
81 
82  template <typename C, typename R, typename... Args>
83  struct FunctionWrapper<R (C::*)(Args...) const> : FunctionWrapper<R (*)(Args...)> {};
84 
85  template<class T>
86  struct FunctionWrapper<T &> : public FunctionWrapper<T> {};
87 
88  template<class T>
89  struct FunctionWrapper<T &&> : public FunctionWrapper<T> {};
90 
91  template <typename R, typename... Args>
92  struct FunctionWrapper<R (*)(Args...)> {
93 
94  /// Wraps @a functor into a function type with equivalent signature. The
95  /// wrap function returned. When called, posts @a functor into the
96  /// io_context; if the client called this method synchronously, waits for
97  /// the posted task to finish, otherwise returns immediately.
98  ///
99  /// This way, no matter from which thread the wrap function is called, the
100  /// @a functor provided is always called from the context of the io_context.
101  /// I.e., we can use the io_context to run tasks on a specific thread (e.g.
102  /// game thread).
103  template <typename FuncT>
104  static auto WrapSyncCall(boost::asio::io_context &io, FuncT &&functor) {
105  return [&io, functor=std::forward<FuncT>(functor)](Metadata metadata, Args... args) -> R {
106  auto task = std::packaged_task<R()>([functor=std::move(functor), args...]() {
107  return functor(args...);
108  });
109  if (metadata.IsResponseIgnored()) {
110  // Post task and ignore result.
111  boost::asio::post(io, MoveHandler(task));
112  return R();
113  } else {
114  // Post task and wait for result.
115  auto result = task.get_future();
116  boost::asio::post(io, MoveHandler(task));
117  return result.get();
118  }
119  };
120  }
121 
122  /// Wraps @a functor into a function type with equivalent signature that
123  /// handles the metadata sent by the client. If the client called this
124  /// method asynchronously, the result is ignored.
125  template <typename FuncT>
126  static auto WrapAsyncCall(FuncT &&functor) {
127  return [functor=std::forward<FuncT>(functor)](::carla::rpc::Metadata metadata, Args... args) -> R {
128  if (metadata.IsResponseIgnored()) {
129  functor(args...);
130  return R();
131  } else {
132  return functor(args...);
133  }
134  };
135  }
136  };
137 
138 } // namespace detail
139 
140  template <typename ... Args>
141  inline Server::Server(Args && ... args)
142  : _server(std::forward<Args>(args) ...) {
143  _server.suppress_exceptions(true);
144  }
145 
146  template <typename FunctorT>
147  inline void Server::BindSync(const std::string &name, FunctorT &&functor) {
148  using Wrapper = detail::FunctionWrapper<FunctorT>;
149  _server.bind(
150  name,
151  Wrapper::WrapSyncCall(_sync_io_context, std::forward<FunctorT>(functor)));
152  }
153 
154  template <typename FunctorT>
155  inline void Server::BindAsync(const std::string &name, FunctorT &&functor) {
156  using Wrapper = detail::FunctionWrapper<FunctorT>;
157  _server.bind(
158  name,
159  Wrapper::WrapAsyncCall(std::forward<FunctorT>(functor)));
160  }
161 
162 } // namespace rpc
163 } // namespace carla
::rpc::server _server
Definition: rpc/Server.h:67
void AsyncRun(size_t worker_threads)
Definition: rpc/Server.h:49
static auto WrapAsyncCall(FuncT &&functor)
Wraps functor into a function type with equivalent signature that handles the metadata sent by the cl...
Definition: rpc/Server.h:126
Server(Args &&... args)
Definition: rpc/Server.h:141
void BindSync(const std::string &name, FunctorT &&functor)
Definition: rpc/Server.h:147
carla::rpc::Response< T > R
Definition: CarlaServer.cpp:57
This file contains definitions of common data structures used in traffic manager. ...
Definition: Carla.cpp:99
An RPC server in which functions can be bind to run synchronously or asynchronously.
Definition: rpc/Server.h:37
void BindAsync(const std::string &name, FunctorT &&functor)
Definition: rpc/Server.h:155
bool IsResponseIgnored() const
Definition: Metadata.h:28
boost::asio::io_context _sync_io_context
Definition: rpc/Server.h:65
auto MoveHandler(FunctorT &&func)
Hack to trick asio into accepting move-only handlers, if the handler were actually copied it would re...
Definition: MoveHandler.h:33
static auto WrapSyncCall(boost::asio::io_context &io, FuncT &&functor)
Wraps functor into a function type with equivalent signature.
Definition: rpc/Server.h:104
Positive time duration up to milliseconds resolution.
Definition: Time.h:19
void SyncRunFor(time_duration duration)
Definition: rpc/Server.h:53
constexpr auto to_chrono() const
Definition: Time.h:50
Metadata of an RPC function call.
Definition: Metadata.h:15