return self._port
def cmd(
- self, *args: str, code: T.Optional[int] = 0, stdin: T.Optional[str] = None
+ self, *args: str, code: T.Optional[int] = 0, stdin: T.Optional[T.AnyStr] = None
) -> T.Tuple[str, str]:
"""Run command through tinc, writes `stdin` to it (if the argument is not None),
check its return code (if the argument is not None), and return (stdout, stderr).
"""
- proc = self.tinc(*args)
+ proc = self.tinc(*args, binary=isinstance(stdin, bytes))
log.debug('tinc %s: PID %d, in "%s", want code %s', self, proc.pid, stdin, code)
out, err = proc.communicate(stdin, timeout=60)
return out if out else "", err if err else ""
- def tinc(self, *args: str) -> subp.Popen:
+ def tinc(self, *args: str, binary=False) -> subp.Popen:
"""Start tinc with the specified arguments."""
args = tuple(filter(bool, args))
cmd = [path.TINC_PATH, *self._common_args, *args]
stdin=subp.PIPE,
stdout=subp.PIPE,
stderr=subp.PIPE,
- encoding="utf-8",
+ encoding=None if binary else "utf-8",
)
self._procs.append(proc)
return proc