1 """Classes for working with compiled instances of tinc and tincd binaries."""
6 import subprocess as subp
8 from platform import system
10 from . import check, path
12 from .script import TincScript, Script, ScriptType
13 from .template import make_script, make_cmd_wrap
14 from .util import random_string, random_port
# True when the OS accepts every address in 127.0.0.0/8 without additional
# configuration. Evaluated once, when the module is imported.
_FULL_LOCALHOST_SUBNET = system() in {"Linux", "Windows"}
def _make_wd(name: str) -> str:
    """Create the working directory for the node called `name` and return its path.

    The directory lives at <path.TEST_WD>/data/<name>. Missing parent
    directories are created as well, and an existing directory is reused.
    """
    work_dir = os.path.join(path.TEST_WD, "data", name)
    os.makedirs(work_dir, exist_ok=True)
    # Callers keep the result as the node's working directory, so hand it back.
    return work_dir
def _random_octet() -> int:
    """Return a random integer between 1 and 254, both ends included."""
    lowest, highest = 1, 254
    return random.randint(lowest, highest)
def _rand_localhost() -> str:
    """Generate random IP in subnet 127.0.0.0/8 for operating systems that support
    it without additional configuration. For all others, return 127.0.0.1.
    """
    if _FULL_LOCALHOST_SUBNET:
        return f"127.{_random_octet()}.{_random_octet()}.{_random_octet()}"
    # Fallback promised by the docstring for every other operating system.
    return "127.0.0.1"
40 """Optional features supported by both tinc and tincd."""
44 COMP_ZLIB = "comp_zlib"
46 JUMBOGRAMS = "jumbograms"
47 LEGACY_PROTOCOL = "legacy_protocol"
48 LIBGCRYPT = "libgcrypt"
49 MINIUPNPC = "miniupnpc"
58 """Thin wrapper around Popen that simplifies running tinc/tincd
59 binaries by passing required arguments, checking exit codes, etc.
65 _port: T.Optional[int]
66 _scripts: T.Dict[str, TincScript]
67 _procs: T.List[subp.Popen]
def __init__(self, name: str = "", addr: str = "") -> None:
    """Prepare a node and its working directory.

    name: node name; when empty, one is generated with random_string(10).
    addr: node address; when empty, one is chosen by _rand_localhost().
    """
    self.name = name if name else random_string(10)
    self.address = addr if addr else _rand_localhost()
    self._work_dir = _make_wd(self.name)
    # Start with no known port, no registered scripts and no spawned
    # processes, so that methods reading these fields work on a fresh node.
    self._port = None
    self._scripts = {}
    self._procs = []
def randomize_port(self) -> int:
    """Use a random port for this node and return the chosen port."""
    self._port = random_port()
    return self._port
def read_port(self) -> int:
    """Read port used by tincd from its pidfile and update the _port field.

    The pidfile content must split into exactly five whitespace-separated
    fields; the fourth is checked against the word "port" and the fifth is
    parsed as the port number. Returns the port that was read.

    Raises ValueError if the field count differs or the port is not an integer.
    """
    pidfile = self.sub("pid")
    log.debug("reading pidfile at %s", pidfile)

    with open(pidfile, "r", encoding="utf-8") as f:
        content = f.read()
    log.debug("found data %s", content)

    # Only the last two fields matter here: the "port" marker and its value.
    _, _, _, token, port = content.split()
    check.equals("port", token)

    self._port = int(port)
    return self._port
def port(self) -> int:
    """Port that tincd is listening on.

    The port must already have been set on this node; otherwise the
    assertion below fails.
    """
    assert self._port is not None
    return self._port
103 def __str__(self) -> str:
def __getitem__(self, script: ScriptType) -> TincScript:
    """Return the script registered on this node under `script`.

    `script` is either a Script member or the key the script was stored
    under. Raises KeyError when no such script has been registered.
    """
    if isinstance(script, Script):
        # Scripts added via a Script member are stored under the member's name.
        script = script.name
    return self._scripts[script]
114 def __exit__(self, exc_type, exc_val, exc_tb):
118 def features(self) -> T.List[Feature]:
119 """List of features supported by tinc and tincd."""
120 tinc, _ = self.cmd("--version")
121 tincd, _ = self.tincd("--version").communicate(timeout=5)
122 prefix, features = "Features: ", []
124 for out in tinc, tincd:
125 for line in out.splitlines():
126 if not line.startswith(prefix):
128 tokens = line[len(prefix) :].split()
130 features.append(Feature(token))
133 log.info('supported features: "%s"', features)
137 def _common_args(self) -> T.List[str]:
def sub(self, *paths: str) -> str:
    """Join the given path components onto this node's working directory."""
    base_dir = self._work_dir
    return os.path.join(base_dir, *paths)
