f0849b954a5c02173caefc9be566b8a631ab57df
[tinc] / test / integration / testlib / util.py
1 """Miscellaneous utility functions."""
2
3 import os
4 import sys
5 import subprocess as subp
6 import random
7 import string
8 import socket
9 import typing as T
10 from pathlib import Path
11
12 from . import check
13 from .log import log
14 from .const import EXIT_SKIP
15
16 _ALPHA_NUMERIC = string.ascii_lowercase + string.digits
17
18
19 def random_port() -> int:
20     """Return an unused TCP port in the unprivileged range.
21     Note that this function releases the port before returning, and it can be
22     overtaken by something else before you use it.
23     """
24     while True:
25         port = random.randint(1024, 65535)
26         try:
27             with socket.socket() as sock:
28                 sock.bind(("0.0.0.0", port))
29                 sock.listen()
30             return port
31         except OSError as ex:
32             log.debug("could not bind to random port %d", port, exc_info=ex)
33
34
35 def remove_file(path: T.Union[str, Path]) -> bool:
36     """Try to remove file without failing if it does not exist."""
37     try:
38         os.remove(path)
39         return True
40     except FileNotFoundError:
41         return False
42
43
44 def random_string(k: int) -> str:
45     """Generate a random alphanumeric string of length k."""
46     return "".join(random.choices(_ALPHA_NUMERIC, k=k))
47
48
49 def find_line(filename: str, prefix: str) -> str:
50     """Find a line with the prefix in a text file.
51     Check that only one line matches.
52     """
53     with open(filename, "r", encoding="utf-8") as f:
54         keylines = [line for line in f.readlines() if line.startswith(prefix)]
55     check.equals(1, len(keylines))
56     return keylines[0].rstrip()
57
58
59 def require_root() -> None:
60     """Check that test is running with root privileges.
61     Exit with code 77 otherwise.
62     """
63     euid = os.geteuid()
64     if euid:
65         log.info("this test requires root (but running under UID %d)", euid)
66         sys.exit(EXIT_SKIP)
67
68
69 def require_command(*args: str) -> None:
70     """Check that command args runs with exit code 0.
71     Exit with code 77 otherwise.
72     """
73     if subp.run(args, check=False).returncode:
74         log.info('this test requires command "%s" to work', " ".join(args))
75         sys.exit(EXIT_SKIP)
76
77
78 def require_path(path: str) -> None:
79     """Check that path exists in your file system.
80     Exit with code 77 otherwise.
81     """
82     if not os.path.exists(path):
83         log.warning("this test requires path %s to be present", path)
84         sys.exit(EXIT_SKIP)
85
86
87 # Thin wrappers around `with open(...) as f: f.do_something()`
88 # Don't do much, besides saving quite a bit of space because of how frequently they're needed.
89
90
91 def read_text(path: str) -> str:
92     """Return the text contents of a file."""
93     with open(path, encoding="utf-8") as f:
94         return f.read()
95
96
97 def write_text(path: str, text: str) -> str:
98     """Write text to a file, replacing its content. Return the text added."""
99     with open(path, "w", encoding="utf-8") as f:
100         f.write(text)
101     return text
102
103
104 def read_lines(path: str) -> T.List[str]:
105     """Read file as a list of lines."""
106     with open(path, encoding="utf-8") as f:
107         return f.read().splitlines()
108
109
110 def write_lines(path: str, lines: T.List[str]) -> T.List[str]:
111     """Write text lines to a file, replacing it content. Return the line added."""
112     with open(path, "w", encoding="utf-8") as f:
113         f.write(os.linesep.join(lines))
114         f.write(os.linesep)
115     return lines
116
117
118 def append_line(path: str, line: str) -> str:
119     """Append a line to the end of the file. Return the line added."""
120     line = f"{os.linesep}{line}{os.linesep}"
121     with open(path, "a", encoding="utf-8") as f:
122         f.write(line)
123     return line