MINIUPNPC = "miniupnpc"
OPENSSL = "openssl"
READLINE = "readline"
- TUNEMU = "tunemu"
SANDBOX = "sandbox"
+ TUNEMU = "tunemu"
UML = "uml"
VDE = "vde"
+ WATCHDOG = "watchdog"
class Tinc:
name: str
address: str
_work_dir: str
- _pid_file: str
+ _pid: T.Optional[int]
_port: T.Optional[int]
_scripts: T.Dict[str, TincScript]
_procs: T.List[subp.Popen]
self.address = addr if addr else _rand_localhost()
self._work_dir = _make_wd(self.name)
os.makedirs(self._work_dir, exist_ok=True)
- self._pid_file = os.path.join(_TEMPDIR, f"tinc_{self.name}")
self._port = None
self._scripts = {}
self._procs = []
@property
def pid_file(self) -> str:
"""Get the path to the pid file."""
- return self._pid_file
+ return os.path.join(_TEMPDIR, f"tinc_{self.name}")
def read_port(self) -> int:
"""Read port used by tincd from its pidfile and update the _port field."""
content = f.read()
log.debug("found data %s", content)
- _, _, _, token, port = content.split()
+ pid, _, _, token, port = content.split()
check.equals("port", token)
self._port = int(port)
+ self._pid = int(pid)
return self._port
@property
assert self._port is not None
return self._port
+ @property
+ def pid(self) -> int:
+ """pid of the main tincd process."""
+ assert self._pid is not None
+ return self._pid
+
def __str__(self) -> str:
return self.name
self.add_script(Script.TINC_UP)
tinc_up = self[Script.TINC_UP]
- self.cmd(*args, "start")
+ self.cmd(*args, "start", "--logfile", self.sub("log"))
tinc_up.wait()
if new_script:
return self._port
def cmd(
- self, *args: str, code: T.Optional[int] = 0, stdin: T.Optional[T.AnyStr] = None
+ self,
+ *args: str,
+ code: T.Optional[int] = 0,
+ stdin: T.Optional[T.AnyStr] = None,
+ timeout: T.Optional[int] = None,
) -> T.Tuple[str, str]:
"""Run command through tinc, writes `stdin` to it (if the argument is not None),
check its return code (if the argument is not None), and return (stdout, stderr).
proc = self.tinc(*args, binary=isinstance(stdin, bytes))
log.debug('tinc %s: PID %d, in "%s", want code %s', self, proc.pid, stdin, code)
- out, err = proc.communicate(stdin, timeout=60)
+ out, err = proc.communicate(stdin, timeout=60 if timeout is None else timeout)
res = proc.returncode
self._procs.remove(proc)
log.debug('tinc %s: code %d, out "%s", err "%s"', self, res, out, err)