Handle multicast being blocked in the test suite
[tinc] / test / integration / device_multicast.py
index 1a0d55a..1b46c42 100755 (executable)
@@ -2,7 +2,9 @@
 
 """Test multicast device."""
 
+import enum
 import os
+import select
 import socket
 import struct
 import time
@@ -16,7 +18,13 @@ MCAST_ADDR = "224.15.98.12"
 PORT = 38245
 
 
-def multicast_works() -> bool:
+class MulticastSupport(enum.Enum):
+    NO = 0
+    BLOCKED = 1
+    YES = 2
+
+
+def multicast_works() -> MulticastSupport:
     """Check if multicast is supported and works."""
 
     msg = b"foobar"
@@ -31,9 +39,17 @@ def multicast_works() -> bool:
             with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
                 client.sendto(msg, (MCAST_ADDR, PORT))
 
-            return msg == server.recv(16)
+            try:
+                select.select([server], [], [], 1)
+                if msg == server.recv(16, socket.MSG_DONTWAIT):
+                    return MulticastSupport.YES
+            except OSError:
+                pass
+
+            return MulticastSupport.BLOCKED
     except OSError:
-        return False
+        log.info("no")
+        return MulticastSupport.NO
 
 
 def test_no_mcast_support(foo: Tinc) -> None:
@@ -82,9 +98,15 @@ def test_device_multicast(ctx: Test) -> None:
     foo.cmd("set", "Device", f"{MCAST_ADDR} {PORT}")
     foo.cmd("set", "LogLevel", "10")
 
-    if multicast_works():
+    multicast_support = multicast_works()
+    if multicast_support == MulticastSupport.YES:
+        log.info("multicast supported")
         test_rx_tx(foo)
+    elif multicast_support == MulticastSupport.BLOCKED:
+        log.info("multicast blocked")
+        pass
     else:
+        log.info("multicast not supported")
         test_no_mcast_support(foo)