def script_up(self) -> str:
    """Name of this node's host-up script, in the form hosts/<name>-up."""
    up_script = f"hosts/{self.name}-up"
    return up_script
def script_down(self) -> str:
    """Name of this node's host-down script, in the form hosts/<name>-down."""
    down_script = f"hosts/{self.name}-down"
    return down_script
161 def cleanup(self) -> None:
162 """Terminate all tinc and tincd processes started from this instance."""
163 log.info("running node cleanup for %s", self)
167 except (AssertionError, ValueError):
168 log.info("unsuccessfully tried to stop node %s", self)
170 for proc in self._procs:
171 if proc.returncode is not None:
172 log.debug("PID %d exited, skipping", proc.pid)
174 log.info("PID %d still running, stopping", proc.pid)
177 except OSError as ex:
178 log.error("could not kill PID %d", proc.pid, exc_info=ex)
180 log.debug("waiting on %d to prevent zombies", proc.pid)
183 except OSError as ex:
184 log.error("waiting on %d failed", proc.pid, exc_info=ex)
188 def start(self, *args: str) -> int:
189 """Start the node, wait for it to call tinc-up, and get the port it's
190 listening on from the pid file. Don't use this method unless you need
191 to know the port tincd is running on. Call .cmd("start"), it's faster.
193 Reading pidfile and setting the port cannot be done from tinc-up because
194 you can't send tinc commands to yourself there — the daemon doesn't
195 respond to them until tinc-up is finished. The port field on this Tinc
196 instance is updated to reflect the correct port. If tinc-up is missing,
197 this command creates a new one, and then disables it.
199 new_script = Script.TINC_UP.name not in self._scripts
201 self.add_script(Script.TINC_UP)
203 tinc_up = self[Script.TINC_UP]
204 self.cmd(*args, "start")
210 self._port = self.read_port()
211 self.cmd("set", "Port", str(self._port))
216 self, *args: str, code: T.Optional[int] = 0, stdin: T.Optional[str] = None
217 ) -> T.Tuple[str, str]:
218 """Run command through tinc, writes `stdin` to it (if the argument is not None),
219 check its return code (if the argument is not None), and return (stdout, stderr).
221 proc = self.tinc(*args)
222 log.debug('tinc %s: PID %d, in "%s", want code %s', self, proc.pid, stdin, code)
224 out, err = proc.communicate(stdin, timeout=60)
225 res = proc.returncode
226 self._procs.remove(proc)
227 log.debug('tinc %s: code %d, out "%s", err "%s"', self, res, out, err)
230 check.equals(code, res)
232 # Check that port was not used by something else
233 check.not_in("Can't bind to ", err)
235 return out if out else "", err if err else ""
237 def tinc(self, *args: str) -> subp.Popen:
238 """Start tinc with the specified arguments."""
239 args = tuple(filter(bool, args))
240 cmd = [path.TINC_PATH, *self._common_args, *args]
241 log.debug('starting tinc %s: "%s"', self.name, " ".join(cmd))
242 # pylint: disable=consider-using-with
251 self._procs.append(proc)
254 def tincd(self, *args: str, env: T.Optional[T.Dict[str, str]] = None) -> subp.Popen:
255 """Start tincd with the specified arguments."""
256 args = tuple(filter(bool, args))
265 log.debug('starting tincd %s: "%s"', self.name, " ".join(cmd))
267 env = {**os.environ, **env}
268 # pylint: disable=consider-using-with
278 self._procs.append(proc)
281 def add_script(self, script: ScriptType, source: str = "") -> TincScript:
282 """Create a script with the passed Python source code.
283 The source must either be empty, or start indentation with 4 spaces.
284 If the source is empty, the created script can be used to receive notifications.
286 rel_path = script if isinstance(script, str) else script.value
287 check.not_in(rel_path, self._scripts)
289 full_path = os.path.join(self._work_dir, rel_path)
290 tinc_script = TincScript(self.name, rel_path, full_path)
292 log.debug("creating script %s at %s", script, full_path)
293 with open(full_path, "w", encoding="utf-8") as f:
294 content = make_script(self.name, rel_path, source)
298 log.debug("creating .cmd script wrapper at %s", full_path)
299 win_content = make_cmd_wrap(full_path)
300 with open(f"{full_path}.cmd", "w", encoding="utf-8") as f:
303 os.chmod(full_path, 0o755)
305 if isinstance(script, Script):
306 self._scripts[script.name] = tinc_script
307 self._scripts[rel_path] = tinc_script