    : _work_to_do(service) {
    _threads.CreateThreads(threads, [this]() { service.run(); });
  }
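// Sketch (not verbatim from this file): a plausible complete shape of the
// io_context_running helper, assembled from the constructor above and the
// members it references (service, _work_to_do, _threads). The destructor body
// is an assumption; the original may differ.
class io_context_running {
public:

  boost::asio::io_context service;

  explicit io_context_running(size_t threads = 2u)
    : _work_to_do(service) {
    _threads.CreateThreads(threads, [this]() { service.run(); });
  }

  ~io_context_running() {
    service.stop(); // assumption: stop the io_context so the worker threads can exit.
  }

private:

  boost::asio::io_context::work _work_to_do;

  carla::ThreadGroup _threads;
};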
TEST(streaming, low_level_sending_strings) {
  // ...
  constexpr auto number_of_messages = 100u;
  const std::string message_text = "Hello client!";
  // ...
  std::atomic_size_t message_count{0u};
  // ... (inside the client's Subscribe callback)
    ASSERT_EQ(message.size(), message_text.size());
    const std::string msg = as_string(message);
    ASSERT_EQ(msg, message_text);
  // ...
  carla::Buffer Buf(boost::asio::buffer(message_text.c_str(), message_text.size()));
  // ...
  for (auto i = 0u; i < number_of_messages; ++i) {
    std::this_thread::sleep_for(2ms);
    // ...
  }
  // ...
  std::this_thread::sleep_for(2ms);
  ASSERT_GE(message_count, number_of_messages - 3u);
}
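// The assertions above compare received buffers with std::string through an
// as_string helper from the test utilities. A minimal sketch of such a helper
// (an assumption; the real test utility may be implemented differently):
static std::string as_string(const carla::Buffer &buf) {
  return std::string(reinterpret_cast<const char *>(buf.data()), buf.size());
}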
TEST(streaming, low_level_unsubscribing) {
  // ...
  constexpr auto number_of_messages = 50u;
  const std::string message_text = "Hello client!";
  // ...
  for (auto n = 0u; n < 10u; ++n) {
    // ...
    std::atomic_size_t message_count{0u};
    // ... (inside the client's Subscribe callback)
      ASSERT_EQ(message.size(), message_text.size());
      const std::string msg = as_string(message);
      ASSERT_EQ(msg, message_text);
    // ...
    carla::Buffer Buf(boost::asio::buffer(message_text.c_str(), message_text.size()));
    // ...
    for (auto i = 0u; i < number_of_messages; ++i) {
      std::this_thread::sleep_for(4ms);
      // ...
    }
    // ...
    std::this_thread::sleep_for(4ms);
    // ...
    for (auto i = 0u; i < number_of_messages; ++i) {
      std::this_thread::sleep_for(2ms);
      // ...
    }
    // ...
    ASSERT_GE(message_count, number_of_messages - 3u);
  }
}
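// The test's name implies that the client drops its subscription between the
// two write loops, so messages sent in the second loop must not be counted and
// the final assertion only needs to cover the first batch. A standalone sketch
// of that subscribe/unsubscribe sequence on the low-level client (identifiers
// are illustrative; it assumes the Subscribe/UnSubscribe signatures used in
// this excerpt):
template <typename LowLevelClient, typename Stream>
static void subscribe_then_unsubscribe(
    boost::asio::io_context &io,
    LowLevelClient &client,
    Stream &stream,
    std::atomic_size_t &message_count) {
  client.Subscribe(io, stream.token(), [&](auto message) {
    ++message_count;                      // only counted while subscribed
  });
  // ... write a first batch of messages to the stream here ...
  client.UnSubscribe(stream.token());
  // ... a second batch written now should leave message_count unchanged ...
}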
TEST(streaming, low_level_tcp_small_message) {
  // ...
  boost::asio::io_context io_context;
  // ...
  std::atomic_bool done{false};
  std::atomic_size_t message_count{0u};
  // ...
  const std::string msg = "Hola!";
  // ...
  srv.Listen([&](std::shared_ptr<tcp::ServerSession> session) {
    ASSERT_EQ(session->get_stream_id(), 1u);
    // ...
    carla::Buffer Buf(boost::asio::buffer(msg.c_str(), msg.size()));
    // ...
    std::this_thread::sleep_for(1ns);
    // ...
    session->Write(View);
    // ...
    std::cout << "done!\n";
  }, [](std::shared_ptr<tcp::ServerSession>) { std::cout << "session closed!\n"; });
  // ...
  auto c = std::make_shared<tcp::Client>(io_context, stream.token(), [&](carla::Buffer message) {
    // ...
    ASSERT_FALSE(message.empty());
    ASSERT_EQ(message.size(), 5u);
    // ...
    ASSERT_EQ(received, msg);
  });
  // ... (worker threads running the io_context)
      std::max(2u, std::thread::hardware_concurrency()),
      [&]() { io_context.run(); });
  // ...
  std::this_thread::sleep_for(2s);
  // ...
  std::cout << "client received " << message_count << " messages\n";
  ASSERT_GT(message_count, 10u);
}
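// In the server loop above, the `View` passed to session->Write is created on
// a line not shown in this excerpt: the tcp session writes a shared buffer
// view rather than a raw buffer. A minimal sketch of that conversion, assuming
// carla::BufferView::CreateFrom and the carla::SharedBufferView alias (the
// original line may differ):
static carla::SharedBufferView MakeView(const std::string &text) {
  carla::Buffer buf(boost::asio::buffer(text.c_str(), text.size()));
  return carla::BufferView::CreateFrom(std::move(buf));
}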
  // ...
  std::atomic_bool &done;
  // ...
TEST(streaming, stream_outlives_server) {
  // ...
  constexpr size_t iterations = 10u;
  std::atomic_bool done{false};
  const std::string message = "Hello client, how are you?";
  std::shared_ptr<Stream> stream;
  // ... (presumably inside a background writer thread)
    carla::Buffer Buf(boost::asio::buffer(message.c_str(), message.size()));
    // ...
    std::this_thread::sleep_for(1ms);
    auto s = std::atomic_load_explicit(&stream, std::memory_order_relaxed);
    // ...
  for (auto i = 0u; i < iterations; ++i) {
    // ...
    auto s = std::make_shared<Stream>(srv.MakeStream());
    std::atomic_store_explicit(&stream, s, std::memory_order_relaxed);
    // ...
    std::atomic_size_t messages_received{0u};
    // ...
    c.Subscribe(stream->token(), [&](auto buffer) {
      const std::string result = as_string(buffer);
      ASSERT_EQ(result, message);
      // ...
    });
    // ...
    std::this_thread::sleep_for(20ms);
    // ...
    ASSERT_GT(messages_received, 0u);
  }
  // ...
  std::this_thread::sleep_for(20ms);
}
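// The writer thread and the per-iteration loop above communicate through
// `stream` with the atomic free-function overloads for std::shared_ptr: each
// iteration publishes a fresh Stream with std::atomic_store_explicit, and the
// writer takes a snapshot with std::atomic_load_explicit, so the pointer can
// be swapped while a write is in flight without a data race. A standalone
// sketch of that publish/snapshot pattern with an illustrative payload type:
struct LatestPayload {
  std::shared_ptr<std::string> ptr;

  void Publish(std::string text) {
    auto s = std::make_shared<std::string>(std::move(text));
    std::atomic_store_explicit(&ptr, s, std::memory_order_relaxed);
  }

  std::shared_ptr<std::string> Snapshot() const {
    return std::atomic_load_explicit(&ptr, std::memory_order_relaxed);
  }
};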
TEST(streaming, multi_stream) {
  // ...
  constexpr size_t number_of_messages = 100u;
  constexpr size_t number_of_clients = 6u;
  constexpr size_t iterations = 10u;
  const std::string message = "Hi y'all!";
  // ...
  srv.AsyncRun(number_of_clients);
  // ...
  for (auto i = 0u; i < iterations; ++i) {
    // ...
    std::vector<std::pair<std::atomic_size_t, std::unique_ptr<Client>>> v(number_of_clients);
    // ...
    for (auto &pair : v) {
      // ...
      pair.second = std::make_unique<Client>();
      pair.second->AsyncRun(1u);
      pair.second->Subscribe(stream.token(), [&](auto buffer) {
        const std::string result = as_string(buffer);
        ASSERT_EQ(result, message);
        // ...
      });
    }
    // ...
    carla::Buffer Buf(boost::asio::buffer(message.c_str(), message.size()));
    // ...
    std::this_thread::sleep_for(6ms);
    for (auto j = 0u; j < number_of_messages; ++j) {
      std::this_thread::sleep_for(6ms);
      // ...
    }
    // ...
    std::this_thread::sleep_for(6ms);
    // ...
    for (auto &pair : v) {
      ASSERT_GE(pair.first, number_of_messages - 3u);
    }
  }
}
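// Each client above subscribes to a single stream, but the Client used here is
// a multi-stream client: one instance can hold several subscriptions at once.
// A sketch of that usage, assuming the high-level carla::streaming::Server and
// Client interfaces exercised in the tests above (the handler bodies are
// illustrative):
static void SubscribeToTwoStreams(carla::streaming::Server &srv) {
  auto stream_a = srv.MakeStream();
  auto stream_b = srv.MakeStream();

  carla::streaming::Client c;
  c.AsyncRun(1u);
  c.Subscribe(stream_a.token(), [](auto buffer) { /* handle stream A */ });
  c.Subscribe(stream_b.token(), [](auto buffer) { /* handle stream B */ });
}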