import os
import re
-import tempfile
+import time
import typing as T
import multiprocessing.connection as mp
import logging
from threading import Thread
from socketserver import ThreadingMixIn, TCPServer, StreamRequestHandler
-from testlib import check, cmd, path
+from testlib import check, cmd, path, util
from testlib.proc import Tinc, Script
from testlib.test import Test
from testlib.util import random_string
from testlib.log import log
+from testlib.feature import HAVE_SANDBOX
USERNAME = random_string(8)
PASSWORD = random_string(8)
with mp.Client(("127.0.0.1", {port}), family="AF_INET") as client:
client.send({{ **os.environ }})
"""
-
- file = tempfile.mktemp()
- with open(file, "w", encoding="utf-8") as f:
- f.write(code)
-
- return file
+ return util.temp_file(code)
def test_proxy(ctx: Test, handler: T.Type[ProxyServer], user="", passw="") -> None:
foo, bar = init(ctx)
- bar.add_script(foo.script_up)
+ if HAVE_SANDBOX:
+ for node in foo, bar:
+ node.cmd("set", "Sandbox", "high")
+
bar.add_script(Script.TINC_UP)
bar.start()
worker.start()
foo.cmd("set", "Proxy", handler.name, f"127.0.0.1 {port} {user} {passw}")
+
+ foo.add_script(Script.TINC_UP)
foo.cmd("start")
- bar[foo.script_up].wait()
+ foo[Script.TINC_UP].wait()
+ time.sleep(1)
foo.cmd("stop")
bar.cmd("stop")
port = int(listener.address[1])
proxy = create_exec_proxy(port)
- foo.cmd("set", "Proxy", "exec", f"{path.PYTHON_PATH} {path.PYTHON_CMD} {proxy}")
+ foo.cmd("set", "Proxy", "exec", f"{path.PYTHON_INTERPRETER} {proxy}")
foo.cmd("start")
with listener.accept() as conn: