30     : _work_to_do(service) {
    31     _threads.CreateThreads(threads, [
this]() { service.run(); });
    45 TEST(streaming, low_level_sending_strings) {
    51   constexpr 
auto number_of_messages = 100u;
    52   const std::string message_text = 
"Hello client!";
    54   std::atomic_size_t message_count{0u};
    66     ASSERT_EQ(message.size(), message_text.size());
    67     const std::string msg = 
as_string(message);
    68     ASSERT_EQ(msg, message_text);
    71   carla::Buffer Buf(boost::asio::buffer(message_text.c_str(), message_text.size()));
    73   for (
auto i = 0u; i < number_of_messages; ++i) {
    74     std::this_thread::sleep_for(2ms);
    79   std::this_thread::sleep_for(2ms);
    80   ASSERT_GE(message_count, number_of_messages - 3u);
    85 TEST(streaming, low_level_unsubscribing) {
    91   constexpr 
auto number_of_messages = 50u;
    92   const std::string message_text = 
"Hello client!";
   100   for (
auto n = 0u; n < 10u; ++n) {
   102     std::atomic_size_t message_count{0u};
   106       ASSERT_EQ(message.size(), message_text.size());
   107       const std::string msg = 
as_string(message);
   108       ASSERT_EQ(msg, message_text);
   111     carla::Buffer Buf(boost::asio::buffer(message_text.c_str(), message_text.size()));
   113     for (
auto i = 0u; i < number_of_messages; ++i) {
   114       std::this_thread::sleep_for(4ms);
   119     std::this_thread::sleep_for(4ms);
   122     for (
auto i = 0u; i < number_of_messages; ++i) {
   123       std::this_thread::sleep_for(2ms);
   128     ASSERT_GE(message_count, number_of_messages - 3u);
   134 TEST(streaming, low_level_tcp_small_message) {
   138   boost::asio::io_context io_context;
   143   std::atomic_bool done{
false};
   144   std::atomic_size_t message_count{0u};
   146   const std::string msg = 
"Hola!";
   148   srv.
Listen([&](std::shared_ptr<tcp::ServerSession> session) {
   149     ASSERT_EQ(session->get_stream_id(), 1u);
   151     carla::Buffer Buf(boost::asio::buffer(msg.c_str(), msg.size()));
   154       std::this_thread::sleep_for(1ns);
   156       session->Write(View);
   158     std::cout << 
"done!\n";
   159   }, [](std::shared_ptr<tcp::ServerSession>) { std::cout << 
"session closed!\n"; });
   163   auto c = std::make_shared<tcp::Client>(io_context, stream.token(), [&](
carla::Buffer message) {
   165     ASSERT_FALSE(message.empty());
   166     ASSERT_EQ(message.size(), 5u);
   168     ASSERT_EQ(received, msg);
   175       std::max(2u, std::thread::hardware_concurrency()),
   176       [&]() { io_context.run(); });
   178   std::this_thread::sleep_for(2s);
   181   std::cout << 
"client received " << message_count << 
" messages\n";
   182   ASSERT_GT(message_count, 10u);
   188   std::atomic_bool &done;
   191 TEST(streaming, stream_outlives_server) {
   194   constexpr 
size_t iterations = 10u;
   195   std::atomic_bool done{
false};
   196   const std::string message = 
"Hello client, how are you?";
   197   std::shared_ptr<Stream> stream;
   203     carla::Buffer Buf(boost::asio::buffer(message.c_str(), message.size()));
   206       std::this_thread::sleep_for(1ms);
   207       auto s = std::atomic_load_explicit(&stream, std::memory_order_relaxed);
   215   for (
auto i = 0u; i < iterations; ++i) {
   219       auto s = std::make_shared<Stream>(srv.
MakeStream());
   220       std::atomic_store_explicit(&stream, s, std::memory_order_relaxed);
   222     std::atomic_size_t messages_received{0u};
   226       c.
Subscribe(stream->token(), [&](
auto buffer) {
   227         const std::string result = 
as_string(buffer);
   228         ASSERT_EQ(result, message);
   231       std::this_thread::sleep_for(20ms);
   233     ASSERT_GT(messages_received, 0u);
   235   std::this_thread::sleep_for(20ms);
   239 TEST(streaming, multi_stream) {
   242   constexpr 
size_t number_of_messages = 100u;
   243   constexpr 
size_t number_of_clients = 6u;
   244   constexpr 
size_t iterations = 10u;
   245   const std::string message = 
"Hi y'all!";
   248   srv.AsyncRun(number_of_clients);
   251   for (
auto i = 0u; i < iterations; ++i) {
   252     std::vector<std::pair<std::atomic_size_t, std::unique_ptr<Client>>> v(number_of_clients);
   254     for (
auto &pair : v) {
   256       pair.second = std::make_unique<Client>();
   257       pair.second->AsyncRun(1u);
   258       pair.second->Subscribe(stream.token(), [&](
auto buffer) {
   259         const std::string result = 
as_string(buffer);
   260         ASSERT_EQ(result, message);
   265     carla::Buffer Buf(boost::asio::buffer(message.c_str(), message.size()));
   267     std::this_thread::sleep_for(6ms);
   268     for (
auto j = 0u; j < number_of_messages; ++j) {
   269       std::this_thread::sleep_for(6ms);
   273     std::this_thread::sleep_for(6ms);
   275     for (
auto &pair : v) {
   276       ASSERT_GE(pair.first, number_of_messages - 3u);
 
A client able to subscribe to multiple streams. 
 
constexpr uint16_t TESTING_PORT
 
A low-level streaming server. 
 
carla::ThreadGroup _threads
 
boost::asio::ip::tcp::endpoint endpoint
 
void CreateThread(F &&functor)
 
boost::asio::io_context service
 
static std::string as_string(const Buffer &buf)
 
TEST(streaming, low_level_sending_strings)
 
void UnSubscribe(token_type token)
 
void SetTimeout(time_duration timeout)
Set session time-out. 
 
static std::shared_ptr< BufferView > CreateFrom(Buffer &&buffer)
 
void CreateThreads(size_t count, F functor)
 
void AsyncRun(size_t worker_threads)
 
io_context_running(size_t threads=2u)
 
void Subscribe(const Token &token, Functor &&callback)
 
Keeps the mapping between streams and sessions. 
 
void SetTimeout(time_duration timeout)
 
A client able to subscribe to multiple streams. 
 
endpoint GetLocalEndpoint() const
 
boost::asio::io_context::work _work_to_do
 
void Listen(FunctorT1 on_session_opened, FunctorT2 on_session_closed)
Start listening for connections. 
 
std::shared_ptr< BufferView > SharedBufferView
 
carla::streaming::Stream MakeStream()
 
void Subscribe(boost::asio::io_context &io_context, token_type token, Functor &&callback)