1
0
mirror of https://git.tartarus.org/simon/putty.git synced 2025-01-10 01:48:00 +00:00
putty-source/test/testcrypt.py
Simon Tatham 5b14abc30e New test system for mp_int and cryptography.
I've written a new standalone test program which incorporates all of
PuTTY's crypto code, including the mp_int and low-level elliptic curve
layers but also going all the way up to the implementations of the
MAC, hash, cipher, public key and kex abstractions.

The test program itself, 'testcrypt', speaks a simple line-oriented
protocol on standard I/O in which you write the name of a function
call followed by some inputs, and it gives you back a list of outputs
preceded by a line telling you how many there are. Dynamically
allocated objects are assigned string ids in the protocol, and there's
a 'free' function that tells testcrypt when it can dispose of one.

It's possible to speak that protocol by hand, but cumbersome. I've
also provided a Python module that wraps it, by running testcrypt as a
persistent subprocess and gatewaying all the function calls into
things that look reasonably natural to call from Python. The Python
module and testcrypt.c both read a carefully formatted header file
testcrypt.h which contains the name and signature of every exported
function, so it costs minimal effort to expose a given function
through this test API. In a few cases it's necessary to write a
wrapper in testcrypt.c that makes the function look more friendly, but
mostly you don't even need that. (Though that is one of the
motivations behind a lot of API cleanups I've done recently!)

I considered doing Python integration in the more obvious way, by
linking parts of the PuTTY code directly into a native-code .so Python
module. I decided against it because this way is more flexible: I can
run the testcrypt program on its own, or compile it in a way that
Python wouldn't play nicely with (I bet compiling just that .so with
Leak Sanitiser wouldn't do what you wanted when Python loaded it!), or
attach a debugger to it. I can even recompile testcrypt for a
different CPU architecture (32- vs 64-bit, or even running it on a
different machine over ssh or under emulation) and still layer the
nice API on top of that via the local Python interpreter. All I need
is a bidirectional data channel.
2019-01-03 16:56:02 +00:00

227 lines
8.4 KiB
Python

import sys
import os
import numbers
import subprocess
import re
from binascii import hexlify
# This file is expected to live in the 'test' subdirectory, one level
# down from the main source directory. putty_srcdir is therefore the
# parent of the directory that contains this file.
putty_srcdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def unicode_to_bytes(arg):
    """Return arg encoded as UTF-8 if it is a text string, else unchanged.

    The type tests are phrased in terms of the types of the u'' and b''
    literals, so the same code behaves correctly under both Python 2
    and Python 3.
    """
    text_type = type(u'a')
    bytes_type = type(b'a')
    if isinstance(arg, bytes_type) or not isinstance(arg, text_type):
        # Already a byte string, or not a string at all: pass through.
        return arg
    return arg.encode("UTF-8")
# Python 2/3 compatibility shims for working with byte strings:
#   bytevals(b) gives the integer value of each byte in b, in order;
#   byte2str(n) gives the one-byte string whose single byte has value n.
# Indexing a bytes object yields an integer in Python 3 but a length-1
# string in Python 2, which is how the two cases are told apart here.
if b'A'[0] == b'A':
    # Python 2: elements of a byte string are themselves strings.
    def bytevals(arg):
        return map(ord, arg)

    def byte2str(arg):
        return chr(arg)
else:
    # Python 3: iterating over bytes already yields integers.
    def bytevals(arg):
        return arg

    def byte2str(arg):
        return bytes([arg])
