From a4570b198d2ba1be392034057fe34578e4d67131 Mon Sep 17 00:00:00 2001 From: Louis-Paul CORDIER Date: Thu, 14 May 2020 14:03:15 +0200 Subject: [PATCH 1/4] monitor: add get_event() method --- tests/monitor.cpp | 47 ++++++++++++++++++++++++++++++++++++++++++++++- zmq.hpp | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 1 deletion(-) diff --git a/tests/monitor.cpp b/tests/monitor.cpp index bb54ca16..d2707738 100644 --- a/tests/monitor.cpp +++ b/tests/monitor.cpp @@ -74,7 +74,7 @@ TEST_CASE("monitor move assign", "[monitor]") } } -TEST_CASE("monitor init event count", "[monitor]") +TEST_CASE("monitor init check event count", "[monitor]") { common_server_client_setup s{false}; mock_monitor_t monitor; @@ -92,6 +92,51 @@ TEST_CASE("monitor init event count", "[monitor]") CHECK(monitor.total == expected_event_count); } +TEST_CASE("monitor init get event count", "[monitor]") +{ + common_server_client_setup s{ false }; + zmq::monitor_t monitor; + + const int expected_event_count = 2; + monitor.init(s.client, "inproc://foo"); + + int total{ 0 }; + int connect_delayed{ 0 }; + int connected{ 0 }; + + auto lbd_count_event = [&](const zmq_event_t& event) { + switch (event.event) + { + case ZMQ_EVENT_CONNECT_DELAYED: + connect_delayed++; + total++; + break; + + case ZMQ_EVENT_CONNECTED: + connected++; + total++; + break; + } + }; + + zmq_event_t eventMsg; + std::string address; + CHECK_FALSE(monitor.get_event(eventMsg, address, zmq::recv_flags::dontwait)); + s.init(); + + while (total < expected_event_count) + { + if (!monitor.get_event(eventMsg, address)) + continue; + + lbd_count_event(eventMsg); + } + + CHECK(connect_delayed == 1); + CHECK(connected == 1); + CHECK(total == expected_event_count); +} + TEST_CASE("monitor init abort", "[monitor]") { class mock_monitor : public mock_monitor_t diff --git a/zmq.hpp b/zmq.hpp index 84457d38..e801b964 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -2224,6 +2224,53 @@ class monitor_t on_monitor_started(); } +#if ZMQ_VERSION_MAJOR >= 4 + bool get_event(zmq_event_t& eventMsg, std::string& address, zmq::recv_flags flags = zmq::recv_flags::none) + { + assert(_monitor_socket); + + eventMsg.event = 0; + eventMsg.value = 0; + address = std::string(); + + { + message_t msg; + int rc = zmq_msg_recv(msg.handle(), _monitor_socket.handle(), + static_cast(flags)); + + if (rc == -1) + { + if (zmq_errno() == ETERM || zmq_errno() == EAGAIN) + return false; + else + throw error_t(); + } + + const char *data = msg.data(); + memcpy(&eventMsg.event, data, sizeof(uint16_t)); + data += sizeof(uint16_t); + memcpy(&eventMsg.value, data, sizeof(int32_t)); + } + + message_t addrMsg; + int rc = zmq_msg_recv(addrMsg.handle(), _monitor_socket.handle(), + static_cast(flags)); + + if (rc == -1) + { + if (zmq_errno() == ETERM) + return false; + else + throw error_t(); + } + + const char *str = addrMsg.data(); + address = std::string(str, str + addrMsg.size()); + + return true; + } +#endif + bool check_event(int timeout = 0) { assert(_monitor_socket); From 78dab5da083f2236c846b8c1abf747a97a8d6e41 Mon Sep 17 00:00:00 2001 From: Louis-Paul CORDIER Date: Thu, 14 May 2020 14:07:35 +0200 Subject: [PATCH 2/4] monitor: add compatibility with zmq::poll() --- tests/monitor.cpp | 32 ++++++++++++++++++++++++++++---- zmq.hpp | 8 ++++++++ 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/tests/monitor.cpp b/tests/monitor.cpp index d2707738..ff7ca24b 100644 --- a/tests/monitor.cpp +++ b/tests/monitor.cpp @@ -124,12 +124,36 @@ TEST_CASE("monitor init get event count", "[monitor]") CHECK_FALSE(monitor.get_event(eventMsg, address, zmq::recv_flags::dontwait)); s.init(); - while (total < expected_event_count) + SECTION("get_event") { - if (!monitor.get_event(eventMsg, address)) - continue; + while (total < expected_event_count) + { + if (!monitor.get_event(eventMsg, address)) + continue; + + lbd_count_event(eventMsg); + } + + } + + SECTION("poll get_event") + { + while (total < expected_event_count) + { + zmq::pollitem_t items[] = { + { monitor.handle(), 0, ZMQ_POLLIN, 0 }, + }; + + zmq::poll(&items[0], 1, 100); - lbd_count_event(eventMsg); + if (!(items[0].revents & ZMQ_POLLIN)) { + continue; + } + + CHECK(monitor.get_event(eventMsg, address)); + + lbd_count_event(eventMsg); + } } CHECK(connect_delayed == 1); diff --git a/zmq.hpp b/zmq.hpp index e801b964..1ebe3246 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -2224,6 +2224,14 @@ class monitor_t on_monitor_started(); } + operator void *() ZMQ_NOTHROW { return handle(); } + + operator void const *() const ZMQ_NOTHROW { return handle(); } + + ZMQ_NODISCARD void *handle() ZMQ_NOTHROW { return _monitor_socket.handle(); } + + ZMQ_NODISCARD const void *handle() const ZMQ_NOTHROW { return _monitor_socket.handle(); } + #if ZMQ_VERSION_MAJOR >= 4 bool get_event(zmq_event_t& eventMsg, std::string& address, zmq::recv_flags flags = zmq::recv_flags::none) { From 3aa80d028c7b5a2f6c69ad110c35b02b1460ec92 Mon Sep 17 00:00:00 2001 From: Louis-Paul CORDIER Date: Thu, 14 May 2020 14:12:40 +0200 Subject: [PATCH 3/4] monitor: add compatibility with zmq::poller_t --- tests/monitor.cpp | 18 ++++++++++++++++++ zmq.hpp | 2 ++ 2 files changed, 20 insertions(+) diff --git a/tests/monitor.cpp b/tests/monitor.cpp index ff7ca24b..dc06670a 100644 --- a/tests/monitor.cpp +++ b/tests/monitor.cpp @@ -156,6 +156,24 @@ TEST_CASE("monitor init get event count", "[monitor]") } } + SECTION("poller_t get_event") + { + zmq::poller_t<> poller; + CHECK_NOTHROW(poller.add(monitor, zmq::event_flags::pollin)); + + while (total < expected_event_count) + { + std::vector> events(1); + if(0 == poller.wait_all(events, std::chrono::milliseconds{ 100 })) + continue; + + CHECK(zmq::event_flags::pollin == events[0].events); + CHECK(monitor.get_event(eventMsg, address)); + + lbd_count_event(eventMsg); + } + } + CHECK(connect_delayed == 1); CHECK(connected == 1); CHECK(total == expected_event_count); diff --git a/zmq.hpp b/zmq.hpp index 1ebe3246..44bfd215 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -2232,6 +2232,8 @@ class monitor_t ZMQ_NODISCARD const void *handle() const ZMQ_NOTHROW { return _monitor_socket.handle(); } + operator socket_ref() ZMQ_NOTHROW { return (zmq::socket_ref) _monitor_socket; } + #if ZMQ_VERSION_MAJOR >= 4 bool get_event(zmq_event_t& eventMsg, std::string& address, zmq::recv_flags flags = zmq::recv_flags::none) { From c2298e8dd4e7e23f880c7a7884edcd85a82e7486 Mon Sep 17 00:00:00 2001 From: Louis-Paul CORDIER Date: Thu, 14 May 2020 14:23:24 +0200 Subject: [PATCH 4/4] monitor: expose close() method to public --- zmq.hpp | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/zmq.hpp b/zmq.hpp index 44bfd215..82dec4ea 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -2414,6 +2414,15 @@ class monitor_t _socket = socket_ref(); } #endif + + void close() ZMQ_NOTHROW + { +#ifdef ZMQ_EVENT_MONITOR_STOPPED + abort(); +#endif + _monitor_socket = socket_t(); + } + virtual void on_monitor_started() {} virtual void on_event_connected(const zmq_event_t &event_, const char *addr_) { @@ -2518,13 +2527,6 @@ class monitor_t socket_ref _socket; socket_t _monitor_socket; - - void close() ZMQ_NOTHROW - { - if (_socket) - zmq_socket_monitor(_socket.handle(), ZMQ_NULLPTR, 0); - _monitor_socket.close(); - } }; #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)