Add tests for some device & address variables
[tinc] / test / integration / testlib / proc.py
1 """Classes for working with compiled instances of tinc and tincd binaries."""
2
3 import os
4 import random
5 import tempfile
6 import typing as T
7 import subprocess as subp
8 from enum import Enum
9 from platform import system
10
11 from . import check, path
12 from .log import log
13 from .script import TincScript, Script, ScriptType
14 from .template import make_script, make_cmd_wrap
15 from .util import random_string, random_port
16
17 # Does the OS support all addresses in 127.0.0.0/8 without additional configuration?
18 _FULL_LOCALHOST_SUBNET = system() in ("Linux", "Windows")
19
20 # Path to the system temporary directory.
21 _TEMPDIR = tempfile.gettempdir()
22
23
24 def _make_wd(name: str) -> str:
25     work_dir = os.path.join(path.TEST_WD, "data", name)
26     os.makedirs(work_dir, exist_ok=True)
27     return work_dir
28
29
30 def _random_octet() -> int:
31     return random.randint(1, 254)
32
33
34 def _rand_localhost() -> str:
35     """Generate random IP in subnet 127.0.0.0/8 for operating systems that support
36     it without additional configuration. For all others, return 127.0.0.1.
37     """
38     if _FULL_LOCALHOST_SUBNET:
39         return f"127.{_random_octet()}.{_random_octet()}.{_random_octet()}"
40     return "127.0.0.1"
41
42
43 class Feature(Enum):
44     """Optional features supported by both tinc and tincd."""
45
46     COMP_LZ4 = "comp_lz4"
47     COMP_LZO = "comp_lzo"
48     COMP_ZLIB = "comp_zlib"
49     CURSES = "curses"
50     JUMBOGRAMS = "jumbograms"
51     LEGACY_PROTOCOL = "legacy_protocol"
52     LIBGCRYPT = "libgcrypt"
53     MINIUPNPC = "miniupnpc"
54     OPENSSL = "openssl"
55     READLINE = "readline"
56     SANDBOX = "sandbox"
57     TUNEMU = "tunemu"
58     UML = "uml"
59     VDE = "vde"
60     WATCHDOG = "watchdog"
61
62
63 class Tinc:
64     """Thin wrapper around Popen that simplifies running tinc/tincd
65     binaries by passing required arguments, checking exit codes, etc.
66     """
67
68     name: str
69     address: str
70     _work_dir: str
71     _pid: T.Optional[int]
72     _port: T.Optional[int]
73     _scripts: T.Dict[str, TincScript]
74     _procs: T.List[subp.Popen]
75
76     def __init__(self, name: str = "", addr: str = "") -> None:
77         self.name = name if name else random_string(10)
78         self.address = addr if addr else _rand_localhost()
79         self._work_dir = _make_wd(self.name)
80         os.makedirs(self._work_dir, exist_ok=True)
81         self._port = None
82         self._scripts = {}
83         self._procs = []
84
85     def randomize_port(self) -> int:
86         """Use a random port for this node."""
87         self._port = random_port()
88         return self._port
89
90     @property
91     def pid_file(self) -> str:
92         """Get the path to the pid file."""
93         return os.path.join(_TEMPDIR, f"tinc_{self.name}")
94
95     def read_port(self) -> int:
96         """Read port used by tincd from its pidfile and update the _port field."""
97         log.debug("reading pidfile at %s", self.pid_file)
98
99         with open(self.pid_file, "r", encoding="utf-8") as f:
100             content = f.read()
101         log.debug("found data %s", content)
102
103         pid, _, _, token, port = content.split()
104         check.equals("port", token)
105
106         self._port = int(port)
107         self._pid = int(pid)
108         return self._port
109
110     @property
111     def port(self) -> int:
112         """Port that tincd is listening on."""
113         assert self._port is not None
114         return self._port
115
116     @property
117     def pid(self) -> int:
118         """pid of the main tincd process."""
119         assert self._pid is not None
120         return self._pid
121
122     def __str__(self) -> str:
123         return self.name
124
125     def __getitem__(self, script: ScriptType) -> TincScript:
126         if isinstance(script, Script):
127             script = script.name
128         return self._scripts[script]
129
130     def __enter__(self):
131         return self
132
133     def __exit__(self, exc_type, exc_val, exc_tb):
134         self.cleanup()
135
136     @property
137     def features(self) -> T.List[Feature]:
138         """List of features supported by tinc and tincd."""
139         tinc, _ = self.cmd("--version")
140         tincd, _ = self.tincd("--version").communicate(timeout=5)
141         prefix, features = "Features: ", []
142
143         for out in tinc, tincd:
144             for line in out.splitlines():
145                 if not line.startswith(prefix):
146                     continue
147                 tokens = line[len(prefix) :].split()
148                 for token in tokens:
149                     features.append(Feature(token))
150                 break
151
152         log.info('supported features: "%s"', features)
153         return features
154
155     @property
156     def _common_args(self) -> T.List[str]:
157         return [
158             "--net",
159             self.name,
160             "--config",
161             self.work_dir,
162             "--pidfile",
163             self.pid_file,
164         ]
165
166     def sub(self, *paths: str) -> str:
167         """Return path to a subdirectory within the working dir for this node."""
168         return os.path.join(self._work_dir, *paths)
169
170     @property
171     def work_dir(self):
172         """Node's working directory."""
173         return self._work_dir
174
175     @property
176     def script_up(self) -> str:
177         """Name of the hosts/XXX-up script for this node."""
178         return f"hosts/{self.name}-up"
179
180     @property
181     def script_down(self) -> str:
182         """Name of the hosts/XXX-down script for this node."""
183         return f"hosts/{self.name}-down"
184
185     def cleanup(self) -> None:
186         """Terminate all tinc and tincd processes started from this instance."""
187         log.info("running node cleanup for %s", self)
188
189         try:
190             self.cmd("stop")
191         except (AssertionError, ValueError):
192             log.info("unsuccessfully tried to stop node %s", self)
193
194         for proc in self._procs:
195             if proc.returncode is not None:
196                 log.debug("PID %d exited, skipping", proc.pid)
197             else:
198                 log.info("PID %d still running, stopping", proc.pid)
199                 try:
200                     proc.kill()
201                 except OSError as ex:
202                     log.error("could not kill PID %d", proc.pid, exc_info=ex)
203
204             log.debug("waiting on %d to prevent zombies", proc.pid)
205             try:
206                 proc.wait()
207             except OSError as ex:
208                 log.error("waiting on %d failed", proc.pid, exc_info=ex)
209
210         self._procs.clear()
211
212     def start(self, *args: str) -> int:
213         """Start the node, wait for it to call tinc-up, and get the port it's
214         listening on from the pid file. Don't use this method unless you need
215         to know the port tincd is running on. Call .cmd("start"), it's faster.
216
217         Reading pidfile and setting the port cannot be done from tinc-up because
218         you can't send tinc commands to yourself there — the daemon doesn't
219         respond to them until tinc-up is finished. The port field on this Tinc
220         instance is updated to reflect the correct port. If tinc-up is missing,
221         this command creates a new one, and then disables it.
222         """
223         new_script = Script.TINC_UP.name not in self._scripts
224         if new_script:
225             self.add_script(Script.TINC_UP)
226
227         tinc_up = self[Script.TINC_UP]
228         self.cmd(*args, "start", "--logfile", self.sub("log"))
229         tinc_up.wait()
230
231         if new_script:
232             tinc_up.disable()
233
234         self._port = self.read_port()
235         self.cmd("set", "Port", str(self._port))
236
237         return self._port
238
239     def cmd(
240         self,
241         *args: str,
242         code: T.Optional[int] = 0,
243         stdin: T.Optional[T.AnyStr] = None,
244         timeout: T.Optional[int] = None,
245     ) -> T.Tuple[str, str]:
246         """Run command through tinc, writes `stdin` to it (if the argument is not None),
247         check its return code (if the argument is not None), and return (stdout, stderr).
248         """
249         proc = self.tinc(*args, binary=isinstance(stdin, bytes))
250         log.debug('tinc %s: PID %d, in "%s", want code %s', self, proc.pid, stdin, code)
251
252         out, err = proc.communicate(stdin, timeout=60 if timeout is None else timeout)
253         res = proc.returncode
254         self._procs.remove(proc)
255         log.debug('tinc %s: code %d, out "%s", err "%s"', self, res, out, err)
256
257         if code is not None:
258             check.equals(code, res)
259
260         return out if out else "", err if err else ""
261
262     def tinc(self, *args: str, binary=False) -> subp.Popen:
263         """Start tinc with the specified arguments."""
264         args = tuple(filter(bool, args))
265         cmd = [path.TINC_PATH, *self._common_args, *args]
266         log.debug('starting tinc %s: "%s"', self.name, " ".join(cmd))
267         # pylint: disable=consider-using-with
268         proc = subp.Popen(
269             cmd,
270             cwd=self._work_dir,
271             stdin=subp.PIPE,
272             stdout=subp.PIPE,
273             stderr=subp.PIPE,
274             encoding=None if binary else "utf-8",
275         )
276         self._procs.append(proc)
277         return proc
278
279     def tincd(self, *args: str, env: T.Optional[T.Dict[str, str]] = None) -> subp.Popen:
280         """Start tincd with the specified arguments."""
281         args = tuple(filter(bool, args))
282         cmd = [
283             path.TINCD_PATH,
284             *self._common_args,
285             "--logfile",
286             self.sub("log"),
287             "-d5",
288             *args,
289         ]
290         log.debug('starting tincd %s: "%s"', self.name, " ".join(cmd))
291         if env is not None:
292             env = {**os.environ, **env}
293         # pylint: disable=consider-using-with
294         proc = subp.Popen(
295             cmd,
296             cwd=self._work_dir,
297             stdin=subp.PIPE,
298             stdout=subp.PIPE,
299             stderr=subp.PIPE,
300             encoding="utf-8",
301             env=env,
302         )
303         self._procs.append(proc)
304         return proc
305
306     def add_script(self, script: ScriptType, source: str = "") -> TincScript:
307         """Create a script with the passed Python source code.
308         The source must either be empty, or start indentation with 4 spaces.
309         If the source is empty, the created script can be used to receive notifications.
310         """
311         rel_path = script if isinstance(script, str) else script.value
312         check.not_in(rel_path, self._scripts)
313
314         full_path = os.path.join(self._work_dir, rel_path)
315         tinc_script = TincScript(self.name, rel_path, full_path)
316
317         log.debug("creating script %s at %s", script, full_path)
318         with open(full_path, "w", encoding="utf-8") as f:
319             content = make_script(self.name, rel_path, source)
320             f.write(content)
321
322         if os.name == "nt":
323             log.debug("creating .cmd script wrapper at %s", full_path)
324             win_content = make_cmd_wrap(full_path)
325             with open(f"{full_path}.cmd", "w", encoding="utf-8") as f:
326                 f.write(win_content)
327         else:
328             os.chmod(full_path, 0o755)
329
330         if isinstance(script, Script):
331             self._scripts[script.name] = tinc_script
332         self._scripts[rel_path] = tinc_script
333
334         return tinc_script