Add basic pledge/unveil sandbox on OpenBSD
[tinc] / test / integration / security.py
1 #!/usr/bin/env python3
2
3 """Test tinc protocol security."""
4
5 import asyncio
6 import typing as T
7
8 from testlib import check
9 from testlib.log import log
10 from testlib.proc import Tinc, Script
11 from testlib.test import Test
12 from testlib.feature import SANDBOX_LEVEL
13
14 TIMEOUT = 2
15
16
17 async def recv(read: asyncio.StreamReader, out: T.List[bytes]) -> None:
18     """Receive data until connection is closed."""
19     while not read.at_eof():
20         rec = await read.read(1)
21         out.append(rec)
22
23
24 async def send(port: int, buf: str, delay: float = 0) -> bytes:
25     """Send data and receive response."""
26     raw = f"{buf}\n".encode("utf-8")
27     read, write = await asyncio.open_connection(host="localhost", port=port)
28
29     if delay:
30         await asyncio.sleep(delay)
31
32     received: T.List[bytes] = []
33     try:
34         write.write(raw)
35         await asyncio.wait_for(recv(read, received), timeout=1)
36     except asyncio.TimeoutError:
37         log.info('received: "%s"', received)
38         return b"".join(received)
39
40     raise RuntimeError("test should not have reached this line")
41
42
43 async def test_id_timeout(foo: Tinc) -> None:
44     """Test that peer does not send its ID before us."""
45     log.info("no ID sent by peer if we don't send ID before the timeout")
46     data = await send(foo.port, "0 bar 17.7", delay=TIMEOUT * 1.5)
47     check.false(data)
48
49
50 async def test_tarpitted(foo: Tinc) -> None:
51     """Test that peer sends its ID if we send first and are in tarpit."""
52     log.info("ID sent if initiator sends first, but still tarpitted")
53     data = await send(foo.port, "0 bar 17.7")
54     check.has_prefix(data, f"0 {foo} 17.7".encode("utf-8"))
55
56
57 async def test_invalid_id_own(foo: Tinc) -> None:
58     """Test that peer does not accept its own ID."""
59     log.info("own ID not allowed")
60     data = await send(foo.port, f"0 {foo} 17.7")
61     check.false(data)
62
63
64 async def test_invalid_id_unknown(foo: Tinc) -> None:
65     """Test that peer does not accept unknown ID."""
66     log.info("no unknown IDs allowed")
67     data = await send(foo.port, "0 baz 17.7")
68     check.false(data)
69
70
71 async def test_null_metakey(foo: Tinc) -> None:
72     """Test that NULL metakey is not accepted."""
73     null_metakey = f"""
74 0 {foo} 17.0\
75 1 0 672 0 0 834188619F4D943FD0F4B1336F428BD4AC06171FEABA66BD2356BC9593F0ECD643F\
76 0E4B748C670D7750DFDE75DC9F1D8F65AB1026F5ED2A176466FBA4167CC567A2085ABD070C1545B\
77 180BDA86020E275EA9335F509C57786F4ED2378EFFF331869B856DDE1C05C461E4EECAF0E2FB97A\
78 F77B7BC2AD1B34C12992E45F5D1254BBF0C3FB224ABB3E8859594A83B6CA393ED81ECAC9221CE6B\
79 C71A727BCAD87DD80FC0834B87BADB5CB8FD3F08BEF90115A8DF1923D7CD9529729F27E1B8ABD83\
80 C4CF8818AE10257162E0057A658E265610B71F9BA4B365A20C70578FAC65B51B91100392171BA12\
81 A440A5E93C4AA62E0C9B6FC9B68F953514AAA7831B4B2C31C4
82 """.strip()
83
84     log.info("no NULL METAKEY allowed")
85     data = await send(foo.port, null_metakey)
86     check.false(data)
87
88
89 def init(ctx: Test) -> Tinc:
90     """Initialize new test nodes."""
91     foo = ctx.node()
92
93     stdin = f"""
94         init {foo}
95         set Port 0
96         set DeviceType dummy
97         set Address localhost
98         set PingTimeout {TIMEOUT}
99         set AutoConnect no
100         set Subnet 10.96.96.1
101         set Sandbox {SANDBOX_LEVEL}
102     """
103     foo.cmd(stdin=stdin)
104
105     foo.add_script(Script.SUBNET_UP)
106     foo.start()
107     foo[Script.SUBNET_UP].wait()
108
109     return foo
110
111
112 async def run_tests(ctx: Test) -> None:
113     """Run all tests."""
114     foo = init(ctx)
115
116     log.info("getting into tarpit")
117     await test_id_timeout(foo)
118
119     log.info("starting other tests")
120     await asyncio.gather(
121         test_invalid_id_own(foo),
122         test_invalid_id_unknown(foo),
123         test_null_metakey(foo),
124     )
125
126
127 loop = asyncio.get_event_loop()
128
129 with Test("security") as context:
130     loop.run_until_complete(run_tests(context))