1111 * 3. Works correctly for both client and server connections
1212 */
1313
14+ #include < chrono>
1415#include < cstring>
16+ #include < thread>
1517#include < fcntl.h>
1618
1719#include < arpa/inet.h>
@@ -37,6 +39,23 @@ class EventHandlingTest : public RealIoTestBase {
3739 close (fd);
3840 }
3941 }
42+
43+ template <typename Predicate>
44+ bool waitForCondition (Predicate&& predicate,
45+ std::chrono::milliseconds timeout =
46+ std::chrono::milliseconds (200 ),
47+ std::chrono::milliseconds poll_interval =
48+ std::chrono::milliseconds(5 )) {
49+ const auto deadline = std::chrono::steady_clock::now () + timeout;
50+ while (std::chrono::steady_clock::now () < deadline) {
51+ if (predicate ()) {
52+ return true ;
53+ }
54+ std::this_thread::sleep_for (poll_interval);
55+ }
56+ return predicate ();
57+ }
58+
4059 // Simple read filter to capture received data
4160 class TestReadFilter : public network ::ReadFilter {
4261 public:
@@ -45,7 +64,7 @@ class EventHandlingTest : public RealIoTestBase {
4564
4665 network::FilterStatus onData (Buffer& data, bool ) override {
4766 // Copy data to our buffer
48- buffer_ .move (data );
67+ data .move (buffer_ );
4968 data_received_ = true ;
5069 return network::FilterStatus::Continue;
5170 }
@@ -241,7 +260,8 @@ TEST_F(EventHandlingTest, EdgeTriggeredReadRaceCondition) {
241260
242261 // Verify data was received despite the race condition
243262 EXPECT_TRUE (data_sent);
244- EXPECT_TRUE (server_callbacks.data_received );
263+ ASSERT_TRUE (waitForCondition (
264+ [&]() { return server_callbacks.data_received .load (); }));
245265 EXPECT_EQ (" test message" , server_callbacks.received_data .toString ());
246266}
247267
@@ -254,83 +274,65 @@ TEST_F(EventHandlingTest, WriteEventManagement) {
254274 int client_fd = fds.first ;
255275 int server_fd = fds.second ;
256276
257- std::atomic<int > write_events_before{0 };
258- std::atomic<int > write_events_during{0 };
259- std::atomic<int > write_events_after{0 };
277+ std::atomic<network::ConnectionImpl*> connection_ptr{nullptr };
260278
261- executeInDispatcher ([this , server_fd, &write_events_before,
262- &write_events_during, &write_events_after]() {
263- // Create server connection
279+ executeInDispatcher ([this , server_fd, &connection_ptr]() {
264280 auto io_handle = std::make_unique<network::IoSocketHandleImpl>(server_fd);
265281 auto socket = std::make_unique<network::ConnectionSocketImpl>(
266282 std::move (io_handle), nullptr , nullptr );
267283
268- // Create a raw transport socket (no TLS)
269- // Since we're testing connection handling, not transport, use raw sockets
270- network::TransportSocketPtr transport_socket =
271- nullptr ; // Use RawTransportSocket (created by ConnectionImpl)
284+ network::TransportSocketPtr transport_socket = nullptr ;
272285
273286 auto connection = std::make_unique<network::ConnectionImpl>(
274- *dispatcher_, std::move (socket), std::move (transport_socket),
275- true ); // true = already connected (for pipes/stdio)
287+ *dispatcher_, std::move (socket), std::move (transport_socket), true );
276288
277- TestCallbacks callbacks;
278- connection->addConnectionCallbacks (callbacks);
279- // Connection is initialized automatically
289+ connection_ptr = connection.release ();
290+ });
280291
281- // Phase 1: No data to write - should have minimal write events
282- int phase1_count = 0 ;
283- auto phase1_timer = dispatcher_->createTimer ([&phase1_count, this ]() {
284- phase1_count++;
285- if (phase1_count > 10 ) {
286- return ;
287- }
288- dispatcher_->run (event::RunType::NonBlock);
292+ ASSERT_NE (nullptr , connection_ptr.load ());
293+
294+ auto getWriteEvents = [this , &connection_ptr]() -> uint64_t {
295+ return executeInDispatcher ([conn = connection_ptr.load ()]() {
296+ return conn->getWriteEventCount ();
289297 });
290- phase1_timer->enableTimer (std::chrono::milliseconds (5 ));
291-
292- // Wait for phase 1 to complete
293- std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
294- write_events_before = connection->getWriteEventCount ();
295-
296- // Phase 2: Queue data to write - should trigger write events
297- OwnedBuffer write_buffer;
298- write_buffer.add (" large data chunk that needs multiple writes" );
299- connection->write (write_buffer, false );
300-
301- int phase2_count = 0 ;
302- auto phase2_timer = dispatcher_->createTimer ([&phase2_count, this ]() {
303- phase2_count++;
304- if (phase2_count > 10 ) {
305- return ;
306- }
307- dispatcher_->run (event::RunType::NonBlock);
298+ };
299+
300+ auto writeData = [this , &connection_ptr](const std::string& payload) {
301+ executeInDispatcher ([conn = connection_ptr.load (), payload]() {
302+ OwnedBuffer buffer;
303+ buffer.add (payload);
304+ conn->write (buffer, false );
308305 });
309- phase2_timer->enableTimer (std::chrono::milliseconds (5 ));
310-
311- std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
312- write_events_during = connection->getWriteEventCount ();
313-
314- // Phase 3: After write completes - should have minimal write events again
315- std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
316- write_events_after = connection->getWriteEventCount ();
317-
318- // Cleanup
319- auto cleanup_timer =
320- dispatcher_->createTimer ([this , connection = connection.release ()]() {
321- delete connection;
322- dispatcher_->exit ();
323- });
324- cleanup_timer->enableTimer (std::chrono::milliseconds (10 ));
325- });
306+ };
307+
308+ const auto write_events_before = getWriteEvents ();
309+
310+ writeData (" large data chunk that needs multiple writes" );
311+
312+ ASSERT_TRUE (waitForCondition (
313+ [&]() { return getWriteEvents () > write_events_before; },
314+ std::chrono::milliseconds (500 )));
315+
316+ const auto write_events_during = getWriteEvents ();
326317
327- // Verify write events are managed properly
328- // Should have minimal events when buffer is empty, more when writing
329- EXPECT_LE (write_events_before, 2 ); // Minimal events with empty buffer
330- EXPECT_GT (write_events_during,
331- write_events_before); // More events when writing
332- EXPECT_LE (write_events_after - write_events_during,
333- 2 ); // Back to minimal after write
318+ // Drain client pipe to allow server writes to complete
319+ char drain_buffer[256 ];
320+ while (read (client_fd, drain_buffer, sizeof (drain_buffer)) > 0 ) {
321+ }
322+
323+ ASSERT_TRUE (waitForCondition (
324+ [&]() { return getWriteEvents () >= write_events_during; },
325+ std::chrono::milliseconds (200 )));
326+
327+ const auto write_events_after = getWriteEvents ();
328+
329+ EXPECT_GT (write_events_during, write_events_before);
330+ EXPECT_LE (write_events_after, write_events_during + 10 );
331+
332+ executeInDispatcher ([conn = connection_ptr.load ()]() {
333+ conn->close (network::ConnectionCloseType::FlushWrite);
334+ delete conn;
335+ });
334336}
335337
336338/* *
@@ -406,14 +408,17 @@ TEST_F(EventHandlingTest, BidirectionalCommunication) {
406408 dispatcher_->exit ();
407409 });
408410 timer->enableTimer (std::chrono::milliseconds (100 ));
409- });
411+ });
410412
411413 // Both sides should receive the messages
412- EXPECT_TRUE (client_callbacks.data_received );
413- EXPECT_TRUE (server_callbacks.data_received );
414+ ASSERT_TRUE (waitForCondition (
415+ [&]() { return client_callbacks.data_received .load (); }));
416+ ASSERT_TRUE (waitForCondition (
417+ [&]() { return server_callbacks.data_received .load (); }));
414418 EXPECT_EQ (" Hello from server" , client_callbacks.received_data .toString ());
415419 EXPECT_EQ (" Hello from client" , server_callbacks.received_data .toString ());
416420}
417421
418422} // namespace test
419- } // namespace mcp
423+ } // namespace mcp
424+
0 commit comments