CARLA
test_streaming.cpp
Go to the documentation of this file.
1 // Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
7 #include "test.h"
8 
9 #include <carla/ThreadGroup.h>
10 #include <carla/streaming/Client.h>
11 #include <carla/streaming/Server.h>
17 
18 #include <atomic>
19 
20 using namespace std::chrono_literals;
21 
22 // This is required for low level to properly stop the threads in case of
23 // exception/assert.
25 public:
26 
27  boost::asio::io_context service;
28 
29  explicit io_context_running(size_t threads = 2u)
30  : _work_to_do(service) {
31  _threads.CreateThreads(threads, [this]() { service.run(); });
32  }
33 
35  service.stop();
36  }
37 
38 private:
39 
40  boost::asio::io_context::work _work_to_do;
41 
43 };
44 
45 TEST(streaming, low_level_sending_strings) {
46  using namespace util::buffer;
47  using namespace carla::streaming;
48  using namespace carla::streaming::detail;
49  using namespace carla::streaming::low_level;
50 
51  constexpr auto number_of_messages = 100u;
52  const std::string message_text = "Hello client!";
53 
54  std::atomic_size_t message_count{0u};
55 
57 
59  srv.SetTimeout(1s);
60 
61  auto stream = srv.MakeStream();
62 
64  c.Subscribe(io.service, stream.token(), [&](auto message) {
65  ++message_count;
66  ASSERT_EQ(message.size(), message_text.size());
67  const std::string msg = as_string(message);
68  ASSERT_EQ(msg, message_text);
69  });
70 
71  carla::Buffer Buf(boost::asio::buffer(message_text.c_str(), message_text.size()));
73  for (auto i = 0u; i < number_of_messages; ++i) {
74  std::this_thread::sleep_for(2ms);
75  carla::SharedBufferView View = BufView;
76  stream.Write(View);
77  }
78 
79  std::this_thread::sleep_for(2ms);
80  ASSERT_GE(message_count, number_of_messages - 3u);
81 
82  io.service.stop();
83 }
84 
85 TEST(streaming, low_level_unsubscribing) {
86  using namespace util::buffer;
87  using namespace carla::streaming;
88  using namespace carla::streaming::detail;
89  using namespace carla::streaming::low_level;
90 
91  constexpr auto number_of_messages = 50u;
92  const std::string message_text = "Hello client!";
93 
95 
97  srv.SetTimeout(1s);
98 
100  for (auto n = 0u; n < 10u; ++n) {
101  auto stream = srv.MakeStream();
102  std::atomic_size_t message_count{0u};
103 
104  c.Subscribe(io.service, stream.token(), [&](auto message) {
105  ++message_count;
106  ASSERT_EQ(message.size(), message_text.size());
107  const std::string msg = as_string(message);
108  ASSERT_EQ(msg, message_text);
109  });
110 
111  carla::Buffer Buf(boost::asio::buffer(message_text.c_str(), message_text.size()));
112  carla::SharedBufferView BufView = carla::BufferView::CreateFrom(std::move(Buf));
113  for (auto i = 0u; i < number_of_messages; ++i) {
114  std::this_thread::sleep_for(4ms);
115  carla::SharedBufferView View = BufView;
116  stream.Write(View);
117  }
118 
119  std::this_thread::sleep_for(4ms);
120  c.UnSubscribe(stream.token());
121 
122  for (auto i = 0u; i < number_of_messages; ++i) {
123  std::this_thread::sleep_for(2ms);
124  carla::SharedBufferView View = BufView;
125  stream.Write(View);
126  }
127 
128  ASSERT_GE(message_count, number_of_messages - 3u);
129  }
130 
131  io.service.stop();
132 }
133 
134 TEST(streaming, low_level_tcp_small_message) {
135  using namespace carla::streaming;
136  using namespace carla::streaming::detail;
137 
138  boost::asio::io_context io_context;
139  tcp::Server::endpoint ep(boost::asio::ip::tcp::v4(), TESTING_PORT);
140 
141  tcp::Server srv(io_context, ep);
142  srv.SetTimeout(1s);
143  std::atomic_bool done{false};
144  std::atomic_size_t message_count{0u};
145 
146  const std::string msg = "Hola!";
147 
148  srv.Listen([&](std::shared_ptr<tcp::ServerSession> session) {
149  ASSERT_EQ(session->get_stream_id(), 1u);
150 
151  carla::Buffer Buf(boost::asio::buffer(msg.c_str(), msg.size()));
152  carla::SharedBufferView BufView = carla::BufferView::CreateFrom(std::move(Buf));
153  while (!done) {
154  std::this_thread::sleep_for(1ns);
155  carla::SharedBufferView View = BufView;
156  session->Write(View);
157  }
158  std::cout << "done!\n";
159  }, [](std::shared_ptr<tcp::ServerSession>) { std::cout << "session closed!\n"; });
160 
161  Dispatcher dispatcher{make_endpoint<tcp::Client::protocol_type>(srv.GetLocalEndpoint())};
162  auto stream = dispatcher.MakeStream();
163  auto c = std::make_shared<tcp::Client>(io_context, stream.token(), [&](carla::Buffer message) {
164  ++message_count;
165  ASSERT_FALSE(message.empty());
166  ASSERT_EQ(message.size(), 5u);
167  const std::string received = util::buffer::as_string(message);
168  ASSERT_EQ(received, msg);
169  });
170  c->Connect();
171 
172  // We need at least two threads because this server loop consumes one.
173  carla::ThreadGroup threads;
174  threads.CreateThreads(
175  std::max(2u, std::thread::hardware_concurrency()),
176  [&]() { io_context.run(); });
177 
178  std::this_thread::sleep_for(2s);
179  io_context.stop();
180  done = true;
181  std::cout << "client received " << message_count << " messages\n";
182  ASSERT_GT(message_count, 10u);
183  c->Stop();
184 }
185 
186 struct DoneGuard {
187  ~DoneGuard() { done = true; };
188  std::atomic_bool &done;
189 };
190 
191 TEST(streaming, stream_outlives_server) {
192  using namespace carla::streaming;
193  using namespace util::buffer;
194  constexpr size_t iterations = 10u;
195  std::atomic_bool done{false};
196  const std::string message = "Hello client, how are you?";
197  std::shared_ptr<Stream> stream;
198 
199  carla::ThreadGroup sender;
200  DoneGuard g = {done};
201  sender.CreateThread([&]() {
202 
203  carla::Buffer Buf(boost::asio::buffer(message.c_str(), message.size()));
204  carla::SharedBufferView BufView = carla::BufferView::CreateFrom(std::move(Buf));
205  while (!done) {
206  std::this_thread::sleep_for(1ms);
207  auto s = std::atomic_load_explicit(&stream, std::memory_order_relaxed);
208  if (s != nullptr) {
209  carla::SharedBufferView View = BufView;
210  s->Write(View);
211  }
212  }
213  });
214 
215  for (auto i = 0u; i < iterations; ++i) {
216  Server srv(TESTING_PORT);
217  srv.AsyncRun(2u);
218  {
219  auto s = std::make_shared<Stream>(srv.MakeStream());
220  std::atomic_store_explicit(&stream, s, std::memory_order_relaxed);
221  }
222  std::atomic_size_t messages_received{0u};
223  {
224  Client c;
225  c.AsyncRun(2u);
226  c.Subscribe(stream->token(), [&](auto buffer) {
227  const std::string result = as_string(buffer);
228  ASSERT_EQ(result, message);
229  ++messages_received;
230  });
231  std::this_thread::sleep_for(20ms);
232  } // client dies here.
233  ASSERT_GT(messages_received, 0u);
234  } // server dies here.
235  std::this_thread::sleep_for(20ms);
236  done = true;
237 } // stream dies here.
238 
239 TEST(streaming, multi_stream) {
240  using namespace carla::streaming;
241  using namespace util::buffer;
242  constexpr size_t number_of_messages = 100u;
243  constexpr size_t number_of_clients = 6u;
244  constexpr size_t iterations = 10u;
245  const std::string message = "Hi y'all!";
246 
247  Server srv(TESTING_PORT);
248  srv.AsyncRun(number_of_clients);
249  auto stream = srv.MakeStream();
250 
251  for (auto i = 0u; i < iterations; ++i) {
252  std::vector<std::pair<std::atomic_size_t, std::unique_ptr<Client>>> v(number_of_clients);
253 
254  for (auto &pair : v) {
255  pair.first = 0u;
256  pair.second = std::make_unique<Client>();
257  pair.second->AsyncRun(1u);
258  pair.second->Subscribe(stream.token(), [&](auto buffer) {
259  const std::string result = as_string(buffer);
260  ASSERT_EQ(result, message);
261  ++pair.first;
262  });
263  }
264 
265  carla::Buffer Buf(boost::asio::buffer(message.c_str(), message.size()));
266  carla::SharedBufferView BufView = carla::BufferView::CreateFrom(std::move(Buf));
267  std::this_thread::sleep_for(6ms);
268  for (auto j = 0u; j < number_of_messages; ++j) {
269  std::this_thread::sleep_for(6ms);
270  carla::SharedBufferView View = BufView;
271  stream.Write(View);
272  }
273  std::this_thread::sleep_for(6ms);
274 
275  for (auto &pair : v) {
276  ASSERT_GE(pair.first, number_of_messages - 3u);
277  }
278  }
279 }
A client able to subscribe to multiple streams.
constexpr uint16_t TESTING_PORT
Definition: test.h:24
carla::ThreadGroup _threads
void CreateThread(F &&functor)
Definition: ThreadGroup.h:27
boost::asio::io_context service
static std::string as_string(const Buffer &buf)
Definition: test/Buffer.h:37
TEST(streaming, low_level_sending_strings)
void SetTimeout(time_duration timeout)
Set session time-out.
static std::shared_ptr< BufferView > CreateFrom(Buffer &&buffer)
Definition: BufferView.h:56
void CreateThreads(size_t count, F functor)
Definition: ThreadGroup.h:32
void AsyncRun(size_t worker_threads)
io_context_running(size_t threads=2u)
void Subscribe(const Token &token, Functor &&callback)
Keeps the mapping between streams and sessions.
Definition: Dispatcher.h:27
A client able to subscribe to multiple streams.
boost::asio::io_context::work _work_to_do
void Listen(FunctorT1 on_session_opened, FunctorT2 on_session_closed)
Start listening for connections.
A piece of raw data.
Definition: carla/Buffer.h:42
std::shared_ptr< BufferView > SharedBufferView
Definition: BufferView.h:151
carla::streaming::Stream MakeStream()
Definition: Dispatcher.cpp:37
void Subscribe(boost::asio::io_context &io_context, token_type token, Functor &&callback)