Add tests for some device & address variables
[tinc] / test / integration / device_fd.py
1 #!/usr/bin/env python3
2
3 """Test FD device support."""
4
5 import array
6 import socket
7 import tempfile
8 import threading
9 import time
10
11 from testlib import check
12 from testlib.log import log
13 from testlib.test import Test
14 from testlib.proc import Script
15
16 JUNK_FRAME = b"\xFF" * 80
17
18
19 def start_fd_server(unix: socket.socket, payload: bytes, file_desc: int) -> None:
20     """Start UNIX socket server and then the FD to the first connected client."""
21
22     def send_fd() -> None:
23         conn, _ = unix.accept()
24         with conn:
25             log.info("accepted connection %s", conn)
26             ancillary = array.array("i", [file_desc])
27             conn.sendmsg([payload], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, ancillary)])
28
29     threading.Thread(target=send_fd).start()
30
31
32 def test_device_fd(ctx: Test) -> None:
33     """Test some FD device error conditions."""
34
35     foo = ctx.node(init="set DeviceType fd")
36
37     log.info("test with empty Device")
38     _, err = foo.cmd("start", code=1)
39     check.is_in("Could not read device", err)
40
41     log.info("test with too long UNIX socket path")
42     device = "x" * 110
43     _, err = foo.cmd("start", "-o", f"Device={device}", code=1)
44     check.is_in("Unix socket path too long", err)
45
46     foo.cmd("set", "Device", "/dev/null")
47
48     log.info("check that Mode=switch fails")
49     _, err = foo.cmd("start", "-o", "Mode=switch", code=1)
50     check.is_in("Switch mode not supported", err)
51
52     log.info("test with incorrect Device")
53     _, err = foo.cmd("start", code=1)
54     check.is_in("Receiving fd from Unix socket", err)
55     check.is_in("Could not connect to Unix socket", err)
56
57     log.info("test with invalid FD")
58     _, err = foo.cmd("start", "-o", "Device=-1", code=1)
59     check.is_in("Could not open", err)
60
61     log.info("create a UNIX socket to transfer FD")
62     unix = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
63     unix_path = tempfile.mktemp()
64     unix.bind(unix_path)
65     unix.listen(1)
66
67     foo.cmd("set", "Device", unix_path)
68     myself, him = socket.socketpair(socket.AF_UNIX)
69
70     log.info("start with empty data")
71     start_fd_server(unix, b"", him.fileno())
72     _, err = foo.cmd("start", "-o", f"Device={unix_path}", code=1)
73     check.is_in("Could not read from unix socket", err)
74
75     foo_log = foo.sub("log")
76     foo.add_script(Script.TINC_UP)
77
78     log.info("start with correct amount of data")
79     start_fd_server(unix, b" ", him.fileno())
80
81     log.info("wait for tincd to connect")
82     _, err = foo.cmd("start", "-o", f"Device={unix_path}", "--logfile", foo_log, "-d10")
83     foo[Script.TINC_UP].wait()
84     check.is_in("adapter set up", err)
85
86     log.info("send junk data and make sure tincd receives it")
87     for _ in range(10):
88         myself.send(JUNK_FRAME)
89         time.sleep(0.1)
90
91     foo.add_script(Script.TINC_DOWN)
92     foo.cmd("stop")
93     foo[Script.TINC_DOWN].wait()
94
95     check.in_file(foo_log, "Unknown IP version while reading packet from fd/")
96
97
98 with Test("test FD device") as context:
99     test_device_fd(context)