14 #include <boost/asio/post.hpp> 22 std::vector<uint32_t> v(size/
sizeof(uint32_t), 42u);
24 EXPECT_EQ(msg.
size(), size);
33 Benchmark(uint16_t port,
size_t message_size,
double success_ratio)
38 _work_to_do(_client_callback),
39 _success_ratio(success_ratio) {}
42 Stream stream = _server.MakeStream();
47 boost::asio::post(_client_callback, [
this]() {
49 ++_number_of_messages_received;
53 _streams.push_back(stream);
57 for (
auto i = 0u; i < count; ++i) {
62 void Run(
size_t number_of_messages) {
63 _threads.CreateThread([
this]() { _client_callback.run(); });
64 _server.AsyncRun(_streams.size());
65 _client.AsyncRun(_streams.size());
67 std::this_thread::sleep_for(1s);
70 for (
auto &&stream : _streams) {
71 _threads.CreateThread([=]()
mutable {
72 for (
auto i = 0u; i < number_of_messages; ++i) {
73 std::this_thread::sleep_for(11ms);
76 stream.Write(_message);
82 const auto expected_number_of_messages = _streams.size() * number_of_messages;
83 const auto threshold =
84 static_cast<size_t>(_success_ratio *
static_cast<double>(expected_number_of_messages));
86 for (
auto i = 0u; i < 10; ++i) {
87 std::cout <<
"received " << _number_of_messages_received
88 <<
" of " << expected_number_of_messages
90 if (_number_of_messages_received >= expected_number_of_messages) {
93 std::cout <<
" waiting..." << std::endl;
94 std::this_thread::sleep_for(1s);
97 _client_callback.stop();
99 std::cout <<
" done." << std::endl;
102 ASSERT_GE(_number_of_messages_received, threshold);
104 if (_number_of_messages_received < threshold) {
128 std::atomic_size_t _number_of_messages_received{0u};
132 size_t concurrency = std::thread::hardware_concurrency() / 2u;
133 return std::max((
size_t) 2u, concurrency);
137 const size_t dimensions,
138 const size_t number_of_streams = 1u,
139 const double success_ratio = 1.0) {
140 constexpr
auto number_of_messages = 100u;
144 benchmark.
Run(number_of_messages);
147 TEST(benchmark_streaming, image_200x200) {
151 TEST(benchmark_streaming, image_800x600) {
155 TEST(benchmark_streaming, image_1920x1080) {
159 TEST(benchmark_streaming, image_200x200_mt) {
163 TEST(benchmark_streaming, image_800x600_mt) {
167 TEST(benchmark_streaming, image_1920x1080_mt) {
boost::asio::io_context::work _work_to_do
constexpr uint16_t TESTING_PORT
static void log(Args &&... args)
void AddStreams(size_t count)
carla::ThreadGroup _threads
boost::asio::io_context _client_callback
const carla::SharedBufferView _message
#define CARLA_PROFILE_SCOPE(context, profiler_name)
#define DEBUG_ASSERT_EQ(lhs, rhs)
static std::shared_ptr< BufferView > CreateFrom(Buffer &&buffer)
Benchmark(uint16_t port, size_t message_size, double success_ratio)
static LIBCARLA_NOINLINE void write_to_stream(std::ostream &out, Arg &&arg, Args &&... args)
Token token() const
Token associated with this stream.
static size_t get_max_concurrency()
static void log_warning(Args &&... args)
std::vector< Stream > _streams
#define CARLA_PROFILE_FPS(context, profiler_name)
A client able to subscribe to multiple streams.
void Run(size_t number_of_messages)
const double _success_ratio
std::shared_ptr< BufferView > SharedBufferView
size_type size() const noexcept
TEST(benchmark_streaming, image_200x200)
static void benchmark_image(const size_t dimensions, const size_t number_of_streams=1u, const double success_ratio=1.0)
static auto make_special_message(size_t size)