test_benchmark_streaming.cpp
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.

#include "test.h"

#include <carla/Buffer.h>
#include <carla/BufferView.h>
#include <carla/streaming/Client.h>
#include <carla/streaming/Server.h>

#include <boost/asio/post.hpp>

#include <algorithm>

using namespace carla::streaming;
using namespace std::chrono_literals;

static auto make_special_message(size_t size) {
  std::vector<uint32_t> v(size/sizeof(uint32_t), 42u);
  carla::Buffer msg(v);
  EXPECT_EQ(msg.size(), size);
  carla::SharedBufferView BufView = carla::BufferView::CreateFrom(std::move(msg));

  return BufView;
}

class Benchmark {
public:

  Benchmark(uint16_t port, size_t message_size, double success_ratio)
    : _server(port),
      _client(),
      _message(make_special_message(message_size)),
      _client_callback(),
      _work_to_do(_client_callback),
      _success_ratio(success_ratio) {}

  void AddStream() {
    Stream stream = _server.MakeStream();

    _client.Subscribe(stream.token(), [this](carla::Buffer msg) {
      carla::SharedBufferView BufView = carla::BufferView::CreateFrom(std::move(msg));
      DEBUG_ASSERT_EQ(BufView->size(), _message->size());
      boost::asio::post(_client_callback, [this]() {
        CARLA_PROFILE_FPS(client, listen_callback);
        ++_number_of_messages_received;
      });
    });

    _streams.push_back(stream);
  }

  void AddStreams(size_t count) {
    for (auto i = 0u; i < count; ++i) {
      AddStream();
    }
  }

  void Run(size_t number_of_messages) {
    _threads.CreateThread([this]() { _client_callback.run(); });
    _server.AsyncRun(_streams.size());
    _client.AsyncRun(_streams.size());

    std::this_thread::sleep_for(1s); // the client needs to be ready so we make
                                     // sure we get all the messages.

    for (auto &&stream : _streams) {
      _threads.CreateThread([=]() mutable {
        for (auto i = 0u; i < number_of_messages; ++i) {
          std::this_thread::sleep_for(11ms); // ~90FPS.
          {
            CARLA_PROFILE_SCOPE(game, write_to_stream);
            stream.Write(_message);
          }
        }
      });
    }

    const auto expected_number_of_messages = _streams.size() * number_of_messages;
    const auto threshold =
        static_cast<size_t>(_success_ratio * static_cast<double>(expected_number_of_messages));

    for (auto i = 0u; i < 10; ++i) {
      std::cout << "received " << _number_of_messages_received
                << " of " << expected_number_of_messages
                << " messages,";
      if (_number_of_messages_received >= expected_number_of_messages) {
        break;
      }
      std::cout << " waiting..." << std::endl;
      std::this_thread::sleep_for(1s);
    }

    _client_callback.stop();
    _threads.JoinAll();
    std::cout << " done." << std::endl;

#ifdef NDEBUG
    ASSERT_GE(_number_of_messages_received, threshold);
#else
    if (_number_of_messages_received < threshold) {
      carla::log_warning("threshold unmet:", _number_of_messages_received, '/', threshold);
    }
#endif // NDEBUG
  }

private:

  carla::ThreadGroup _threads;

  Server _server;

  Client _client;

  const carla::SharedBufferView _message;

  boost::asio::io_context _client_callback;

  boost::asio::io_context::work _work_to_do;

  const double _success_ratio;

  std::vector<Stream> _streams;

  std::atomic_size_t _number_of_messages_received{0u};
};

static size_t get_max_concurrency() {
  size_t concurrency = std::thread::hardware_concurrency() / 2u;
  return std::max((size_t) 2u, concurrency);
}

static void benchmark_image(
    const size_t dimensions,
    const size_t number_of_streams = 1u,
    const double success_ratio = 1.0) {
  constexpr auto number_of_messages = 100u;
  carla::logging::log("Benchmark:", number_of_streams, "streams at 90FPS.");
  Benchmark benchmark(TESTING_PORT, 4u * dimensions, success_ratio);
  benchmark.AddStreams(number_of_streams);
  benchmark.Run(number_of_messages);
}

TEST(benchmark_streaming, image_200x200) {
  benchmark_image(200u * 200u);
}

TEST(benchmark_streaming, image_800x600) {
  benchmark_image(800u * 600u, 1u, 0.9);
}

TEST(benchmark_streaming, image_1920x1080) {
  benchmark_image(1920u * 1080u, 1u, 0.9);
}

TEST(benchmark_streaming, image_200x200_mt) {
  benchmark_image(200u * 200u, get_max_concurrency());
}

TEST(benchmark_streaming, image_800x600_mt) {
  benchmark_image(800u * 600u, get_max_concurrency(), 0.9);
}

TEST(benchmark_streaming, image_1920x1080_mt) {
  benchmark_image(1920u * 1080u, get_max_concurrency(), 0.9);
}