From: Kirill Isakov Date: Fri, 27 May 2022 11:22:05 +0000 (+0600) Subject: Add tests for sign/verify commands X-Git-Url: https://www.tinc-vpn.org/git/?a=commitdiff_plain;h=38be602754aab531c47bc1b04e71952dfa8abcf5;p=tinc Add tests for sign/verify commands --- diff --git a/test/integration/cmd_fsck.py b/test/integration/cmd_fsck.py new file mode 100755 index 00000000..4e79ccd5 --- /dev/null +++ b/test/integration/cmd_fsck.py @@ -0,0 +1,378 @@ +#!/usr/bin/env python3 + +"""Test 'tinc fsck' command.""" + +import os +import sys +import typing as T + +from testlib import check +from testlib.log import log +from testlib.proc import Tinc, Feature +from testlib.util import read_text, read_lines, write_lines, append_line, write_text + +run_legacy_checks = Feature.LEGACY_PROTOCOL in Tinc().features +run_access_checks = os.name != "nt" and os.geteuid() != 0 +run_executability_checks = os.name != "nt" +run_permission_checks = run_executability_checks + +# Sample RSA key pair (old format). Uses e = 0xFFFF. +RSA_N = """ +BB82C3A9B906E98ABF2D99FF9B320B229F5C1E58EC784762DA1F4D3509FFF78ECA7FFF19BA17073\ +6CDE458EC8E732DDE2C02009632DF731B4A6BD6C504E50B7B875484506AC1E49FD0DF624F6612F5\ +64C562BD20F870592A49195023D744963229C35081C8AE48BE2EBB5CC9A0D64924022DC0EB782A3\ +A8F3EABCA04AA42B24B2A6BD2353A6893A73AE01FA54891DD24BF36CA032F19F7E78C01273334BA\ +A2ECF36B6998754CB012BC985C975503D945E4D925F6F719ACC8FBA7B18C810FF850C3CCACD6056\ +5D4FCFE02A98FE793E2D45D481A34D1F90584D096561FF3184C462C606535F3F9BB260541DF0D1F\ +EB16938FFDEC2FF96ACCC6BD5BFBC19471F6AB +""".strip() + +RSA_D = """ +8CEC9A4316FE45E07900197D8FBB52D3AF01A51C4F8BD08A1E21A662E3CFCF7792AD7680673817B\ +70AC1888A08B49E8C5835357016D9BF56A0EBDE8B5DF214EC422809BC8D88177F273419116EF2EC\ +7951453F129768DE9BC31D963515CC7481559E4C0E65C549169F2B94AE68DB944171189DD654DC6\ +970F2F5843FB7C8E9D057E2B5716752F1F5686811AC075ED3D3CBD06B5D35AE33D01260D9E0560A\ +F545D0C9D89A31D5EAF96D5422F6567FE8A90E23906B840545805644DFD656E526A686D3B978DD2\ +71578CA3DA0F7D23FC1252A702A5D597CAE9D4A5BBF6398A75AF72582C7538A7937FB71A2610DCB\ +C39625B77103FA3B7D0A55177FD98C39CD4A27 +""".strip() + + +class Context: + """Test context. Used to store paths to configuration files.""" + + def __init__(self) -> None: + node = Tinc() + node.cmd("init", node.name) + + self.node = node + self.host = node.sub("hosts", node.name) + self.conf = node.sub("tinc.conf") + self.rsa_priv = node.sub("rsa_key.priv") + self.ec_priv = node.sub("ed25519_key.priv") + self.tinc_up = node.sub("tinc-up") + self.host_up = node.sub("host-up") + + if os.name == "nt": + self.tinc_up = f"{self.tinc_up}.cmd" + self.host_up = f"{self.host_up}.cmd" + + def expect_msg( + self, msg: str, force: bool = False, code: int = 1, present: bool = True + ) -> None: + """Checks that tinc output contains (or does not contain) the expected message.""" + args = ["fsck"] + if force: + args.insert(0, "--force") + + out, err = self.node.cmd(*args, code=code) + if present: + check.is_in(msg, out, err) + else: + check.not_in(msg, out, err) + + +def test(msg: str) -> Context: + """Create test context.""" + context = Context() + log.info("TEST: %s", msg) + return context + + +def remove_pem(config: str) -> T.List[str]: + """Remove PEM from a config file, leaving everything else untouched.""" + key, result = False, [] + for line in read_lines(config): + if line.startswith("-----BEGIN"): + key = True + continue + if line.startswith("-----END"): + key = False + continue + if not key: + result.append(line) + write_lines(config, result) + return result + + +def extract_pem(config: str) -> T.List[str]: + """Extract PEM from a config file, ignoring everything else.""" + key = False + result: T.List[str] = [] + for line in read_lines(config): + if line.startswith("-----BEGIN"): + key = True + continue + if line.startswith("-----END"): + return result + if key: + result.append(line) + raise Exception("key not found") + + +def replace_line(file_path: str, prefix: str, replace: str = "") -> None: + """Replace lines in a file that start with the prefix.""" + lines = read_lines(file_path) + lines = [replace if line.startswith(prefix) else line for line in lines] + write_lines(file_path, lines) + + +def test_private_key_var(var: str, file: str) -> None: + """Test inline private keys with variable var.""" + context = test(f"private key variable {var} in file {file}") + renamed = os.path.realpath(context.node.sub("renamed_key")) + os.rename(src=context.node.sub(file), dst=renamed) + append_line(context.host, f"{var} = {renamed}") + context.expect_msg("key was found but no private key", present=False, code=0) + + +def test_private_keys(keyfile: str) -> None: + """Test private keys in file keyfile.""" + context = test(f"fail on broken {keyfile}") + keyfile_path = context.node.sub(keyfile) + os.truncate(keyfile_path, 0) + + if run_legacy_checks: + context.expect_msg("no private key is known", code=0) + else: + context.expect_msg("No Ed25519 private key found") + + if run_access_checks: + context = test(f"fail on inaccessible {keyfile}") + keyfile_path = context.node.sub(keyfile) + os.chmod(keyfile_path, 0) + context.expect_msg("Error reading", code=0 if run_legacy_checks else 1) + + if run_permission_checks: + context = test(f"warn about unsafe permissions on {keyfile}") + keyfile_path = context.node.sub(keyfile) + os.chmod(keyfile_path, 0o666) + context.expect_msg("unsafe file permissions", code=0) + + if run_legacy_checks: + context = test(f"pass on missing {keyfile} when the other key is present") + keyfile_path = context.node.sub(keyfile) + os.remove(keyfile_path) + context.node.cmd("fsck") + + +def test_ec_public_key_file_var(context: Context, *paths: str) -> None: + """Test EC public keys in config *paths.""" + ec_pubkey = os.path.realpath(context.node.sub("ec_pubkey")) + + ec_key = "" + for line in read_lines(context.host): + if line.startswith("Ed25519PublicKey"): + _, _, ec_key = line.split() + break + assert ec_key + + pem = f""" +-----BEGIN ED25519 PUBLIC KEY----- +{ec_key} +-----END ED25519 PUBLIC KEY----- +""" + write_text(ec_pubkey, pem) + + replace_line(context.host, "Ed25519PublicKey") + + config = context.node.sub(*paths) + append_line(config, f"Ed25519PublicKeyFile = {ec_pubkey}") + + context.expect_msg("No (usable) public Ed25519", code=0, present=False) + + +############################################################################### +# Common tests +############################################################################### + +ctx = test("pass freshly created configuration") +ctx.node.cmd("fsck") + +ctx = test("fail on missing tinc.conf") +os.remove(ctx.conf) +ctx.expect_msg("No tinc configuration found") + +for suffix in "up", "down": + ctx = test(f"unknown -{suffix} script warning") + fake_path = ctx.node.sub(f"fake-{suffix}") + write_text(fake_path, "") + ctx.expect_msg("Unknown script", code=0) + +ctx = test("fix broken Ed25519 public key with --force") +replace_line(ctx.host, "Ed25519PublicKey", "Ed25519PublicKey = foobar") +ctx.expect_msg("No (usable) public Ed25519 key", force=True, code=0) +ctx.node.cmd("fsck") + +ctx = test("fix missing Ed25519 public key with --force") +replace_line(ctx.host, "Ed25519PublicKey") +ctx.expect_msg("No (usable) public Ed25519 key", force=True, code=0) +ctx.node.cmd("fsck") + +ctx = test("fail when all private keys are missing") +os.remove(ctx.ec_priv) +if run_legacy_checks: + os.remove(ctx.rsa_priv) + ctx.expect_msg("Neither RSA or Ed25519 private") +else: + ctx.expect_msg("No Ed25519 private") + +ctx = test("warn about missing EC public key and NOT fix without --force") +replace_line(ctx.host, "Ed25519PublicKey") +ctx.expect_msg("No (usable) public Ed25519", code=0) +host = read_text(ctx.host) +check.not_in("ED25519 PUBLIC KEY", host) + +ctx = test("fix missing EC public key on --force") +replace_line(ctx.host, "Ed25519PublicKey") +ctx.expect_msg("Wrote Ed25519 public key", force=True, code=0) +host = read_text(ctx.host) +check.is_in("ED25519 PUBLIC KEY", host) + +ctx = test("warn about obsolete variables") +append_line(ctx.host, "GraphDumpFile = /dev/null") +ctx.expect_msg("obsolete variable GraphDumpFile", code=0) + +ctx = test("warn about missing values") +append_line(ctx.host, "Weight = ") +ctx.expect_msg("No value for variable `Weight") + +ctx = test("warn about duplicate variables") +append_line(ctx.host, f"Weight = 0{os.linesep}Weight = 1") +ctx.expect_msg("multiple instances of variable Weight", code=0) + +ctx = test("warn about server variables in host config") +append_line(ctx.host, "Interface = fake0") +ctx.expect_msg("server variable Interface found", code=0) + +ctx = test("warn about host variables in server config") +append_line(ctx.conf, "Port = 1337") +ctx.expect_msg("host variable Port found", code=0) + +ctx = test("warn about missing Name") +replace_line(ctx.conf, "Name =") +ctx.expect_msg("without a valid Name") + +test_private_keys("ed25519_key.priv") +test_private_key_var("Ed25519PrivateKeyFile", "ed25519_key.priv") + +ctx = test("test EC public key in tinc.conf") +test_ec_public_key_file_var(ctx, "tinc.conf") + +ctx = test("test EC public key in hosts/") +test_ec_public_key_file_var(ctx, "hosts", ctx.node.name) + +if run_access_checks: + ctx = test("fail on inaccessible tinc.conf") + os.chmod(ctx.conf, 0) + ctx.expect_msg("not running tinc as root") + + ctx = test("fail on inaccessible hosts/foo") + os.chmod(ctx.host, 0) + ctx.expect_msg("Cannot open config file") + +if run_executability_checks: + ctx = test("non-executable tinc-up MUST be fixed by tinc --force") + os.chmod(ctx.tinc_up, 0o644) + ctx.expect_msg("cannot read and execute", force=True, code=0) + assert os.access(ctx.tinc_up, os.X_OK) + + ctx = test("non-executable tinc-up MUST NOT be fixed by tinc without --force") + os.chmod(ctx.tinc_up, 0o644) + ctx.expect_msg("cannot read and execute", code=0) + assert not os.access(ctx.tinc_up, os.X_OK) + + ctx = test("non-executable foo-up MUST be fixed by tinc --force") + write_text(ctx.host_up, "") + os.chmod(ctx.host_up, 0o644) + ctx.expect_msg("cannot read and execute", force=True, code=0) + assert os.access(ctx.tinc_up, os.X_OK) + + ctx = test("non-executable bar-up MUST NOT be fixed by tinc") + path = ctx.node.sub("hosts", "bar-up") + write_text(path, "") + os.chmod(path, 0o644) + ctx.expect_msg("cannot read and execute", code=0) + assert not os.access(path, os.X_OK) + +############################################################################### +# Legacy protocol +############################################################################### +if not run_legacy_checks: + log.info("skipping legacy protocol tests") + sys.exit(0) + + +def test_rsa_public_key_file_var(context: Context, *paths: str) -> None: + """Test RSA public keys in config *paths.""" + key = extract_pem(context.host) + remove_pem(context.host) + + rsa_pub = os.path.realpath(context.node.sub("rsa_pubkey")) + write_lines(rsa_pub, key) + + config = context.node.sub(*paths) + append_line(config, f"PublicKeyFile = {rsa_pub}") + + context.expect_msg("Error reading RSA public key", code=0, present=False) + + +test_private_keys("rsa_key.priv") +test_private_key_var("PrivateKeyFile", "rsa_key.priv") + +ctx = test("test rsa public key in tinc.conf") +test_rsa_public_key_file_var(ctx, "tinc.conf") + +ctx = test("test rsa public key in hosts/") +test_rsa_public_key_file_var(ctx, "hosts", ctx.node.name) + +ctx = test("warn about missing RSA private key if public key is present") +os.remove(ctx.rsa_priv) +ctx.expect_msg("public RSA key was found but no private key", code=0) + +ctx = test("warn about missing RSA public key") +remove_pem(ctx.host) +ctx.expect_msg("No (usable) public RSA", code=0) +check.not_in("BEGIN RSA PUBLIC KEY", read_text(ctx.host)) + +ctx = test("fix missing RSA public key on --force") +remove_pem(ctx.host) +ctx.expect_msg("Wrote RSA public key", force=True, code=0) +check.is_in("BEGIN RSA PUBLIC KEY", read_text(ctx.host)) + +ctx = test("RSA PublicKey + PrivateKey must work") +os.remove(ctx.rsa_priv) +remove_pem(ctx.host) +append_line(ctx.conf, f"PrivateKey = {RSA_D}") +append_line(ctx.host, f"PublicKey = {RSA_N}") +ctx.expect_msg("no (usable) public RSA", code=0, present=False) + +ctx = test("RSA PrivateKey without PublicKey must warn") +os.remove(ctx.rsa_priv) +remove_pem(ctx.host) +append_line(ctx.conf, f"PrivateKey = {RSA_D}") +ctx.expect_msg("PrivateKey used but no PublicKey found", code=0) + +ctx = test("warn about missing EC private key if public key is present") +os.remove(ctx.ec_priv) +ctx.expect_msg("public Ed25519 key was found but no private key", code=0) + +ctx = test("fix broken RSA public key with --force") +host_lines = read_lines(ctx.host) +del host_lines[1] +write_lines(ctx.host, host_lines) +ctx.expect_msg("old key(s) found and disabled", force=True, code=0) +ctx.node.cmd("fsck") + +ctx = test("fix missing RSA public key with --force") +remove_pem(ctx.host) +ctx.expect_msg("No (usable) public RSA key found", force=True, code=0) +ctx.node.cmd("fsck") + +if run_permission_checks: + ctx = test("warn about unsafe permissions on tinc.conf with PrivateKey") + os.remove(ctx.rsa_priv) + append_line(ctx.conf, f"PrivateKey = {RSA_D}") + append_line(ctx.host, f"PublicKey = {RSA_N}") + os.chmod(ctx.conf, 0o666) + ctx.expect_msg("unsafe file permissions", code=0) diff --git a/test/integration/cmd_sign_verify.py b/test/integration/cmd_sign_verify.py new file mode 100755 index 00000000..eefa3eeb --- /dev/null +++ b/test/integration/cmd_sign_verify.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 + +"""Test sign/verify commands.""" + +import os +import tempfile + +from testlib import util, cmd, check +from testlib.proc import Tinc +from testlib.test import Test + +PRIV_KEY = """ +-----BEGIN ED25519 PRIVATE KEY----- +4Q8bJqfN60s0tOiZdAhAWLgB9+o947cta2WMXmQIz8mCdBdcphzhp23Wt2vUzfQ6 +XHt9+5IqidIw/lLXG61Nbc6IZ+4Fy1XOO1uJ6j4hqIKjdSytD2Vb7MPlNJfPdCDu +-----END ED25519 PRIVATE KEY----- +""" + +HOST = """ +Ed25519PublicKey = nOSmPehc9ljTtbi+IeoKiyYnkc7gd12OzTZTy3TnwgL +Port = 17879 +""" + +# Do not replace \n or this will break on Windows if cloned with native line endings +SIGNED_BYTES = """Signature = foo 1653397516 \ +T8Bjg7dc7IjsCrZQC/20qLRsWPlrbthnjyDHQM0BMLoTeAHbLt0fxP5CbTy7Cifgg7P0K179GeahBFsnaIr4MA\n\ +fake testing data\n\ +hello there\n\ +""".encode( + "utf-8" +) + +RAW_DATA = tempfile.mktemp() + +with open(RAW_DATA, "wb") as raw_file: + raw_file.write(util.random_string(64).encode("utf-8")) + + +def init(ctx: Test) -> Tinc: + """Initialize a node.""" + + foo = ctx.node() + stdin = f"init {foo}" + foo.cmd(stdin=stdin) + return foo + + +def test_sign_errors(foo: Tinc) -> None: + """Test `sign` error conditions.""" + + _, err = foo.cmd("sign", "foo", "bar", code=1) + check.is_in("Too many arguments", err) + + _, err = foo.cmd("sign", "/nonexistent", code=1) + check.is_in("Could not open", err) + + os.truncate(foo.sub("ed25519_key.priv"), 0) + _, err = foo.cmd("sign", RAW_DATA, code=1) + check.is_in("Could not read private key from", err) + + os.remove(foo.sub("ed25519_key.priv")) + _, err = foo.cmd("sign", RAW_DATA, code=1) + check.is_in("Could not open", err) + + +def test_verify(foo: Tinc) -> None: + """Test `verify` of data known to work.""" + + signed_file = tempfile.mktemp() + with open(signed_file, "wb") as f: + f.write(SIGNED_BYTES) + + foo.name = "foo" + util.write_text(foo.sub("tinc.conf"), f"Name = {foo}") + util.write_text(foo.sub(f"hosts/{foo}"), HOST) + util.write_text(foo.sub("ed25519_key.priv"), PRIV_KEY) + + for name in ".", foo.name: + foo.cmd("verify", name, stdin=SIGNED_BYTES) + foo.cmd("verify", name, signed_file) + + if os.name != "nt": + foo.cmd("verify", "*", stdin=SIGNED_BYTES) + foo.cmd("verify", "*", signed_file) + + os.remove(signed_file) + + +def test_verify_errors(foo: Tinc) -> None: + """Test `verify` error conditions.""" + + _, err = foo.cmd("verify", code=1) + check.is_in("Not enough arguments", err) + + _, err = foo.cmd("verify", foo.name, "bar", "baz", code=1) + check.is_in("Too many arguments", err) + + _, err = foo.cmd("verify", "foo@", code=1) + check.is_in("Invalid node name", err) + + _, err = foo.cmd("verify", foo.name, "/nonexistent", code=1) + check.is_in("Could not open", err) + + _, err = foo.cmd("verify", foo.name, stdin="", code=1) + check.is_in("Invalid input", err) + + _, err = foo.cmd("verify", foo.name, stdin="Signature = foo bar baz", code=1) + check.is_in("Invalid input", err) + + sig = ( + "Signature = dog " + "1653395565 " + "D25ACFD89jaV9+6g9TNMDTDxH8JGd3wLMv/YNMwXbrj9Bos9q6IW/tuFPxGxYNQ6qAc93XFzkH5u7Gw+Z86GDA\n" + ) + _, err = foo.cmd("verify", foo.name, stdin=sig, code=1) + check.is_in(f"Signature is not made by {foo}", err) + + sig = ( + f"Signature = {foo} " + "1653395565 " + "D25ACFD89jaV9+6g9TNMDTDxH8JGd3wLMv/YNMwXbrj9Bos9q6IW/tuFPxGxYNQ6qAc93XFzkH5u7Gw+Z86GDA\n" + ) + _, err = foo.cmd("verify", foo.name, stdin=sig, code=1) + check.is_in("Invalid signature", err) + + util.write_text(foo.sub(f"hosts/{foo}"), "foobar") + _, err = foo.cmd("verify", foo.name, stdin=sig, code=1) + check.is_in("Could not read public key from", err) + + +def test_sign_verify(foo: Tinc, bar: Tinc) -> None: + """Test `sign` and pass its result to `verify`.""" + + signed, _ = foo.cmd("sign", RAW_DATA, stdin=b"") + assert isinstance(signed, bytes) + + signed_file = tempfile.mktemp() + with open(signed_file, "wb") as f: + f.write(signed) + + for name in ".", foo.name: + foo.cmd("verify", name, signed_file) + foo.cmd("verify", name, stdin=signed) + + if os.name != "nt": + foo.cmd("verify", "*", signed_file) + foo.cmd("verify", "*", stdin=signed) + + os.remove(signed_file) + + cmd.exchange(foo, bar) + + if os.name != "nt": + signed, _ = foo.cmd("sign", RAW_DATA) + bar.cmd("verify", "*", stdin=signed) + + signed, _ = bar.cmd("sign", RAW_DATA) + foo.cmd("verify", bar.name, stdin=signed) + + +with Test("test errors in `sign`") as context: + test_sign_errors(init(context)) + +with Test("test errors in `verify`") as context: + test_verify_errors(init(context)) + +with Test("test successful `verify`") as context: + test_verify(init(context)) + +with Test("test `sign` and `verify`") as context: + test_sign_verify(init(context), init(context)) diff --git a/test/integration/command_fsck.py b/test/integration/command_fsck.py deleted file mode 100755 index 4e79ccd5..00000000 --- a/test/integration/command_fsck.py +++ /dev/null @@ -1,378 +0,0 @@ -#!/usr/bin/env python3 - -"""Test 'tinc fsck' command.""" - -import os -import sys -import typing as T - -from testlib import check -from testlib.log import log -from testlib.proc import Tinc, Feature -from testlib.util import read_text, read_lines, write_lines, append_line, write_text - -run_legacy_checks = Feature.LEGACY_PROTOCOL in Tinc().features -run_access_checks = os.name != "nt" and os.geteuid() != 0 -run_executability_checks = os.name != "nt" -run_permission_checks = run_executability_checks - -# Sample RSA key pair (old format). Uses e = 0xFFFF. -RSA_N = """ -BB82C3A9B906E98ABF2D99FF9B320B229F5C1E58EC784762DA1F4D3509FFF78ECA7FFF19BA17073\ -6CDE458EC8E732DDE2C02009632DF731B4A6BD6C504E50B7B875484506AC1E49FD0DF624F6612F5\ -64C562BD20F870592A49195023D744963229C35081C8AE48BE2EBB5CC9A0D64924022DC0EB782A3\ -A8F3EABCA04AA42B24B2A6BD2353A6893A73AE01FA54891DD24BF36CA032F19F7E78C01273334BA\ -A2ECF36B6998754CB012BC985C975503D945E4D925F6F719ACC8FBA7B18C810FF850C3CCACD6056\ -5D4FCFE02A98FE793E2D45D481A34D1F90584D096561FF3184C462C606535F3F9BB260541DF0D1F\ -EB16938FFDEC2FF96ACCC6BD5BFBC19471F6AB -""".strip() - -RSA_D = """ -8CEC9A4316FE45E07900197D8FBB52D3AF01A51C4F8BD08A1E21A662E3CFCF7792AD7680673817B\ -70AC1888A08B49E8C5835357016D9BF56A0EBDE8B5DF214EC422809BC8D88177F273419116EF2EC\ -7951453F129768DE9BC31D963515CC7481559E4C0E65C549169F2B94AE68DB944171189DD654DC6\ -970F2F5843FB7C8E9D057E2B5716752F1F5686811AC075ED3D3CBD06B5D35AE33D01260D9E0560A\ -F545D0C9D89A31D5EAF96D5422F6567FE8A90E23906B840545805644DFD656E526A686D3B978DD2\ -71578CA3DA0F7D23FC1252A702A5D597CAE9D4A5BBF6398A75AF72582C7538A7937FB71A2610DCB\ -C39625B77103FA3B7D0A55177FD98C39CD4A27 -""".strip() - - -class Context: - """Test context. Used to store paths to configuration files.""" - - def __init__(self) -> None: - node = Tinc() - node.cmd("init", node.name) - - self.node = node - self.host = node.sub("hosts", node.name) - self.conf = node.sub("tinc.conf") - self.rsa_priv = node.sub("rsa_key.priv") - self.ec_priv = node.sub("ed25519_key.priv") - self.tinc_up = node.sub("tinc-up") - self.host_up = node.sub("host-up") - - if os.name == "nt": - self.tinc_up = f"{self.tinc_up}.cmd" - self.host_up = f"{self.host_up}.cmd" - - def expect_msg( - self, msg: str, force: bool = False, code: int = 1, present: bool = True - ) -> None: - """Checks that tinc output contains (or does not contain) the expected message.""" - args = ["fsck"] - if force: - args.insert(0, "--force") - - out, err = self.node.cmd(*args, code=code) - if present: - check.is_in(msg, out, err) - else: - check.not_in(msg, out, err) - - -def test(msg: str) -> Context: - """Create test context.""" - context = Context() - log.info("TEST: %s", msg) - return context - - -def remove_pem(config: str) -> T.List[str]: - """Remove PEM from a config file, leaving everything else untouched.""" - key, result = False, [] - for line in read_lines(config): - if line.startswith("-----BEGIN"): - key = True - continue - if line.startswith("-----END"): - key = False - continue - if not key: - result.append(line) - write_lines(config, result) - return result - - -def extract_pem(config: str) -> T.List[str]: - """Extract PEM from a config file, ignoring everything else.""" - key = False - result: T.List[str] = [] - for line in read_lines(config): - if line.startswith("-----BEGIN"): - key = True - continue - if line.startswith("-----END"): - return result - if key: - result.append(line) - raise Exception("key not found") - - -def replace_line(file_path: str, prefix: str, replace: str = "") -> None: - """Replace lines in a file that start with the prefix.""" - lines = read_lines(file_path) - lines = [replace if line.startswith(prefix) else line for line in lines] - write_lines(file_path, lines) - - -def test_private_key_var(var: str, file: str) -> None: - """Test inline private keys with variable var.""" - context = test(f"private key variable {var} in file {file}") - renamed = os.path.realpath(context.node.sub("renamed_key")) - os.rename(src=context.node.sub(file), dst=renamed) - append_line(context.host, f"{var} = {renamed}") - context.expect_msg("key was found but no private key", present=False, code=0) - - -def test_private_keys(keyfile: str) -> None: - """Test private keys in file keyfile.""" - context = test(f"fail on broken {keyfile}") - keyfile_path = context.node.sub(keyfile) - os.truncate(keyfile_path, 0) - - if run_legacy_checks: - context.expect_msg("no private key is known", code=0) - else: - context.expect_msg("No Ed25519 private key found") - - if run_access_checks: - context = test(f"fail on inaccessible {keyfile}") - keyfile_path = context.node.sub(keyfile) - os.chmod(keyfile_path, 0) - context.expect_msg("Error reading", code=0 if run_legacy_checks else 1) - - if run_permission_checks: - context = test(f"warn about unsafe permissions on {keyfile}") - keyfile_path = context.node.sub(keyfile) - os.chmod(keyfile_path, 0o666) - context.expect_msg("unsafe file permissions", code=0) - - if run_legacy_checks: - context = test(f"pass on missing {keyfile} when the other key is present") - keyfile_path = context.node.sub(keyfile) - os.remove(keyfile_path) - context.node.cmd("fsck") - - -def test_ec_public_key_file_var(context: Context, *paths: str) -> None: - """Test EC public keys in config *paths.""" - ec_pubkey = os.path.realpath(context.node.sub("ec_pubkey")) - - ec_key = "" - for line in read_lines(context.host): - if line.startswith("Ed25519PublicKey"): - _, _, ec_key = line.split() - break - assert ec_key - - pem = f""" ------BEGIN ED25519 PUBLIC KEY----- -{ec_key} ------END ED25519 PUBLIC KEY----- -""" - write_text(ec_pubkey, pem) - - replace_line(context.host, "Ed25519PublicKey") - - config = context.node.sub(*paths) - append_line(config, f"Ed25519PublicKeyFile = {ec_pubkey}") - - context.expect_msg("No (usable) public Ed25519", code=0, present=False) - - -############################################################################### -# Common tests -############################################################################### - -ctx = test("pass freshly created configuration") -ctx.node.cmd("fsck") - -ctx = test("fail on missing tinc.conf") -os.remove(ctx.conf) -ctx.expect_msg("No tinc configuration found") - -for suffix in "up", "down": - ctx = test(f"unknown -{suffix} script warning") - fake_path = ctx.node.sub(f"fake-{suffix}") - write_text(fake_path, "") - ctx.expect_msg("Unknown script", code=0) - -ctx = test("fix broken Ed25519 public key with --force") -replace_line(ctx.host, "Ed25519PublicKey", "Ed25519PublicKey = foobar") -ctx.expect_msg("No (usable) public Ed25519 key", force=True, code=0) -ctx.node.cmd("fsck") - -ctx = test("fix missing Ed25519 public key with --force") -replace_line(ctx.host, "Ed25519PublicKey") -ctx.expect_msg("No (usable) public Ed25519 key", force=True, code=0) -ctx.node.cmd("fsck") - -ctx = test("fail when all private keys are missing") -os.remove(ctx.ec_priv) -if run_legacy_checks: - os.remove(ctx.rsa_priv) - ctx.expect_msg("Neither RSA or Ed25519 private") -else: - ctx.expect_msg("No Ed25519 private") - -ctx = test("warn about missing EC public key and NOT fix without --force") -replace_line(ctx.host, "Ed25519PublicKey") -ctx.expect_msg("No (usable) public Ed25519", code=0) -host = read_text(ctx.host) -check.not_in("ED25519 PUBLIC KEY", host) - -ctx = test("fix missing EC public key on --force") -replace_line(ctx.host, "Ed25519PublicKey") -ctx.expect_msg("Wrote Ed25519 public key", force=True, code=0) -host = read_text(ctx.host) -check.is_in("ED25519 PUBLIC KEY", host) - -ctx = test("warn about obsolete variables") -append_line(ctx.host, "GraphDumpFile = /dev/null") -ctx.expect_msg("obsolete variable GraphDumpFile", code=0) - -ctx = test("warn about missing values") -append_line(ctx.host, "Weight = ") -ctx.expect_msg("No value for variable `Weight") - -ctx = test("warn about duplicate variables") -append_line(ctx.host, f"Weight = 0{os.linesep}Weight = 1") -ctx.expect_msg("multiple instances of variable Weight", code=0) - -ctx = test("warn about server variables in host config") -append_line(ctx.host, "Interface = fake0") -ctx.expect_msg("server variable Interface found", code=0) - -ctx = test("warn about host variables in server config") -append_line(ctx.conf, "Port = 1337") -ctx.expect_msg("host variable Port found", code=0) - -ctx = test("warn about missing Name") -replace_line(ctx.conf, "Name =") -ctx.expect_msg("without a valid Name") - -test_private_keys("ed25519_key.priv") -test_private_key_var("Ed25519PrivateKeyFile", "ed25519_key.priv") - -ctx = test("test EC public key in tinc.conf") -test_ec_public_key_file_var(ctx, "tinc.conf") - -ctx = test("test EC public key in hosts/") -test_ec_public_key_file_var(ctx, "hosts", ctx.node.name) - -if run_access_checks: - ctx = test("fail on inaccessible tinc.conf") - os.chmod(ctx.conf, 0) - ctx.expect_msg("not running tinc as root") - - ctx = test("fail on inaccessible hosts/foo") - os.chmod(ctx.host, 0) - ctx.expect_msg("Cannot open config file") - -if run_executability_checks: - ctx = test("non-executable tinc-up MUST be fixed by tinc --force") - os.chmod(ctx.tinc_up, 0o644) - ctx.expect_msg("cannot read and execute", force=True, code=0) - assert os.access(ctx.tinc_up, os.X_OK) - - ctx = test("non-executable tinc-up MUST NOT be fixed by tinc without --force") - os.chmod(ctx.tinc_up, 0o644) - ctx.expect_msg("cannot read and execute", code=0) - assert not os.access(ctx.tinc_up, os.X_OK) - - ctx = test("non-executable foo-up MUST be fixed by tinc --force") - write_text(ctx.host_up, "") - os.chmod(ctx.host_up, 0o644) - ctx.expect_msg("cannot read and execute", force=True, code=0) - assert os.access(ctx.tinc_up, os.X_OK) - - ctx = test("non-executable bar-up MUST NOT be fixed by tinc") - path = ctx.node.sub("hosts", "bar-up") - write_text(path, "") - os.chmod(path, 0o644) - ctx.expect_msg("cannot read and execute", code=0) - assert not os.access(path, os.X_OK) - -############################################################################### -# Legacy protocol -############################################################################### -if not run_legacy_checks: - log.info("skipping legacy protocol tests") - sys.exit(0) - - -def test_rsa_public_key_file_var(context: Context, *paths: str) -> None: - """Test RSA public keys in config *paths.""" - key = extract_pem(context.host) - remove_pem(context.host) - - rsa_pub = os.path.realpath(context.node.sub("rsa_pubkey")) - write_lines(rsa_pub, key) - - config = context.node.sub(*paths) - append_line(config, f"PublicKeyFile = {rsa_pub}") - - context.expect_msg("Error reading RSA public key", code=0, present=False) - - -test_private_keys("rsa_key.priv") -test_private_key_var("PrivateKeyFile", "rsa_key.priv") - -ctx = test("test rsa public key in tinc.conf") -test_rsa_public_key_file_var(ctx, "tinc.conf") - -ctx = test("test rsa public key in hosts/") -test_rsa_public_key_file_var(ctx, "hosts", ctx.node.name) - -ctx = test("warn about missing RSA private key if public key is present") -os.remove(ctx.rsa_priv) -ctx.expect_msg("public RSA key was found but no private key", code=0) - -ctx = test("warn about missing RSA public key") -remove_pem(ctx.host) -ctx.expect_msg("No (usable) public RSA", code=0) -check.not_in("BEGIN RSA PUBLIC KEY", read_text(ctx.host)) - -ctx = test("fix missing RSA public key on --force") -remove_pem(ctx.host) -ctx.expect_msg("Wrote RSA public key", force=True, code=0) -check.is_in("BEGIN RSA PUBLIC KEY", read_text(ctx.host)) - -ctx = test("RSA PublicKey + PrivateKey must work") -os.remove(ctx.rsa_priv) -remove_pem(ctx.host) -append_line(ctx.conf, f"PrivateKey = {RSA_D}") -append_line(ctx.host, f"PublicKey = {RSA_N}") -ctx.expect_msg("no (usable) public RSA", code=0, present=False) - -ctx = test("RSA PrivateKey without PublicKey must warn") -os.remove(ctx.rsa_priv) -remove_pem(ctx.host) -append_line(ctx.conf, f"PrivateKey = {RSA_D}") -ctx.expect_msg("PrivateKey used but no PublicKey found", code=0) - -ctx = test("warn about missing EC private key if public key is present") -os.remove(ctx.ec_priv) -ctx.expect_msg("public Ed25519 key was found but no private key", code=0) - -ctx = test("fix broken RSA public key with --force") -host_lines = read_lines(ctx.host) -del host_lines[1] -write_lines(ctx.host, host_lines) -ctx.expect_msg("old key(s) found and disabled", force=True, code=0) -ctx.node.cmd("fsck") - -ctx = test("fix missing RSA public key with --force") -remove_pem(ctx.host) -ctx.expect_msg("No (usable) public RSA key found", force=True, code=0) -ctx.node.cmd("fsck") - -if run_permission_checks: - ctx = test("warn about unsafe permissions on tinc.conf with PrivateKey") - os.remove(ctx.rsa_priv) - append_line(ctx.conf, f"PrivateKey = {RSA_D}") - append_line(ctx.host, f"PublicKey = {RSA_N}") - os.chmod(ctx.conf, 0o666) - ctx.expect_msg("unsafe file permissions", code=0) diff --git a/test/integration/meson.build b/test/integration/meson.build index cf09cac5..4d2bf99b 100644 --- a/test/integration/meson.build +++ b/test/integration/meson.build @@ -1,6 +1,7 @@ tests = [ 'basic.py', - 'command_fsck.py', + 'cmd_fsck.py', + 'cmd_sign_verify.py', 'commandline.py', 'executables.py', 'import_export.py', diff --git a/test/integration/testlib/proc.py b/test/integration/testlib/proc.py index a0e6c458..d5cbee48 100755 --- a/test/integration/testlib/proc.py +++ b/test/integration/testlib/proc.py @@ -214,12 +214,12 @@ class Tinc: return self._port def cmd( - self, *args: str, code: T.Optional[int] = 0, stdin: T.Optional[str] = None + self, *args: str, code: T.Optional[int] = 0, stdin: T.Optional[T.AnyStr] = None ) -> T.Tuple[str, str]: """Run command through tinc, writes `stdin` to it (if the argument is not None), check its return code (if the argument is not None), and return (stdout, stderr). """ - proc = self.tinc(*args) + proc = self.tinc(*args, binary=isinstance(stdin, bytes)) log.debug('tinc %s: PID %d, in "%s", want code %s', self, proc.pid, stdin, code) out, err = proc.communicate(stdin, timeout=60) @@ -232,7 +232,7 @@ class Tinc: return out if out else "", err if err else "" - def tinc(self, *args: str) -> subp.Popen: + def tinc(self, *args: str, binary=False) -> subp.Popen: """Start tinc with the specified arguments.""" args = tuple(filter(bool, args)) cmd = [path.TINC_PATH, *self._common_args, *args] @@ -244,7 +244,7 @@ class Tinc: stdin=subp.PIPE, stdout=subp.PIPE, stderr=subp.PIPE, - encoding="utf-8", + encoding=None if binary else "utf-8", ) self._procs.append(proc) return proc