Files
osslsigncode/tests/server_http.py
Michał Trojnara dfc3e46a77 Typos
2025-06-20 12:28:43 +02:00

176 lines
5.9 KiB
Python

#!/usr/bin/python3
"""Implementation of an HTTP server"""
import argparse
import os
import subprocess
import sys
import threading
from urllib.parse import urlparse
from http.server import SimpleHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from make_certificates import CertificateMaker
# Every path below is resolved against the working directory at import time.
RESULT_PATH = os.getcwd()

# Sub-directories of the test tree
FILES_PATH, CERTS_PATH, CONF_PATH, LOGS_PATH = (
    os.path.join(RESULT_PATH, subdir)
    for subdir in (
        "./Testing/files/",
        "./Testing/certs/",
        "./Testing/conf/",
        "./Testing/logs/",
    )
)

# Timestamp query stored from a POST body and the reply generated for it
REQUEST = os.path.join(FILES_PATH, "./jreq.tsq")
RESPONSE = os.path.join(FILES_PATH, "./jresp.tsr")

# Configuration file handed to "openssl ts"
OPENSSL_CONF = os.path.join(CONF_PATH, "./openssl_tsa.cnf")

# Log of the daemonized server, and the file announcing its host:port
SERVER_LOG = os.path.join(LOGS_PATH, "./server.log")
URL_LOG = os.path.join(LOGS_PATH, "./url.log")

# Command line that turns REQUEST into RESPONSE, one option/value pair per row
OPENSSL_TS = [
    "openssl", "ts", "-reply",
    "-config", OPENSSL_CONF,
    "-passin", "pass:passme",
    "-queryfile", REQUEST,
    "-out", RESPONSE,
]
class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTP server that hands every incoming connection to its own thread."""

    # Handler threads are created as daemon threads (socketserver setting)
    daemon_threads = True
class RequestHandler(SimpleHTTPRequestHandler):
    """Handle the HTTP GET and POST requests that arrive at the server.

    GET returns a CRL file.  POST returns a timestamp reply created with
    ``openssl ts`` or, for ``/kill_server``, stops the server.  The status
    line is sent only once the outcome of a request is known, so a failure
    reaches the client as an HTTP error instead of an empty reply.
    """

    # URL path -> CRL file (below CERTS_PATH) returned by do_GET
    _CRL_FILES = {
        "/intermediateCA": "./CACertCRL.der",
        "/TSACA": "./TSACertCRL.der",
    }
    # REQUEST and RESPONSE are fixed file names shared by all handler
    # threads, so only one timestamp request may use them at a time
    _TS_LOCK = threading.Lock()

    def __init__(self, request, client_address, server):
        # Save the server handle
        self.server = server
        SimpleHTTPRequestHandler.__init__(self, request, client_address, server)

    def _send_reply(self, method, content_type, body):
        """Send a complete 200 response; a failed write is only logged.

        method is the HTTP verb used in the log message, content_type the
        value of the Content-type header and body the payload as bytes.
        """
        try:
            self.send_response(200)
            self.send_header("Content-type", content_type)
            self.end_headers()
            self.wfile.write(body)
        except Exception as err:  # pylint: disable=broad-except
            print("HTTP {} request error: {}".format(method, err))

    def _send_failure(self, method, code):
        """Send the HTTP error `code`; the client may already be gone."""
        try:
            self.send_error(code)
        except Exception as err:  # pylint: disable=broad-except
            print("HTTP {} request error: {}".format(method, err))

    def _read_body(self):
        """Return the request body, or None if Content-Length is unusable."""
        try:
            content_length = int(self.headers['Content-Length'])
        except (TypeError, ValueError):
            # Header missing (None) or not a number
            return None
        if content_length < 0:
            # read() with a negative size would wait for end of stream
            return None
        return self.rfile.read(content_length)

    def _make_timestamp(self, post_data):
        """Run OPENSSL_TS on the query `post_data` and return the reply bytes.

        Raises OSError if a file cannot be accessed or openssl cannot be
        started, and subprocess.CalledProcessError if openssl fails.
        """
        with self._TS_LOCK:
            with open(REQUEST, mode="wb") as file:
                file.write(post_data)
            # check=True raises on a non-zero exit status
            subprocess.run(OPENSSL_TS, check=True, universal_newlines=True)
            with open(RESPONSE, mode="rb") as file:
                return file.read()

    def _kill_server(self):
        """Remove URL_LOG, acknowledge the request and stop the server."""
        self.log_message("Deleting file: %s", URL_LOG)
        try:
            os.remove(URL_LOG)
        except OSError as err:
            # A missing or undeletable file must not keep the server alive
            print("HTTP POST request error: {}".format(err))
        self._send_reply("POST", 'text/plain',
                         bytes('Shutting down HTTP server', 'utf-8'))
        self.server.shutdown()

    def do_GET(self):  # pylint: disable=invalid-name
        """Serve the GET request type.

        The paths listed in _CRL_FILES return the matching CRL file; any
        other path returns an empty body.  A CRL file that cannot be read
        results in a 404 (file not found) or 500 (any other error) reply.
        """
        try:
            url = urlparse(self.path)
            resp_data = b''
            crl_file = self._CRL_FILES.get(url.path)
            if crl_file is not None:
                # Read the file before any status line is sent
                with open(os.path.join(CERTS_PATH, crl_file), 'rb') as file:
                    resp_data = file.read()
        except Exception as err:  # pylint: disable=broad-except
            print("HTTP GET request error: {}".format(err))
            not_found = isinstance(err, FileNotFoundError)
            self._send_failure("GET", 404 if not_found else 500)
            return
        self._send_reply("GET", "application/pkix-crl", resp_data)

    def do_POST(self):  # pylint: disable=invalid-name
        """Serve the POST request type.

        ``/kill_server`` stops the server; the body of any other path is
        treated as a timestamp query.  A missing, malformed or negative
        Content-Length is answered with 400, any other failure with 500.
        """
        resp_data = b''
        try:
            url = urlparse(self.path)
            kill = url.path == "/kill_server"
            if not kill:
                post_data = self._read_body()
                if post_data is None:
                    print("HTTP POST request error: "
                          "missing or invalid Content-Length")
                    self._send_failure("POST", 400)
                    return
                resp_data = self._make_timestamp(post_data)
        except Exception as err:  # pylint: disable=broad-except
            print("HTTP POST request error: {}".format(err))
            self._send_failure("POST", 500)
            return
        if kill:
            self._kill_server()
        else:
            self._send_reply("POST", "application/timestamp-reply", resp_data)
class HttpServerThread():
    """Owner of the HTTP server object and of the thread that runs it"""
    # pylint: disable=too-few-public-methods

    def __init__(self):
        # Both stay None until start_server() has been called
        self.server = None
        self.server_thread = None

    def start_server(self, port) -> int:
        """Bind the HTTP server to 127.0.0.1:`port` and serve it in a thread.

        The port actually bound is read back from the server socket and
        returned, so passing 0 yields whichever free port was assigned.
        """
        httpd = ThreadingHTTPServer(('127.0.0.1', port), RequestHandler)
        worker = threading.Thread(target=httpd.serve_forever)
        self.server = httpd
        self.server_thread = worker
        worker.start()
        bound_host, bound_port = httpd.server_address[:2]
        print("HTTP server started, URL http://{}:{}".format(bound_host, bound_port))
        return bound_port
def main() -> None:
    """Start HTTP server, make test certificates.

    Parses ``--port`` (default 0), starts the server, writes the
    ``127.0.0.1:<port>`` address to URL_LOG and runs the certificate maker.
    Always ends in sys.exit(): status 0 on success, the errno of an OSError
    (1 if it carries none) or 1 for any other error.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--port",
        type=int,
        default=0,
        help="port number"
    )
    args = parser.parse_args()
    ret = 0
    try:
        server = HttpServerThread()
        port = server.start_server(args.port)
        with open(URL_LOG, mode="w", encoding="utf-8") as file:
            file.write("127.0.0.1:{}".format(port))
        tests = CertificateMaker(port, SERVER_LOG)
        tests.make_certs()
    except OSError as err:
        print("OSError: {}".format(err))
        # errno may be None or 0, which sys.exit() would report as success
        ret = err.errno or 1
    except Exception as err:  # pylint: disable=broad-except
        print("Error: {}".format(err))
        ret = 1
    # Deliberately not in a "finally" clause: exiting from there would
    # replace an in-flight KeyboardInterrupt/SystemExit with this status
    sys.exit(ret)
if __name__ == '__main__':
    # Run the server in the background: fork twice, with each parent
    # exiting immediately so that the caller is not blocked.
    # NOTE(review): there is no os.setsid() between the two forks, so the
    # server stays in the caller's session - confirm this is intended.
    try:
        fpid = os.fork()
    except OSError as ferr:
        print("Fork #1 failed: {} {}".format(ferr.errno, ferr.strerror))
        sys.exit(1)
    if fpid > 0:
        sys.exit(0)

    # First child: send stdout and stderr to the server log.  Handled apart
    # from the fork so that a log file problem is not reported as a failed
    # fork.
    try:
        with open(SERVER_LOG, mode='w', encoding='utf-8') as log:
            os.dup2(log.fileno(), sys.stdout.fileno())
            os.dup2(log.fileno(), sys.stderr.fileno())
    except OSError as lerr:
        print("Cannot redirect output to {}: {} {}".format(
            SERVER_LOG, lerr.errno, lerr.strerror))
        sys.exit(1)

    try:
        fpid = os.fork()
    except OSError as ferr:
        print("Fork #2 failed: {} {}".format(ferr.errno, ferr.strerror))
        sys.exit(1)
    if fpid > 0:
        sys.exit(0)

    # Start the daemon main loop
    main()
# pylint: disable=pointless-string-statement
"""Local Variables:
c-basic-offset: 4
tab-width: 4
indent-tabs-mode: nil
End:
vim: set ts=4 expandtab:
"""