Rewrite integration test suite in Python
[tinc] / test / integration / testlib / util.py
1 """Miscellaneous utility functions."""
2
3 import os
4 import sys
5 import subprocess as subp
6 import random
7 import string
8 import socket
9 import typing as T
10
11 from . import check
12 from .log import log
13 from .const import EXIT_SKIP
14
15 _ALPHA_NUMERIC = string.ascii_lowercase + string.digits
16
17
18 def random_port() -> int:
19     """Return an unused TCP port in the unprivileged range.
20     Note that this function releases the port before returning, and it can be
21     overtaken by something else before you use it.
22     """
23     while True:
24         port = random.randint(1024, 65535)
25         try:
26             with socket.socket() as sock:
27                 sock.bind(("0.0.0.0", port))
28                 sock.listen()
29             return port
30         except OSError as ex:
31             log.debug("could not bind to random port %d", port, exc_info=ex)
32
33
34 def random_string(k: int) -> str:
35     """Generate a random alphanumeric string of length k."""
36     return "".join(random.choices(_ALPHA_NUMERIC, k=k))
37
38
39 def find_line(filename: str, prefix: str) -> str:
40     """Find a line with the prefix in a text file.
41     Check that only one line matches.
42     """
43     with open(filename, "r", encoding="utf-8") as f:
44         keylines = [line for line in f.readlines() if line.startswith(prefix)]
45     check.equals(1, len(keylines))
46     return keylines[0].rstrip()
47
48
49 def require_root() -> None:
50     """Check that test is running with root privileges.
51     Exit with code 77 otherwise.
52     """
53     euid = os.geteuid()
54     if euid:
55         log.info("this test requires root (but running under UID %d)", euid)
56         sys.exit(EXIT_SKIP)
57
58
59 def require_command(*args: str) -> None:
60     """Check that command args runs with exit code 0.
61     Exit with code 77 otherwise.
62     """
63     if subp.run(args, check=False).returncode:
64         log.info('this test requires command "%s" to work', " ".join(args))
65         sys.exit(EXIT_SKIP)
66
67
68 def require_path(path: str) -> None:
69     """Check that path exists in your file system.
70     Exit with code 77 otherwise.
71     """
72     if not os.path.exists(path):
73         log.warning("this test requires path %s to be present", path)
74         sys.exit(EXIT_SKIP)
75
76
77 # Thin wrappers around `with open(...) as f: f.do_something()`
78 # Don't do much, besides saving quite a bit of space because of how frequently they're needed.
79
80
81 def read_text(path: str) -> str:
82     """Return the text contents of a file."""
83     with open(path, encoding="utf-8") as f:
84         return f.read()
85
86
87 def write_text(path: str, text: str) -> str:
88     """Write text to a file, replacing its content. Return the text added."""
89     with open(path, "w", encoding="utf-8") as f:
90         f.write(text)
91     return text
92
93
94 def read_lines(path: str) -> T.List[str]:
95     """Read file as a list of lines."""
96     with open(path, encoding="utf-8") as f:
97         return f.read().splitlines()
98
99
100 def write_lines(path: str, lines: T.List[str]) -> T.List[str]:
101     """Write text lines to a file, replacing it content. Return the line added."""
102     with open(path, "w", encoding="utf-8") as f:
103         f.write(os.linesep.join(lines))
104         f.write(os.linesep)
105     return lines
106
107
108 def append_line(path: str, line: str) -> str:
109     """Append a line to the end of the file. Return the line added."""
110     line = f"{os.linesep}{line}{os.linesep}"
111     with open(path, "a", encoding="utf-8") as f:
112         f.write(line)
113     return line