class ChildProcess(object):
    """Wrapper around a persistent 'testcrypt' subprocess.

    The subprocess is spoken to over its standard input and output
    using a line-oriented protocol: one line is written per function
    call, and the reply is a line giving the number of output lines
    followed by that many lines of output.

    The subprocess is not started until the first call to funcall().

    Environment variables consulted:
      PUTTY_TESTCRYPT_DEBUG  if set to 'stderr', every line sent and
                             received is also logged to sys.stderr.
      PUTTY_TESTCRYPT        if set, a shell command to run instead of
                             the 'testcrypt' binary in putty_srcdir.
    """

    def __init__(self):
        # subprocess.Popen object once start() has been called.
        self.sp = None
        # File-like object to log protocol traffic to, or None.
        self.debug = None
        dbg = os.environ.get("PUTTY_TESTCRYPT_DEBUG")
        if dbg is not None:
            if dbg == "stderr":
                self.debug = sys.stderr
            else:
                # Report the unrecognised setting but carry on without
                # debugging output.
                sys.stderr.write(
                    "Unknown value '{}' for PUTTY_TESTCRYPT_DEBUG"
                    " (try 'stderr')\n".format(dbg))

    def start(self):
        """Launch the subprocess with pipes on its stdin and stdout."""
        assert self.sp is None
        override_command = os.environ.get("PUTTY_TESTCRYPT")
        if override_command is None:
            cmd = [os.path.join(putty_srcdir, "testcrypt")]
            shell = False
        else:
            # The override is a single command string, so it is handed
            # to the shell to parse.
            cmd = override_command
            shell = True
        self.sp = subprocess.Popen(
            cmd, shell=shell, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    def write_line(self, line):
        """Send one line (a bytes object without newline) to the child."""
        if self.debug is not None:
            self.debug.write("send: {}\n".format(line))
        self.sp.stdin.write(line + b"\n")
        # Flush immediately so the child sees the request before we
        # block waiting for its reply.
        self.sp.stdin.flush()

    def read_line(self):
        """Read one line from the child, with its line ending removed."""
        line = self.sp.stdout.readline().rstrip(b"\r\n")
        if self.debug is not None:
            self.debug.write("recv: {}\n".format(line))
        return line

    def funcall(self, cmd, args):
        """Invoke function 'cmd' in the child and return its output lines.

        cmd and each element of args may be text or byte strings; they
        are joined with single spaces to form the request line. The
        return value is a list of byte strings, one per output line.
        """
        if self.sp is None:
            self.start()
        self.write_line(unicode_to_bytes(cmd) + b" " + b" ".join(
            unicode_to_bytes(arg) for arg in args))
        # The first line of the reply says how many output lines follow.
        argcount = int(self.read_line())
        return [self.read_line() for _ in range(argcount)]

    def check_return_status(self):
        """Close the child's stdin, wait for it, and check its exit status.

        Raises Exception if the exit status is nonzero.
        """
        assert self.sp is not None
        self.sp.stdin.close()
        status = self.sp.wait()
        if status != 0:
            raise Exception("testcrypt returned exit status {}".format(status))
# Module-wide ChildProcess instance through which the rest of this
# module talks to testcrypt. Constructing it does not launch the
# subprocess; that happens on the first funcall().
childprocess = ChildProcess()
class Value(object):
    """Handle on an object held by the testcrypt subprocess.

    typename is the protocol type name of the object (e.g. 'val_mpint')
    and ident is the identifier testcrypt assigned to it. When the
    handle is garbage-collected while still owning an identifier, the
    subprocess is told to free the corresponding object.
    """

    def __init__(self, typename, ident):
        self.typename = typename
        self.ident = ident

    def consumed(self):
        """Give up ownership, so that __del__ will not free the object."""
        self.ident = None

    def __repr__(self):
        typename, ident = self.typename, self.ident
        return "Value({!r}, {!r})".format(typename, ident)

    def __del__(self):
        if self.ident is None:
            # Ownership was already given up; nothing to free.
            return
        childprocess.funcall("free", [self.ident])

    def __long__(self):
        if self.typename != "val_mpint":
            raise TypeError("testcrypt values of types other than mpint"
                            " cannot be converted to integer")
        # mp_dump gives the integer in hex; an empty reply means zero.
        hexval = childprocess.funcall("mp_dump", [self.ident])[0]
        if len(hexval) == 0:
            return 0
        return int(hexval, 16)

    def __int__(self):
        return int(self.__long__())
def make_argword(arg, argtype, fnname, argindex, to_preserve):
    """Convert one Python argument into a word for the testcrypt protocol.

    argtype is a (typename, consumed) pair describing the parameter.
    fnname and argindex are used only in error messages. Any temporary
    Value created here on the caller's behalf is appended to
    to_preserve, so that the caller can keep it alive until the
    function call has been made.

    Raises TypeError if arg cannot be converted to the required type.
    """
    typename, consumed = argtype

    # An 'opt_' type accepts None (sent as NULL); any other value is
    # handled exactly as for the underlying type.
    if typename.startswith("opt_"):
        if arg is None:
            return "NULL"
        typename = typename[4:]

    # A Python string passed for a val_string parameter is uploaded to
    # testcrypt as a new string object, with every byte %-escaped.
    if typename == "val_string":
        arg = unicode_to_bytes(arg)
        if isinstance(arg, bytes):
            escaped = "".join("%{:02x}".format(b) for b in bytevals(arg))
            reply = childprocess.funcall("newstring", [escaped])
            arg = make_retvals([typename], reply, unpack_strings=False)[0]
            to_preserve.append(arg)

    # Likewise a Python integer passed for a val_mpint parameter is
    # turned into a new mp_int object via mp_literal.
    if typename == "val_mpint" and isinstance(arg, numbers.Integral):
        reply = childprocess.funcall("mp_literal", ["0x{:x}".format(arg)])
        arg = make_retvals([typename], reply)[0]
        to_preserve.append(arg)

    if isinstance(arg, Value):
        if arg.typename != typename:
            raise TypeError(
                "{}() argument {:d} should be {} ({} given)".format(
                    fnname, argindex, typename, arg.typename))
        ident = arg.ident
        if consumed:
            # The callee takes ownership, so our handle must not free it.
            arg.consumed()
        return ident

    if typename == "uint" and isinstance(arg, numbers.Integral):
        return "0x{:x}".format(arg)

    # Types whose values are passed by name, as a single word.
    named_types = (
        "hashalg", "macalg", "keyalg", "ssh1_cipheralg", "ssh2_cipheralg",
        "dh_group", "ecdh_alg", "rsaorder")
    if typename in named_types:
        arg = unicode_to_bytes(arg)
        if isinstance(arg, bytes) and b" " not in arg:
            return arg

    raise TypeError(
        "Can't convert {}() argument {:d} to {} (value was {!r})".format(
            fnname, argindex, typename, arg))
def make_retval(rettype, word, unpack_strings):
    """Convert one output word from testcrypt into a Python value.

    rettype is the protocol type name of the output and word is the
    byte string received for it. The result is:
      - for 'val_string' when unpack_strings is true: the contents of
        the string as a bytes object (the string object in testcrypt
        is freed once its contents have been fetched);
      - for any other 'val_' type: a Value wrapping the identifier;
      - for 'uint': an int;
      - for 'boolean': a bool.

    Raises TypeError for any other rettype.
    """
    if rettype == "val_string" and unpack_strings:
        retwords = childprocess.funcall("getstring", [word])
        childprocess.funcall("free", [word])
        # Decode each %XX escape in the reply back into a single byte.
        return re.sub(b"%[0-9A-F][0-9A-F]",
                      lambda m: byte2str(int(m.group(0)[1:], 16)),
                      retwords[0])
    if rettype.startswith("val_"):
        return Value(rettype, word)
    elif rettype == "uint":
        # Base 0 lets int() honour a prefix such as '0x' in the word.
        return int(word, 0)
    elif rettype == "boolean":
        assert word == b"true" or word == b"false"
        return word == b"true"
    # The message names the value first and then its type, so the
    # format arguments must be given in that order.
    raise TypeError("Can't deal with return value {!r} of type {!r}"
                    .format(word, rettype))
def make_retvals(rettypes, retwords, unpack_strings=True):
    """Convert a list of output words into a list of Python values.

    rettypes and retwords must have the same length; each word is
    converted by make_retval() according to the corresponding type.
    """
    assert len(rettypes) == len(retwords) # FIXME: better exception
    converted = []
    for rettype, word in zip(rettypes, retwords):
        converted.append(make_retval(rettype, word, unpack_strings))
    return converted
class Function(object):
    """Python callable standing in for one function exported by testcrypt.

    rettypes is the list of protocol type names of the outputs, and
    argtypes is a list of (typename, consumed) pairs, one per input.
    """

    def __init__(self, fnname, rettypes, argtypes):
        self.fnname = fnname
        self.rettypes = rettypes
        self.argtypes = argtypes

    def __repr__(self):
        return "<Function {}>".format(self.fnname)

    def __call__(self, *args):
        """Call the function in testcrypt and return its converted outputs.

        Returns None if there are no outputs, the bare value if there
        is exactly one, and a tuple otherwise. Raises TypeError if the
        number of arguments is wrong.
        """
        expected = len(self.argtypes)
        if len(args) != expected:
            raise TypeError(
                "{}() takes exactly {} arguments ({} given)".format(
                    self.fnname, expected, len(args)))

        # Temporary Values made while converting arguments are held
        # here so that they stay alive until the call has been made.
        to_preserve = []
        argwords = []
        for index, (arg, argtype) in enumerate(zip(args, self.argtypes)):
            argwords.append(
                make_argword(arg, argtype, self.fnname, index, to_preserve))

        retwords = childprocess.funcall(self.fnname, argwords)
        retvals = make_retvals(self.rettypes, retwords)

        if len(retvals) > 1:
            return tuple(retvals)
        if len(retvals) == 1:
            return retvals[0]
        return None
def _setup(scope):
    """Fill scope with a Function for every FUNC(...) line in testcrypt.h.

    Each qualifying line of the header, once spaces are removed, has
    the form FUNC(rettype,name,arg,...). A return type other than
    'void' and every argument prefixed 'out_' become outputs of the
    function; the remaining arguments become inputs, and an input
    prefixed 'consumed_' is recorded as being consumed by the call.
    """
    header_file = os.path.join(putty_srcdir, "testcrypt.h")

    prefix, suffix = "FUNC(", ")"
    valprefix = "val_"
    outprefix = "out_"
    consprefix = "consumed_"

    def trim_argtype(arg):
        # Strip suffixes like val_string_asciz
        if arg.startswith(valprefix) and "_" in arg[len(valprefix):]:
            arg = arg[:arg.index("_", len(valprefix))]
        return arg

    with open(header_file) as f:
        for rawline in f:
            line = rawline.rstrip("\r\n").replace(" ", "")
            if not (line.startswith(prefix) and line.endswith(suffix)):
                continue

            words = line[len(prefix):-len(suffix)].split(",")
            function = words[1]
            rettypes = []
            argtypes = []

            if words[0] != "void":
                rettypes.append(trim_argtype(words[0]))

            for arg in words[2:]:
                if arg.startswith(outprefix):
                    # An output parameter: counts as a return value.
                    rettypes.append(trim_argtype(arg[len(outprefix):]))
                    continue
                consumed = arg.startswith(consprefix)
                if consumed:
                    arg = arg[len(consprefix):]
                argtypes.append((trim_argtype(arg), consumed))

            scope[function] = Function(function, rettypes, argtypes)
# Define a Function in this module's namespace for every function
# listed in testcrypt.h, then remove the setup helper itself, which is
# not needed again.
_setup(globals())
del _setup