mirror of
https://github.com/mtrojnar/osslsigncode.git
synced 2025-04-04 17:00:11 -05:00
Tests: new HTTP server and client
This commit is contained in:
parent
41d98c3917
commit
eec5a0755d
50
tests/client_http.py
Normal file
50
tests/client_http.py
Normal file
@ -0,0 +1,50 @@
|
||||
"""Implementation of a HTTP client"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import http.client
|
||||
|
||||
RESULT_PATH = os.getcwd()
|
||||
LOGS_PATH = os.path.join(RESULT_PATH, "./Testing/logs/")
|
||||
PORT_LOG = os.path.join(LOGS_PATH, "./port.log")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Creating a POST Request"""
|
||||
ret = 0
|
||||
try:
|
||||
with open(PORT_LOG, 'r') as file:
|
||||
port = file.readline()
|
||||
conn = http.client.HTTPConnection('localhost', port)
|
||||
conn.request('POST', '/kill_server')
|
||||
response = conn.getresponse()
|
||||
print("HTTP status code:", response.getcode(), end=', ')
|
||||
try:
|
||||
text = response.read()
|
||||
print(text.decode("UTF-8"), end='', flush=True)
|
||||
except OSError as err:
|
||||
print(f"Warning: {err}")
|
||||
conn.close()
|
||||
except OSError as err:
|
||||
print(f"OSError: {err}")
|
||||
ret = err.errno
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
print(f"HTTP client error: {err}")
|
||||
ret = err
|
||||
finally:
|
||||
sys.exit(ret)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
|
||||
# pylint: disable=pointless-string-statement
|
||||
"""
|
||||
Local Variables:
|
||||
c-basic-offset: 4
|
||||
tab-width: 4
|
||||
indent-tabs-mode: nil
|
||||
End:
|
||||
vim: set ts=4 expandtab:
|
||||
"""
|
147
tests/server_http.py
Normal file
147
tests/server_http.py
Normal file
@ -0,0 +1,147 @@
|
||||
"""Implementation of a HTTP server"""
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
from urllib.parse import urlparse
|
||||
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
|
||||
|
||||
RESULT_PATH = os.getcwd()
|
||||
FILES_PATH = os.path.join(RESULT_PATH, "./Testing/files/")
|
||||
CERTS_PATH = os.path.join(RESULT_PATH, "./Testing/certs/")
|
||||
CONF_PATH = os.path.join(RESULT_PATH, "./Testing/conf/")
|
||||
LOGS_PATH = os.path.join(RESULT_PATH, "./Testing/logs/")
|
||||
REQUEST = os.path.join(FILES_PATH, "./jreq.tsq")
|
||||
RESPONS = os.path.join(FILES_PATH, "./jresp.tsr")
|
||||
CACRL = os.path.join(CERTS_PATH, "./CACertCRL.der")
|
||||
TSACRL = os.path.join(CERTS_PATH, "./TSACertCRL.der")
|
||||
OPENSSL_CONF = os.path.join(CONF_PATH, "./openssl_tsa.cnf")
|
||||
PORT_LOG = os.path.join(LOGS_PATH, "./port.log")
|
||||
|
||||
|
||||
OPENSSL_TS = ["openssl", "ts",
|
||||
"-reply", "-config", OPENSSL_CONF,
|
||||
"-passin", "pass:passme",
|
||||
"-queryfile", REQUEST,
|
||||
"-out", RESPONS]
|
||||
|
||||
|
||||
class RequestHandler(SimpleHTTPRequestHandler):
|
||||
"""Handle the HTTP POST request that arrive at the server"""
|
||||
|
||||
def __init__(self, request, client_address, server):
|
||||
# Save the server handle
|
||||
self.server = server
|
||||
SimpleHTTPRequestHandler.__init__(self, request, client_address, server)
|
||||
|
||||
def do_GET(self): # pylint: disable=invalid-name
|
||||
""""Serves the GET request type"""
|
||||
try:
|
||||
url = urlparse(self.path)
|
||||
self.send_response(200)
|
||||
self.send_header("Content-type", "application/crl")
|
||||
self.end_headers()
|
||||
# Read the file and send the contents
|
||||
if url.path == "/intermediateCA":
|
||||
with open(CACRL, 'rb') as file:
|
||||
resp_data = file.read()
|
||||
if url.path == "/TSACA":
|
||||
with open(TSACRL, 'rb') as file:
|
||||
resp_data = file.read()
|
||||
self.wfile.write(resp_data)
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
print(f"HTTP GET request error: {err}")
|
||||
|
||||
def do_POST(self): # pylint: disable=invalid-name
|
||||
""""Serves the POST request type"""
|
||||
try:
|
||||
url = urlparse(self.path)
|
||||
self.send_response(200)
|
||||
if url.path == "/kill_server":
|
||||
self.log_message(f"Deleting file: {PORT_LOG}")
|
||||
os.remove(f"{PORT_LOG}")
|
||||
self.send_header('Content-type', 'text/plain')
|
||||
self.end_headers()
|
||||
self.wfile.write(bytes('Shutting down HTTP server', 'utf-8'))
|
||||
self.server.shutdown()
|
||||
else:
|
||||
content_length = int(self.headers['Content-Length'])
|
||||
post_data = self.rfile.read(content_length)
|
||||
with open(REQUEST, mode="wb") as file:
|
||||
file.write(post_data)
|
||||
openssl = subprocess.run(OPENSSL_TS,
|
||||
check=True, universal_newlines=True)
|
||||
openssl.check_returncode()
|
||||
self.send_header("Content-type", "application/timestamp-reply")
|
||||
self.end_headers()
|
||||
resp_data = None
|
||||
with open(RESPONS, mode="rb") as file:
|
||||
resp_data = file.read()
|
||||
self.wfile.write(resp_data)
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
print(f"HTTP POST request error: {err}")
|
||||
|
||||
|
||||
class HttpServerThread():
|
||||
"""TSA server thread handler"""
|
||||
# pylint: disable=too-few-public-methods
|
||||
|
||||
def __init__(self):
|
||||
self.server = None
|
||||
self.server_thread = None
|
||||
|
||||
def start_server(self) -> (int):
|
||||
"""Starting HTTP server on localhost and a random available port for binding"""
|
||||
self.server = ThreadingHTTPServer(('localhost', 0), RequestHandler)
|
||||
self.server_thread = threading.Thread(target=self.server.serve_forever)
|
||||
self.server_thread.start()
|
||||
hostname, port = self.server.server_address[:2]
|
||||
print(f"HTTP server started, URL http://{hostname}:{port}")
|
||||
return port
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Start HTTP server"""
|
||||
ret = 0
|
||||
try:
|
||||
server = HttpServerThread()
|
||||
port = server.start_server()
|
||||
with open(PORT_LOG, mode="w") as file:
|
||||
file.write("{}".format(port))
|
||||
except OSError as err:
|
||||
print(f"OSError: {err}")
|
||||
ret = err.errno
|
||||
finally:
|
||||
sys.exit(ret)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
fpid = os.fork()
|
||||
if fpid > 0:
|
||||
sys.exit(0)
|
||||
except OSError as ferr:
|
||||
print(f"Fork #1 failed: {ferr.errno} {ferr.strerror}")
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
fpid = os.fork()
|
||||
if fpid > 0:
|
||||
sys.exit(0)
|
||||
except OSError as ferr:
|
||||
print(f"Fork #2 failed: {ferr.errno} {ferr.strerror}")
|
||||
sys.exit(1)
|
||||
|
||||
# Start the daemon main loop
|
||||
main()
|
||||
|
||||
|
||||
# pylint: disable=pointless-string-statement
|
||||
"""Local Variables:
|
||||
c-basic-offset: 4
|
||||
tab-width: 4
|
||||
indent-tabs-mode: nil
|
||||
End:
|
||||
vim: set ts=4 expandtab:
|
||||
"""
|
133
tests/server_http.pyw
Normal file
133
tests/server_http.pyw
Normal file
@ -0,0 +1,133 @@
|
||||
"""Windows: Implementation of a HTTP server"""
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
from urllib.parse import urlparse
|
||||
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer
|
||||
|
||||
RESULT_PATH = os.getcwd()
|
||||
FILES_PATH = os.path.join(RESULT_PATH, "./Testing/files/")
|
||||
CERTS_PATH = os.path.join(RESULT_PATH, "./Testing/certs/")
|
||||
CONF_PATH = os.path.join(RESULT_PATH, "./Testing/conf/")
|
||||
LOGS_PATH = os.path.join(RESULT_PATH, "./Testing/logs/")
|
||||
REQUEST = os.path.join(FILES_PATH, "./jreq.tsq")
|
||||
RESPONS = os.path.join(FILES_PATH, "./jresp.tsr")
|
||||
CACRL = os.path.join(CERTS_PATH, "./CACertCRL.der")
|
||||
TSACRL = os.path.join(CERTS_PATH, "./TSACertCRL.der")
|
||||
OPENSSL_CONF = os.path.join(CONF_PATH, "./openssl_tsa.cnf")
|
||||
SERVER_LOG = os.path.join(LOGS_PATH, "./server.log")
|
||||
PORT_LOG = os.path.join(LOGS_PATH, "./port.log")
|
||||
|
||||
|
||||
OPENSSL_TS = ["openssl", "ts",
|
||||
"-reply", "-config", OPENSSL_CONF,
|
||||
"-passin", "pass:passme",
|
||||
"-queryfile", REQUEST,
|
||||
"-out", RESPONS]
|
||||
|
||||
|
||||
class RequestHandler(SimpleHTTPRequestHandler):
|
||||
"""Handle the HTTP POST request that arrive at the server"""
|
||||
|
||||
def __init__(self, request, client_address, server):
|
||||
# Save the server handle
|
||||
self.server = server
|
||||
SimpleHTTPRequestHandler.__init__(self, request, client_address, server)
|
||||
|
||||
def do_GET(self): # pylint: disable=invalid-name
|
||||
""""Serves the GET request type"""
|
||||
try:
|
||||
url = urlparse(self.path)
|
||||
self.send_response(200)
|
||||
self.send_header("Content-type", "application/crl")
|
||||
self.end_headers()
|
||||
# Read the file and send the contents
|
||||
if url.path == "/intermediateCA":
|
||||
with open(CACRL, 'rb') as file:
|
||||
resp_data = file.read()
|
||||
if url.path == "/TSACA":
|
||||
with open(TSACRL, 'rb') as file:
|
||||
resp_data = file.read()
|
||||
self.wfile.write(resp_data)
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
print(f"HTTP GET request error: {err}")
|
||||
|
||||
def do_POST(self): # pylint: disable=invalid-name
|
||||
""""Serves the POST request type"""
|
||||
try:
|
||||
url = urlparse(self.path)
|
||||
self.send_response(200)
|
||||
if url.path == "/kill_server":
|
||||
self.log_message(f"Deleting file: {PORT_LOG}")
|
||||
os.remove(f"{PORT_LOG}")
|
||||
self.send_header('Content-type', 'text/plain')
|
||||
self.end_headers()
|
||||
self.wfile.write(bytes('Shutting down HTTP server', 'utf-8'))
|
||||
self.server.shutdown()
|
||||
else:
|
||||
content_length = int(self.headers['Content-Length'])
|
||||
post_data = self.rfile.read(content_length)
|
||||
with open(REQUEST, mode="wb") as file:
|
||||
file.write(post_data)
|
||||
openssl = subprocess.run(OPENSSL_TS,
|
||||
check=True, universal_newlines=True)
|
||||
openssl.check_returncode()
|
||||
self.send_header("Content-type", "application/timestamp-reply")
|
||||
self.end_headers()
|
||||
resp_data = None
|
||||
with open(RESPONS, mode="rb") as file:
|
||||
resp_data = file.read()
|
||||
self.wfile.write(resp_data)
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
print(f"HTTP POST request error: {err}")
|
||||
|
||||
|
||||
class HttpServerThread():
|
||||
"""TSA server thread handler"""
|
||||
# pylint: disable=too-few-public-methods
|
||||
|
||||
def __init__(self):
|
||||
self.server = None
|
||||
self.server_thread = None
|
||||
|
||||
def start_server(self) -> (int):
|
||||
"""Starting HTTP server on localhost and a random available port for binding"""
|
||||
self.server = ThreadingHTTPServer(('localhost', 19254), RequestHandler)
|
||||
self.server_thread = threading.Thread(target=self.server.serve_forever)
|
||||
self.server_thread.start()
|
||||
hostname, port = self.server.server_address[:2]
|
||||
print(f"HTTP server started, URL http://{hostname}:{port}")
|
||||
return port
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Start HTTP server"""
|
||||
ret = 0
|
||||
try:
|
||||
sys.stdout = open(SERVER_LOG, "w")
|
||||
sys.stderr = open(SERVER_LOG, "a")
|
||||
server = HttpServerThread()
|
||||
port = server.start_server()
|
||||
with open(PORT_LOG, mode="w") as file:
|
||||
file.write("{}".format(port))
|
||||
except OSError as err:
|
||||
print(f"OSError: {err}")
|
||||
ret = err.errno
|
||||
finally:
|
||||
sys.exit(ret)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
|
||||
# pylint: disable=pointless-string-statement
|
||||
"""Local Variables:
|
||||
c-basic-offset: 4
|
||||
tab-width: 4
|
||||
indent-tabs-mode: nil
|
||||
End:
|
||||
vim: set ts=4 expandtab:
|
||||
"""
|
@ -1,152 +0,0 @@
|
||||
"""Implementation of a Time Stamping Authority HTTP server"""
|
||||
|
||||
import argparse
|
||||
import contextlib
|
||||
import os
|
||||
import pathlib
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
|
||||
RESULT_PATH = os.getcwd()
|
||||
FILES_PATH = os.path.join(RESULT_PATH, "./Testing/files/")
|
||||
CERTS_PATH = os.path.join(RESULT_PATH, "./Testing/certs/")
|
||||
CONF_PATH = os.path.join(RESULT_PATH, "./Testing/conf/")
|
||||
DEFAULT_IN = os.path.join(FILES_PATH, "./unsigned.exe")
|
||||
DEFAULT_OUT = os.path.join(FILES_PATH, "./ts.exe")
|
||||
DEFAULT_CERT = os.path.join(CERTS_PATH, "./cert.pem")
|
||||
DEFAULT_KEY = os.path.join(CERTS_PATH, "./key.pem")
|
||||
DEFAULT_CROSSCERT = os.path.join(CERTS_PATH, "./crosscert.pem")
|
||||
OPENSSL_CONF = os.path.join(CONF_PATH, "./openssl_tsa.cnf")
|
||||
REQUEST = os.path.join(FILES_PATH, "./jreq.tsq")
|
||||
RESPONS = os.path.join(FILES_PATH, "./jresp.tsr")
|
||||
|
||||
if os.path.exists(os.path.join(RESULT_PATH, "./Release/")):
|
||||
OSSLSIGNCODE_FILE = os.path.join(RESULT_PATH, "./Release/osslsigncode")
|
||||
elif os.path.exists(os.path.join(RESULT_PATH, "./Debug/")):
|
||||
OSSLSIGNCODE_FILE = os.path.join(RESULT_PATH, "./Debug/osslsigncode")
|
||||
else:
|
||||
OSSLSIGNCODE_FILE = os.path.join(RESULT_PATH, "./osslsigncode")
|
||||
|
||||
DEFAULT_OPENSSL = ["openssl", "ts",
|
||||
"-reply", "-config", OPENSSL_CONF,
|
||||
"-passin", "pass:passme",
|
||||
"-queryfile", REQUEST,
|
||||
"-out", RESPONS]
|
||||
|
||||
|
||||
class RequestHandler(BaseHTTPRequestHandler):
|
||||
"""Handle the HTTP POST request that arrive at the server"""
|
||||
|
||||
def do_POST(self):
|
||||
""""Serves the POST request type"""
|
||||
try:
|
||||
content_length = int(self.headers['Content-Length'])
|
||||
post_data = self.rfile.read(content_length)
|
||||
with open(REQUEST, mode="wb") as file:
|
||||
file.write(post_data)
|
||||
openssl = subprocess.run(DEFAULT_OPENSSL,
|
||||
check=True, universal_newlines=True)
|
||||
openssl.check_returncode()
|
||||
self.send_response(200)
|
||||
self.send_header("Content-type", "application/timestamp-reply")
|
||||
self.end_headers()
|
||||
resp_data = None
|
||||
with open(RESPONS, mode="rb") as file:
|
||||
resp_data = file.read()
|
||||
self.wfile.write(resp_data)
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
print(f"HTTP POST request error: {err}")
|
||||
|
||||
|
||||
class HttpServerThread():
|
||||
"""TSA server thread handler"""
|
||||
|
||||
def __init__(self):
|
||||
self.server = None
|
||||
self.server_thread = None
|
||||
|
||||
def start_server(self) -> (str, int):
|
||||
"""Starting TSA server on localhost and a first available port"""
|
||||
self.server = HTTPServer(("127.0.0.1", 0), RequestHandler)
|
||||
self.server_thread = threading.Thread(target=self.server.serve_forever)
|
||||
self.server_thread.start()
|
||||
hostname, port = self.server.server_address[:2]
|
||||
print(f"Timestamp server started, URL: http://{hostname}:{port}")
|
||||
return hostname, port
|
||||
|
||||
def shut_down(self):
|
||||
"""Shutting down the server"""
|
||||
if self.server:
|
||||
self.server.shutdown()
|
||||
self.server_thread.join()
|
||||
print("Server is down")
|
||||
|
||||
|
||||
def parse_args() -> str:
|
||||
"""Parse the command-line arguments."""
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"--input",
|
||||
type=pathlib.Path,
|
||||
default=DEFAULT_IN,
|
||||
help="input file"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output",
|
||||
type=pathlib.Path,
|
||||
default=DEFAULT_OUT,
|
||||
help="output file"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--certs",
|
||||
type=pathlib.Path,
|
||||
default=DEFAULT_CERT,
|
||||
help="signing certificate"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--key",
|
||||
type=pathlib.Path,
|
||||
default=DEFAULT_KEY,
|
||||
help="private key"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--crosscert",
|
||||
type=pathlib.Path,
|
||||
default=DEFAULT_CROSSCERT,
|
||||
help="additional certificates"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
program = [OSSLSIGNCODE_FILE, "sign", "-in", args.input, "-out", args.output,
|
||||
"-certs", args.certs, "-key", args.key,
|
||||
"-addUnauthenticatedBlob", "-add-msi-dse", "-comm", "-ph", "-jp", "low",
|
||||
"-h", "sha384", "-time", "1556668800", "-i", "https://www.osslsigncode.com/",
|
||||
"-n", "osslsigncode", "-ac", args.crosscert, "-ts"]
|
||||
return program
|
||||
|
||||
def main() -> None:
|
||||
"""Main program"""
|
||||
ret = 0
|
||||
program = parse_args()
|
||||
server = HttpServerThread()
|
||||
hostname, port = server.start_server()
|
||||
program.append(f"{hostname}:{port}")
|
||||
try:
|
||||
osslsigncode = subprocess.run(program, check=True, universal_newlines=True)
|
||||
osslsigncode.check_returncode()
|
||||
except subprocess.CalledProcessError as err:
|
||||
ret = err.returncode
|
||||
except OSError as err:
|
||||
print(f"OSError: {err}")
|
||||
ret = err.errno
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
print(f"osslsigncode error: {err}")
|
||||
ret = 1
|
||||
finally:
|
||||
server.shut_down()
|
||||
sys.exit(ret)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Loading…
x
Reference in New Issue
Block a user