diff --git a/tests/client_http.py b/tests/client_http.py new file mode 100644 index 0000000..ecf6269 --- /dev/null +++ b/tests/client_http.py @@ -0,0 +1,50 @@ +"""Implementation of a HTTP client""" + +import os +import sys +import http.client + +RESULT_PATH = os.getcwd() +LOGS_PATH = os.path.join(RESULT_PATH, "./Testing/logs/") +PORT_LOG = os.path.join(LOGS_PATH, "./port.log") + + +def main() -> None: + """Creating a POST Request""" + ret = 0 + try: + with open(PORT_LOG, 'r') as file: + port = file.readline() + conn = http.client.HTTPConnection('localhost', port) + conn.request('POST', '/kill_server') + response = conn.getresponse() + print("HTTP status code:", response.getcode(), end=', ') + try: + text = response.read() + print(text.decode("UTF-8"), end='', flush=True) + except OSError as err: + print(f"Warning: {err}") + conn.close() + except OSError as err: + print(f"OSError: {err}") + ret = err.errno + except Exception as err: # pylint: disable=broad-except + print(f"HTTP client error: {err}") + ret = err + finally: + sys.exit(ret) + + +if __name__ == '__main__': + main() + + +# pylint: disable=pointless-string-statement +""" +Local Variables: + c-basic-offset: 4 + tab-width: 4 + indent-tabs-mode: nil +End: + vim: set ts=4 expandtab: +""" diff --git a/tests/server_http.py b/tests/server_http.py new file mode 100644 index 0000000..d717e42 --- /dev/null +++ b/tests/server_http.py @@ -0,0 +1,147 @@ +"""Implementation of a HTTP server""" + +import os +import subprocess +import sys +import threading +from urllib.parse import urlparse +from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer + +RESULT_PATH = os.getcwd() +FILES_PATH = os.path.join(RESULT_PATH, "./Testing/files/") +CERTS_PATH = os.path.join(RESULT_PATH, "./Testing/certs/") +CONF_PATH = os.path.join(RESULT_PATH, "./Testing/conf/") +LOGS_PATH = os.path.join(RESULT_PATH, "./Testing/logs/") +REQUEST = os.path.join(FILES_PATH, "./jreq.tsq") +RESPONS = os.path.join(FILES_PATH, "./jresp.tsr") +CACRL = os.path.join(CERTS_PATH, "./CACertCRL.der") +TSACRL = os.path.join(CERTS_PATH, "./TSACertCRL.der") +OPENSSL_CONF = os.path.join(CONF_PATH, "./openssl_tsa.cnf") +PORT_LOG = os.path.join(LOGS_PATH, "./port.log") + + +OPENSSL_TS = ["openssl", "ts", + "-reply", "-config", OPENSSL_CONF, + "-passin", "pass:passme", + "-queryfile", REQUEST, + "-out", RESPONS] + + +class RequestHandler(SimpleHTTPRequestHandler): + """Handle the HTTP POST request that arrive at the server""" + + def __init__(self, request, client_address, server): + # Save the server handle + self.server = server + SimpleHTTPRequestHandler.__init__(self, request, client_address, server) + + def do_GET(self): # pylint: disable=invalid-name + """"Serves the GET request type""" + try: + url = urlparse(self.path) + self.send_response(200) + self.send_header("Content-type", "application/crl") + self.end_headers() + # Read the file and send the contents + if url.path == "/intermediateCA": + with open(CACRL, 'rb') as file: + resp_data = file.read() + if url.path == "/TSACA": + with open(TSACRL, 'rb') as file: + resp_data = file.read() + self.wfile.write(resp_data) + except Exception as err: # pylint: disable=broad-except + print(f"HTTP GET request error: {err}") + + def do_POST(self): # pylint: disable=invalid-name + """"Serves the POST request type""" + try: + url = urlparse(self.path) + self.send_response(200) + if url.path == "/kill_server": + self.log_message(f"Deleting file: {PORT_LOG}") + os.remove(f"{PORT_LOG}") + self.send_header('Content-type', 'text/plain') + self.end_headers() + self.wfile.write(bytes('Shutting down HTTP server', 'utf-8')) + self.server.shutdown() + else: + content_length = int(self.headers['Content-Length']) + post_data = self.rfile.read(content_length) + with open(REQUEST, mode="wb") as file: + file.write(post_data) + openssl = subprocess.run(OPENSSL_TS, + check=True, universal_newlines=True) + openssl.check_returncode() + self.send_header("Content-type", "application/timestamp-reply") + self.end_headers() + resp_data = None + with open(RESPONS, mode="rb") as file: + resp_data = file.read() + self.wfile.write(resp_data) + except Exception as err: # pylint: disable=broad-except + print(f"HTTP POST request error: {err}") + + +class HttpServerThread(): + """TSA server thread handler""" + # pylint: disable=too-few-public-methods + + def __init__(self): + self.server = None + self.server_thread = None + + def start_server(self) -> (int): + """Starting HTTP server on localhost and a random available port for binding""" + self.server = ThreadingHTTPServer(('localhost', 0), RequestHandler) + self.server_thread = threading.Thread(target=self.server.serve_forever) + self.server_thread.start() + hostname, port = self.server.server_address[:2] + print(f"HTTP server started, URL http://{hostname}:{port}") + return port + + +def main() -> None: + """Start HTTP server""" + ret = 0 + try: + server = HttpServerThread() + port = server.start_server() + with open(PORT_LOG, mode="w") as file: + file.write("{}".format(port)) + except OSError as err: + print(f"OSError: {err}") + ret = err.errno + finally: + sys.exit(ret) + + +if __name__ == '__main__': + try: + fpid = os.fork() + if fpid > 0: + sys.exit(0) + except OSError as ferr: + print(f"Fork #1 failed: {ferr.errno} {ferr.strerror}") + sys.exit(1) + + try: + fpid = os.fork() + if fpid > 0: + sys.exit(0) + except OSError as ferr: + print(f"Fork #2 failed: {ferr.errno} {ferr.strerror}") + sys.exit(1) + + # Start the daemon main loop + main() + + +# pylint: disable=pointless-string-statement +"""Local Variables: + c-basic-offset: 4 + tab-width: 4 + indent-tabs-mode: nil +End: + vim: set ts=4 expandtab: +""" diff --git a/tests/server_http.pyw b/tests/server_http.pyw new file mode 100644 index 0000000..a2cd86f --- /dev/null +++ b/tests/server_http.pyw @@ -0,0 +1,133 @@ +"""Windows: Implementation of a HTTP server""" + +import os +import subprocess +import sys +import threading +from urllib.parse import urlparse +from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer + +RESULT_PATH = os.getcwd() +FILES_PATH = os.path.join(RESULT_PATH, "./Testing/files/") +CERTS_PATH = os.path.join(RESULT_PATH, "./Testing/certs/") +CONF_PATH = os.path.join(RESULT_PATH, "./Testing/conf/") +LOGS_PATH = os.path.join(RESULT_PATH, "./Testing/logs/") +REQUEST = os.path.join(FILES_PATH, "./jreq.tsq") +RESPONS = os.path.join(FILES_PATH, "./jresp.tsr") +CACRL = os.path.join(CERTS_PATH, "./CACertCRL.der") +TSACRL = os.path.join(CERTS_PATH, "./TSACertCRL.der") +OPENSSL_CONF = os.path.join(CONF_PATH, "./openssl_tsa.cnf") +SERVER_LOG = os.path.join(LOGS_PATH, "./server.log") +PORT_LOG = os.path.join(LOGS_PATH, "./port.log") + + +OPENSSL_TS = ["openssl", "ts", + "-reply", "-config", OPENSSL_CONF, + "-passin", "pass:passme", + "-queryfile", REQUEST, + "-out", RESPONS] + + +class RequestHandler(SimpleHTTPRequestHandler): + """Handle the HTTP POST request that arrive at the server""" + + def __init__(self, request, client_address, server): + # Save the server handle + self.server = server + SimpleHTTPRequestHandler.__init__(self, request, client_address, server) + + def do_GET(self): # pylint: disable=invalid-name + """"Serves the GET request type""" + try: + url = urlparse(self.path) + self.send_response(200) + self.send_header("Content-type", "application/crl") + self.end_headers() + # Read the file and send the contents + if url.path == "/intermediateCA": + with open(CACRL, 'rb') as file: + resp_data = file.read() + if url.path == "/TSACA": + with open(TSACRL, 'rb') as file: + resp_data = file.read() + self.wfile.write(resp_data) + except Exception as err: # pylint: disable=broad-except + print(f"HTTP GET request error: {err}") + + def do_POST(self): # pylint: disable=invalid-name + """"Serves the POST request type""" + try: + url = urlparse(self.path) + self.send_response(200) + if url.path == "/kill_server": + self.log_message(f"Deleting file: {PORT_LOG}") + os.remove(f"{PORT_LOG}") + self.send_header('Content-type', 'text/plain') + self.end_headers() + self.wfile.write(bytes('Shutting down HTTP server', 'utf-8')) + self.server.shutdown() + else: + content_length = int(self.headers['Content-Length']) + post_data = self.rfile.read(content_length) + with open(REQUEST, mode="wb") as file: + file.write(post_data) + openssl = subprocess.run(OPENSSL_TS, + check=True, universal_newlines=True) + openssl.check_returncode() + self.send_header("Content-type", "application/timestamp-reply") + self.end_headers() + resp_data = None + with open(RESPONS, mode="rb") as file: + resp_data = file.read() + self.wfile.write(resp_data) + except Exception as err: # pylint: disable=broad-except + print(f"HTTP POST request error: {err}") + + +class HttpServerThread(): + """TSA server thread handler""" + # pylint: disable=too-few-public-methods + + def __init__(self): + self.server = None + self.server_thread = None + + def start_server(self) -> (int): + """Starting HTTP server on localhost and a random available port for binding""" + self.server = ThreadingHTTPServer(('localhost', 19254), RequestHandler) + self.server_thread = threading.Thread(target=self.server.serve_forever) + self.server_thread.start() + hostname, port = self.server.server_address[:2] + print(f"HTTP server started, URL http://{hostname}:{port}") + return port + + +def main() -> None: + """Start HTTP server""" + ret = 0 + try: + sys.stdout = open(SERVER_LOG, "w") + sys.stderr = open(SERVER_LOG, "a") + server = HttpServerThread() + port = server.start_server() + with open(PORT_LOG, mode="w") as file: + file.write("{}".format(port)) + except OSError as err: + print(f"OSError: {err}") + ret = err.errno + finally: + sys.exit(ret) + + +if __name__ == '__main__': + main() + + +# pylint: disable=pointless-string-statement +"""Local Variables: + c-basic-offset: 4 + tab-width: 4 + indent-tabs-mode: nil +End: + vim: set ts=4 expandtab: +""" diff --git a/tests/tsa_server.py b/tests/tsa_server.py deleted file mode 100644 index b8da5aa..0000000 --- a/tests/tsa_server.py +++ /dev/null @@ -1,152 +0,0 @@ -"""Implementation of a Time Stamping Authority HTTP server""" - -import argparse -import contextlib -import os -import pathlib -import subprocess -import sys -import threading -from http.server import BaseHTTPRequestHandler, HTTPServer - -RESULT_PATH = os.getcwd() -FILES_PATH = os.path.join(RESULT_PATH, "./Testing/files/") -CERTS_PATH = os.path.join(RESULT_PATH, "./Testing/certs/") -CONF_PATH = os.path.join(RESULT_PATH, "./Testing/conf/") -DEFAULT_IN = os.path.join(FILES_PATH, "./unsigned.exe") -DEFAULT_OUT = os.path.join(FILES_PATH, "./ts.exe") -DEFAULT_CERT = os.path.join(CERTS_PATH, "./cert.pem") -DEFAULT_KEY = os.path.join(CERTS_PATH, "./key.pem") -DEFAULT_CROSSCERT = os.path.join(CERTS_PATH, "./crosscert.pem") -OPENSSL_CONF = os.path.join(CONF_PATH, "./openssl_tsa.cnf") -REQUEST = os.path.join(FILES_PATH, "./jreq.tsq") -RESPONS = os.path.join(FILES_PATH, "./jresp.tsr") - -if os.path.exists(os.path.join(RESULT_PATH, "./Release/")): - OSSLSIGNCODE_FILE = os.path.join(RESULT_PATH, "./Release/osslsigncode") -elif os.path.exists(os.path.join(RESULT_PATH, "./Debug/")): - OSSLSIGNCODE_FILE = os.path.join(RESULT_PATH, "./Debug/osslsigncode") -else: - OSSLSIGNCODE_FILE = os.path.join(RESULT_PATH, "./osslsigncode") - -DEFAULT_OPENSSL = ["openssl", "ts", - "-reply", "-config", OPENSSL_CONF, - "-passin", "pass:passme", - "-queryfile", REQUEST, - "-out", RESPONS] - - -class RequestHandler(BaseHTTPRequestHandler): - """Handle the HTTP POST request that arrive at the server""" - - def do_POST(self): - """"Serves the POST request type""" - try: - content_length = int(self.headers['Content-Length']) - post_data = self.rfile.read(content_length) - with open(REQUEST, mode="wb") as file: - file.write(post_data) - openssl = subprocess.run(DEFAULT_OPENSSL, - check=True, universal_newlines=True) - openssl.check_returncode() - self.send_response(200) - self.send_header("Content-type", "application/timestamp-reply") - self.end_headers() - resp_data = None - with open(RESPONS, mode="rb") as file: - resp_data = file.read() - self.wfile.write(resp_data) - except Exception as err: # pylint: disable=broad-except - print(f"HTTP POST request error: {err}") - - -class HttpServerThread(): - """TSA server thread handler""" - - def __init__(self): - self.server = None - self.server_thread = None - - def start_server(self) -> (str, int): - """Starting TSA server on localhost and a first available port""" - self.server = HTTPServer(("127.0.0.1", 0), RequestHandler) - self.server_thread = threading.Thread(target=self.server.serve_forever) - self.server_thread.start() - hostname, port = self.server.server_address[:2] - print(f"Timestamp server started, URL: http://{hostname}:{port}") - return hostname, port - - def shut_down(self): - """Shutting down the server""" - if self.server: - self.server.shutdown() - self.server_thread.join() - print("Server is down") - - -def parse_args() -> str: - """Parse the command-line arguments.""" - parser = argparse.ArgumentParser() - parser.add_argument( - "--input", - type=pathlib.Path, - default=DEFAULT_IN, - help="input file" - ) - parser.add_argument( - "--output", - type=pathlib.Path, - default=DEFAULT_OUT, - help="output file" - ) - parser.add_argument( - "--certs", - type=pathlib.Path, - default=DEFAULT_CERT, - help="signing certificate" - ) - parser.add_argument( - "--key", - type=pathlib.Path, - default=DEFAULT_KEY, - help="private key" - ) - parser.add_argument( - "--crosscert", - type=pathlib.Path, - default=DEFAULT_CROSSCERT, - help="additional certificates" - ) - args = parser.parse_args() - program = [OSSLSIGNCODE_FILE, "sign", "-in", args.input, "-out", args.output, - "-certs", args.certs, "-key", args.key, - "-addUnauthenticatedBlob", "-add-msi-dse", "-comm", "-ph", "-jp", "low", - "-h", "sha384", "-time", "1556668800", "-i", "https://www.osslsigncode.com/", - "-n", "osslsigncode", "-ac", args.crosscert, "-ts"] - return program - -def main() -> None: - """Main program""" - ret = 0 - program = parse_args() - server = HttpServerThread() - hostname, port = server.start_server() - program.append(f"{hostname}:{port}") - try: - osslsigncode = subprocess.run(program, check=True, universal_newlines=True) - osslsigncode.check_returncode() - except subprocess.CalledProcessError as err: - ret = err.returncode - except OSError as err: - print(f"OSError: {err}") - ret = err.errno - except Exception as err: # pylint: disable=broad-except - print(f"osslsigncode error: {err}") - ret = 1 - finally: - server.shut_down() - sys.exit(ret) - - -if __name__ == '__main__': - main()