// CARLA — test_benchmark_streaming.cpp (source listing extracted from Doxygen docs).
1 // Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2 // de Barcelona (UAB).
3 //
4 // This work is licensed under the terms of the MIT license.
5 // For a copy, see <https://opensource.org/licenses/MIT>.
6 
#include "test.h"

#include <carla/streaming/Client.h>
#include <carla/streaming/Server.h>

#include <boost/asio/post.hpp>

#include <algorithm>
15 
16 using namespace carla::streaming;
17 using namespace std::chrono_literals;
18 
19 static auto make_special_message(size_t size) {
20  std::vector<uint32_t> v(size/sizeof(uint32_t), 42u);
21  carla::Buffer msg(v);
22  EXPECT_EQ(msg.size(), size);
23  return msg;
24 }
25 
26 class Benchmark {
27 public:
28 
29  Benchmark(uint16_t port, size_t message_size, double success_ratio)
30  : _server(port),
31  _client(),
32  _message(make_special_message(message_size)),
33  _client_callback(),
34  _work_to_do(_client_callback),
35  _success_ratio(success_ratio) {}
36 
37  void AddStream() {
38  Stream stream = _server.MakeStream();
39 
40  _client.Subscribe(stream.token(), [this](carla::Buffer DEBUG_ONLY(msg)) {
41  DEBUG_ASSERT_EQ(msg.size(), _message.size());
42  DEBUG_ASSERT(msg == _message);
43  boost::asio::post(_client_callback, [this]() {
44  CARLA_PROFILE_FPS(client, listen_callback);
45  ++_number_of_messages_received;
46  });
47  });
48 
49  _streams.push_back(stream);
50  }
51 
52  void AddStreams(size_t count) {
53  for (auto i = 0u; i < count; ++i) {
54  AddStream();
55  }
56  }
57 
58  void Run(size_t number_of_messages) {
59  _threads.CreateThread([this]() { _client_callback.run(); });
60  _server.AsyncRun(_streams.size());
61  _client.AsyncRun(_streams.size());
62 
63  std::this_thread::sleep_for(1s); // the client needs to be ready so we make
64  // sure we get all the messages.
65 
66  for (auto &&stream : _streams) {
67  _threads.CreateThread([=]() mutable {
68  for (auto i = 0u; i < number_of_messages; ++i) {
69  std::this_thread::sleep_for(11ms); // ~90FPS.
70  {
72  stream << _message.buffer();
73  }
74  }
75  });
76  }
77 
78  const auto expected_number_of_messages = _streams.size() * number_of_messages;
79  const auto threshold =
80  static_cast<size_t>(_success_ratio * static_cast<double>(expected_number_of_messages));
81 
82  for (auto i = 0u; i < 10; ++i) {
83  std::cout << "received " << _number_of_messages_received
84  << " of " << expected_number_of_messages
85  << " messages,";
86  if (_number_of_messages_received >= expected_number_of_messages) {
87  break;
88  }
89  std::cout << " waiting..." << std::endl;
90  std::this_thread::sleep_for(1s);
91  }
92 
93  _client_callback.stop();
94  _threads.JoinAll();
95  std::cout << " done." << std::endl;
96 
97 #ifdef NDEBUG
98  ASSERT_GE(_number_of_messages_received, threshold);
99 #else
100  if (_number_of_messages_received < threshold) {
101  carla::log_warning("threshold unmet:", _number_of_messages_received, '/', threshold);
102  }
103 #endif // NDEBUG
104  }
105 
106 private:
107 
109 
111 
113 
115 
116  boost::asio::io_context _client_callback;
117 
118  boost::asio::io_context::work _work_to_do;
119 
120  const double _success_ratio;
121 
122  std::vector<Stream> _streams;
123 
124  std::atomic_size_t _number_of_messages_received{0u};
125 };
126 
/// Half the hardware threads, but never fewer than 2.
/// @note std::thread::hardware_concurrency() may return 0 when unknown; the
/// lower bound of 2 covers that case too.
static size_t get_max_concurrency() {
  const size_t concurrency = std::thread::hardware_concurrency() / 2u;
  // Use an explicit template argument: the literal 2ul is `unsigned long`,
  // which is not size_t on LLP64 platforms (e.g. 64-bit Windows), making
  // std::max(2ul, concurrency) fail template argument deduction there.
  return std::max<size_t>(2u, concurrency);
}
131 
132 static void benchmark_image(
133  const size_t dimensions,
134  const size_t number_of_streams = 1u,
135  const double success_ratio = 1.0) {
136  constexpr auto number_of_messages = 100u;
137  carla::logging::log("Benchmark:", number_of_streams, "streams at 90FPS.");
138  Benchmark benchmark(TESTING_PORT, 4u * dimensions, success_ratio);
139  benchmark.AddStreams(number_of_streams);
140  benchmark.Run(number_of_messages);
141 }
142 
// One 200x200-pixel stream; default success_ratio 1.0, so every message
// must arrive.
TEST(benchmark_streaming, image_200x200) {
  benchmark_image(200u * 200u);
}
146 
// One 800x600-pixel stream; 90 % of the messages must arrive.
TEST(benchmark_streaming, image_800x600) {
  benchmark_image(800u * 600u, 1u, 0.9);
}
150 
// One 1920x1080-pixel stream; 90 % of the messages must arrive.
TEST(benchmark_streaming, image_1920x1080) {
  benchmark_image(1920u * 1080u, 1u, 0.9);
}
154 
// Multi-stream variant: one 200x200-pixel stream per available worker
// (get_max_concurrency()); default success_ratio 1.0.
TEST(benchmark_streaming, image_200x200_mt) {
  benchmark_image(200u * 200u, get_max_concurrency());
}
158 
// Multi-stream variant: 800x600-pixel streams, one per available worker;
// 90 % of the messages must arrive.
TEST(benchmark_streaming, image_800x600_mt) {
  benchmark_image(800u * 600u, get_max_concurrency(), 0.9);
}
162 
// Multi-stream variant: 1920x1080-pixel streams, one per available worker;
// 90 % of the messages must arrive.
TEST(benchmark_streaming, image_1920x1080_mt) {
  benchmark_image(1920u * 1080u, get_max_concurrency(), 0.9);
}
boost::asio::io_context::work _work_to_do
constexpr uint16_t TESTING_PORT
Definition: test.h:24
static void log(Args &&... args)
Definition: Logging.h:59
void AddStreams(size_t count)
carla::ThreadGroup _threads
boost::asio::io_context _client_callback
A streaming server.
#define CARLA_PROFILE_SCOPE(context, profiler_name)
Definition: Profiler.h:10
#define DEBUG_ASSERT_EQ(lhs, rhs)
Definition: Debug.h:76
#define DEBUG_ASSERT(predicate)
Definition: Debug.h:66
Benchmark(uint16_t port, size_t message_size, double success_ratio)
static LIBCARLA_NOINLINE void write_to_stream(std::ostream &out, Arg &&arg, Args &&... args)
Definition: Logging.h:52
Token token() const
Token associated with this stream.
Definition: detail/Stream.h:35
const carla::Buffer _message
static size_t get_max_concurrency()
static void log_warning(Args &&... args)
Definition: Logging.h:96
std::vector< Stream > _streams
#define CARLA_PROFILE_FPS(context, profiler_name)
Definition: Profiler.h:11
A client able to subscribe to multiple streams.
void Run(size_t number_of_messages)
const double _success_ratio
A piece of raw data.
Definition: carla/Buffer.h:39
size_type size() const noexcept
Definition: carla/Buffer.h:194
TEST(benchmark_streaming, image_200x200)
#define DEBUG_ONLY(code)
Definition: Debug.h:55
static void benchmark_image(const size_t dimensions, const size_t number_of_streams=1u, const double success_ratio=1.0)
static auto make_special_message(size_t size)