7b1307e2a05b97ba52193574f3e55332cec2da24
[tinc] / test / integration / scripts.py
1 #!/usr/bin/env python3
2
3 """Test that all tincd scripts execute in correct order and contain expected env vars."""
4
5 import os
6 import typing as T
7
8 from testlib import check, path
9 from testlib.log import log
10 from testlib.proc import Tinc, Script, ScriptType, TincScript
11 from testlib.test import Test
12 from testlib.util import random_string
13
14 SUBNET_SERVER = ("10.0.0.1", "fec0::/64")
15 SUBNET_CLIENT = ("10.0.0.2", "fec0::/64#5")
16 NETNAMES = {
17     "server": "net_" + random_string(8),
18     "invite": "net_" + random_string(8),
19     "client": "net_" + random_string(8),
20 }
21
22 # Creation time for the last notification event we've received.
23 # Used for checking that scripts are called in the correct order.
24 # dict is to avoid angering linters by using `global` to update this value.
25 last_time = {"time": -1}
26
27
28 def init(ctx: Test) -> T.Tuple[Tinc, Tinc]:
29     """Initialize new test nodes."""
30     server, client = ctx.node(), ctx.node()
31
32     stdin = f"""
33         init {server}
34         set Port 0
35         set DeviceType dummy
36         set Address 127.0.0.1
37         set AddressFamily ipv4
38         add Subnet {SUBNET_SERVER[0]}
39         add Subnet {SUBNET_SERVER[1]}
40     """
41     server.cmd(stdin=stdin)
42
43     for script in (
44         *Script,
45         server.script_up,
46         server.script_down,
47         client.script_up,
48         client.script_down,
49     ):
50         server.add_script(script)
51
52     return server, client
53
54
55 def wait_script(script: TincScript) -> T.Dict[str, str]:
56     """Wait for script to finish and check that it was run by tincd *after* the
57     script that was used as the argument in the previous call to this function.
58
59     For example, to check that SUBNET_UP is called after TINC_UP:
60         wait_script(node[Script.TINC_UP])
61         wait_script(node[Script.SUBNET_UP])
62     """
63     msg = script.wait()
64     assert msg.created_at
65
66     log.debug(
67         "%s sent %d, prev %d, diff %d",
68         script,
69         msg.created_at,
70         last_time["time"],
71         msg.created_at - last_time["time"],
72     )
73
74     if msg.created_at <= last_time["time"]:
75         raise ValueError(f"script {script} started in wrong order")
76
77     last_time["time"] = msg.created_at
78     return msg.env
79
80
81 def wait_tinc(server: Tinc, script: Script) -> None:
82     """Wait for TINC_UP / TINC_DOWN and check env vars."""
83     log.info("checking tinc: %s %s", server, script)
84
85     env = wait_script(server[script])
86     check.equals(NETNAMES["server"], env["NETNAME"])
87     check.equals(server.name, env["NAME"])
88     check.equals("dummy", env["DEVICE"])
89
90
91 def wait_subnet(server: Tinc, script: Script, node: Tinc, subnet: str) -> None:
92     """Wait for SUBNET_UP / SUBNET_DOWN and check env vars."""
93     log.info("checking subnet: %s %s %s %s", server, script, node, subnet)
94
95     env = wait_script(server[script])
96     check.equals(NETNAMES["server"], env["NETNAME"])
97     check.equals(server.name, env["NAME"])
98     check.equals("dummy", env["DEVICE"])
99     check.equals(node.name, env["NODE"])
100
101     if node != server:
102         check.equals("127.0.0.1", env["REMOTEADDRESS"])
103         check.equals(str(node.port), env["REMOTEPORT"])
104
105     if "#" in subnet:
106         addr, weight = subnet.split("#")
107         check.equals(addr, env["SUBNET"])
108         check.equals(weight, env["WEIGHT"])
109     else:
110         check.equals(subnet, env["SUBNET"])
111
112
113 def wait_host(server: Tinc, client: Tinc, script: ScriptType) -> None:
114     """Wait for HOST_UP / HOST_DOWN and check env vars."""
115     log.info("checking host: %s %s %s", server, client, script)
116
117     env = wait_script(server[script])
118     check.equals(NETNAMES["server"], env["NETNAME"])
119     check.equals(server.name, env["NAME"])
120     check.equals(client.name, env["NODE"])
121     check.equals("dummy", env["DEVICE"])
122     check.equals("127.0.0.1", env["REMOTEADDRESS"])
123     check.equals(str(client.port), env["REMOTEPORT"])
124
125
126 def test_start_server(server: Tinc) -> None:
127     """Start server node and run checks on its scripts."""
128     server.cmd("-n", NETNAMES["server"], "start")
129     wait_tinc(server, Script.TINC_UP)
130
131     port = server.read_port()
132     server.cmd("set", "port", str(port))
133
134     log.info("test server subnet-up")
135     for sub in SUBNET_SERVER:
136         wait_subnet(server, Script.SUBNET_UP, server, sub)
137
138
139 def test_invite_client(server: Tinc, client: Tinc) -> str:
140     """Check that client invitation scripts work."""
141     url, _ = server.cmd("-n", NETNAMES["invite"], "invite", client.name)
142     url = url.strip()
143     check.true(url)
144
145     env = wait_script(server[Script.INVITATION_CREATED])
146     check.equals(NETNAMES["invite"], env["NETNAME"])
147     check.equals(server.name, env["NAME"])
148     check.equals(client.name, env["NODE"])
149     check.equals(url, env["INVITATION_URL"])
150     assert os.path.isfile(env["INVITATION_FILE"])
151
152     return url
153
154
155 def test_join_client(server: Tinc, client: Tinc, url: str) -> None:
156     """Test that client joining scripts work."""
157     client.cmd("-n", NETNAMES["client"], "join", url)
158
159     env = wait_script(server[Script.INVITATION_ACCEPTED])
160     check.equals(NETNAMES["server"], env["NETNAME"])
161     check.equals(server.name, env["NAME"])
162     check.equals(client.name, env["NODE"])
163     check.equals("dummy", env["DEVICE"])
164     check.equals("127.0.0.1", env["REMOTEADDRESS"])
165
166
167 def test_start_client(server: Tinc, client: Tinc) -> None:
168     """Start client and check its script work."""
169     client.randomize_port()
170
171     stdin = f"""
172         set Address {client.address}
173         set ListenAddress {client.address}
174         set Port {client.port}
175         set DeviceType dummy
176         add Subnet {SUBNET_CLIENT[0]}
177         add Subnet {SUBNET_CLIENT[1]}
178         start
179     """
180     client.cmd(stdin=stdin)
181
182     log.info("test client scripts")
183     wait_host(server, client, Script.HOST_UP)
184     wait_host(server, client, client.script_up)
185
186     log.info("test client subnet-up")
187     for sub in SUBNET_CLIENT:
188         wait_subnet(server, Script.SUBNET_UP, client, sub)
189
190
191 def test_stop_server(server: Tinc, client: Tinc) -> None:
192     """Stop server and check that its scripts work."""
193     server.cmd("stop")
194     wait_host(server, client, Script.HOST_DOWN)
195     wait_host(server, client, client.script_down)
196
197     log.info("test client subnet-down")
198     for sub in SUBNET_CLIENT:
199         wait_subnet(server, Script.SUBNET_DOWN, client, sub)
200
201     log.info("test server subnet-down")
202     for sub in SUBNET_SERVER:
203         wait_subnet(server, Script.SUBNET_DOWN, server, sub)
204
205     log.info("test tinc-down")
206     wait_tinc(server, Script.TINC_DOWN)
207
208
209 def run_tests(ctx: Test) -> None:
210     """Run all tests."""
211     server, client = init(ctx)
212
213     log.info("start server")
214     test_start_server(server)
215
216     log.info("invite client")
217     url = test_invite_client(server, client)
218
219     log.info('join client via url "%s"', url)
220     test_join_client(server, client, url)
221
222     log.info("start client")
223     test_start_client(server, client)
224
225     log.info("stop server")
226     test_stop_server(server, client)
227
228
229 def run_script_interpreter_test(ctx: Test) -> None:
230     """Check that tincd scripts run with a custom script interpreter."""
231     foo = ctx.node()
232     stdin = f"""
233         init {foo}
234         set Port 0
235         set DeviceType dummy
236         set ScriptsInterpreter {path.PYTHON_PATH}
237     """
238     foo_up = foo.add_script(Script.TINC_UP)
239     foo.cmd(stdin=stdin)
240
241     foo.cmd("start")
242     foo_up.wait()
243     foo.cmd("stop")
244
245
246 with Test("scripts test") as context:
247     run_tests(context)
248
249 if os.name != "nt":
250     with Test("works with ScriptInterpreter") as context:
251         run_script_interpreter_test(context)