mirror of
https://git.tartarus.org/simon/putty.git
synced 2025-07-01 11:32:48 -05:00
New test system for mp_int and cryptography.
I've written a new standalone test program which incorporates all of PuTTY's crypto code, including the mp_int and low-level elliptic curve layers but also going all the way up to the implementations of the MAC, hash, cipher, public key and kex abstractions. The test program itself, 'testcrypt', speaks a simple line-oriented protocol on standard I/O in which you write the name of a function call followed by some inputs, and it gives you back a list of outputs preceded by a line telling you how many there are. Dynamically allocated objects are assigned string ids in the protocol, and there's a 'free' function that tells testcrypt when it can dispose of one. It's possible to speak that protocol by hand, but cumbersome. I've also provided a Python module that wraps it, by running testcrypt as a persistent subprocess and gatewaying all the function calls into things that look reasonably natural to call from Python. The Python module and testcrypt.c both read a carefully formatted header file testcrypt.h which contains the name and signature of every exported function, so it costs minimal effort to expose a given function through this test API. In a few cases it's necessary to write a wrapper in testcrypt.c that makes the function look more friendly, but mostly you don't even need that. (Though that is one of the motivations between a lot of API cleanups I've done recently!) I considered doing Python integration in the more obvious way, by linking parts of the PuTTY code directly into a native-code .so Python module. I decided against it because this way is more flexible: I can run the testcrypt program on its own, or compile it in a way that Python wouldn't play nicely with (I bet compiling just that .so with Leak Sanitiser wouldn't do what you wanted when Python loaded it!), or attach a debugger to it. I can even recompile testcrypt for a different CPU architecture (32- vs 64-bit, or even running it on a different machine over ssh or under emulation) and still layer the nice API on top of that via the local Python interpreter. All I need is a bidirectional data channel.
This commit is contained in:
226
test/testcrypt.py
Normal file
226
test/testcrypt.py
Normal file
@ -0,0 +1,226 @@
|
||||
import sys
|
||||
import os
|
||||
import numbers
|
||||
import subprocess
|
||||
import re
|
||||
from binascii import hexlify
|
||||
|
||||
# Expect to be run from the 'test' subdirectory, one level down from
|
||||
# the main source
|
||||
putty_srcdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
def unicode_to_bytes(arg):
|
||||
# Slightly fiddly way to do this which should work in Python 2 and 3
|
||||
if isinstance(arg, type(u'a')) and not isinstance(arg, type(b'a')):
|
||||
arg = arg.encode("UTF-8")
|
||||
return arg
|
||||
|
||||
# Another pair of P2/P3 compatibility shims, to give a stream of
|
||||
# integers corresponding to the byte values in a bytes object, and to
|
||||
# take an integer and return a bytes object containing a byte with
|
||||
# that value.
|
||||
if b'A'[0] != b'A':
|
||||
def bytevals(arg):
|
||||
return arg # in P3 this is a no-op
|
||||
def byte2str(arg):
|
||||
return bytes([arg])
|
||||
else:
|
||||
def bytevals(arg):
|
||||
return map(ord, arg) # in P2 you have to use ord()
|
||||
def byte2str(arg):
|
||||
return chr(arg)
|
||||
|
||||
class ChildProcess(object):
|
||||
def __init__(self):
|
||||
self.sp = None
|
||||
self.debug = None
|
||||
|
||||
dbg = os.environ.get("PUTTY_TESTCRYPT_DEBUG")
|
||||
if dbg is not None:
|
||||
if dbg == "stderr":
|
||||
self.debug = sys.stderr
|
||||
else:
|
||||
sys.stderr.write("Unknown value '{}' for PUTTY_TESTCRYPT_DEBUG"
|
||||
" (try 'stderr'\n")
|
||||
def start(self):
|
||||
assert self.sp is None
|
||||
override_command = os.environ.get("PUTTY_TESTCRYPT")
|
||||
if override_command is None:
|
||||
cmd = [os.path.join(putty_srcdir, "testcrypt")]
|
||||
shell = False
|
||||
else:
|
||||
cmd = override_command
|
||||
shell = True
|
||||
self.sp = subprocess.Popen(
|
||||
cmd, shell=shell, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
|
||||
def write_line(self, line):
|
||||
if self.debug is not None:
|
||||
self.debug.write("send: {}\n".format(line))
|
||||
self.sp.stdin.write(line + b"\n")
|
||||
self.sp.stdin.flush()
|
||||
def read_line(self):
|
||||
line = self.sp.stdout.readline().rstrip(b"\r\n")
|
||||
if self.debug is not None:
|
||||
self.debug.write("recv: {}\n".format(line))
|
||||
return line
|
||||
def funcall(self, cmd, args):
|
||||
if self.sp is None:
|
||||
self.start()
|
||||
self.write_line(unicode_to_bytes(cmd) + b" " + b" ".join(
|
||||
unicode_to_bytes(arg) for arg in args))
|
||||
argcount = int(self.read_line())
|
||||
return [self.read_line() for arg in range(argcount)]
|
||||
def check_return_status(self):
|
||||
assert self.sp is not None
|
||||
self.sp.stdin.close()
|
||||
status = self.sp.wait()
|
||||
if status != 0:
|
||||
raise Exception("testcrypt returned exit status {}".format(status))
|
||||
|
||||
childprocess = ChildProcess()
|
||||
|
||||
class Value(object):
|
||||
def __init__(self, typename, ident):
|
||||
self.typename = typename
|
||||
self.ident = ident
|
||||
def consumed(self):
|
||||
self.ident = None
|
||||
def __repr__(self):
|
||||
return "Value({!r}, {!r})".format(self.typename, self.ident)
|
||||
def __del__(self):
|
||||
if self.ident is not None:
|
||||
childprocess.funcall("free", [self.ident])
|
||||
def __long__(self):
|
||||
if self.typename != "val_mpint":
|
||||
raise TypeError("testcrypt values of types other than mpint"
|
||||
" cannot be converted to integer")
|
||||
hexval = childprocess.funcall("mp_dump", [self.ident])[0]
|
||||
return 0 if len(hexval) == 0 else int(hexval, 16)
|
||||
def __int__(self):
|
||||
return int(self.__long__())
|
||||
|
||||
def make_argword(arg, argtype, fnname, argindex, to_preserve):
|
||||
typename, consumed = argtype
|
||||
if typename.startswith("opt_"):
|
||||
if arg is None:
|
||||
return "NULL"
|
||||
typename = typename[4:]
|
||||
if typename == "val_string":
|
||||
arg = unicode_to_bytes(arg)
|
||||
if isinstance(arg, bytes):
|
||||
retwords = childprocess.funcall(
|
||||
"newstring", ["".join("%{:02x}".format(b)
|
||||
for b in bytevals(arg))])
|
||||
arg = make_retvals([typename], retwords, unpack_strings=False)[0]
|
||||
to_preserve.append(arg)
|
||||
if typename == "val_mpint" and isinstance(arg, numbers.Integral):
|
||||
retwords = childprocess.funcall("mp_literal", ["0x{:x}".format(arg)])
|
||||
arg = make_retvals([typename], retwords)[0]
|
||||
to_preserve.append(arg)
|
||||
if isinstance(arg, Value):
|
||||
if arg.typename != typename:
|
||||
raise TypeError(
|
||||
"{}() argument {:d} should be {} ({} given)".format(
|
||||
fnname, argindex, typename, arg.typename))
|
||||
ident = arg.ident
|
||||
if consumed:
|
||||
arg.consumed()
|
||||
return ident
|
||||
if typename == "uint" and isinstance(arg, numbers.Integral):
|
||||
return "0x{:x}".format(arg)
|
||||
if typename in {
|
||||
"hashalg", "macalg", "keyalg", "ssh1_cipheralg", "ssh2_cipheralg",
|
||||
"dh_group", "ecdh_alg", "rsaorder"}:
|
||||
arg = unicode_to_bytes(arg)
|
||||
if isinstance(arg, bytes) and b" " not in arg:
|
||||
return arg
|
||||
raise TypeError(
|
||||
"Can't convert {}() argument {:d} to {} (value was {!r})".format(
|
||||
fnname, argindex, typename, arg))
|
||||
|
||||
def make_retval(rettype, word, unpack_strings):
|
||||
if rettype == "val_string" and unpack_strings:
|
||||
retwords = childprocess.funcall("getstring", [word])
|
||||
childprocess.funcall("free", [word])
|
||||
return re.sub(b"%[0-9A-F][0-9A-F]",
|
||||
lambda m: byte2str(int(m.group(0)[1:], 16)),
|
||||
retwords[0])
|
||||
if rettype.startswith("val_"):
|
||||
return Value(rettype, word)
|
||||
elif rettype == "uint":
|
||||
return int(word, 0)
|
||||
elif rettype == "boolean":
|
||||
assert word == b"true" or word == b"false"
|
||||
return word == b"true"
|
||||
raise TypeError("Can't deal with return value {!r} of type {!r}"
|
||||
.format(rettype, word))
|
||||
|
||||
def make_retvals(rettypes, retwords, unpack_strings=True):
|
||||
assert len(rettypes) == len(retwords) # FIXME: better exception
|
||||
return [make_retval(rettype, word, unpack_strings)
|
||||
for rettype, word in zip(rettypes, retwords)]
|
||||
|
||||
class Function(object):
|
||||
def __init__(self, fnname, rettypes, argtypes):
|
||||
self.fnname = fnname
|
||||
self.rettypes = rettypes
|
||||
self.argtypes = argtypes
|
||||
def __repr__(self):
|
||||
return "<Function {}>".format(self.fnname)
|
||||
def __call__(self, *args):
|
||||
if len(args) != len(self.argtypes):
|
||||
raise TypeError(
|
||||
"{}() takes exactly {} arguments ({} given)".format(
|
||||
self.fnname, len(self.argtypes), len(args)))
|
||||
to_preserve = []
|
||||
retwords = childprocess.funcall(
|
||||
self.fnname, [make_argword(args[i], self.argtypes[i],
|
||||
self.fnname, i, to_preserve)
|
||||
for i in range(len(args))])
|
||||
retvals = make_retvals(self.rettypes, retwords)
|
||||
if len(retvals) == 0:
|
||||
return None
|
||||
if len(retvals) == 1:
|
||||
return retvals[0]
|
||||
return tuple(retvals)
|
||||
|
||||
def _setup(scope):
|
||||
header_file = os.path.join(putty_srcdir, "testcrypt.h")
|
||||
|
||||
prefix, suffix = "FUNC(", ")"
|
||||
valprefix = "val_"
|
||||
outprefix = "out_"
|
||||
consprefix = "consumed_"
|
||||
|
||||
def trim_argtype(arg):
|
||||
if (arg.startswith(valprefix) and
|
||||
"_" in arg[len(valprefix):]):
|
||||
# Strip suffixes like val_string_asciz
|
||||
arg = arg[:arg.index("_", len(valprefix))]
|
||||
return arg
|
||||
|
||||
with open(header_file) as f:
|
||||
for line in iter(f.readline, ""):
|
||||
line = line.rstrip("\r\n").replace(" ", "")
|
||||
if line.startswith(prefix) and line.endswith(suffix):
|
||||
words = line[len(prefix):-len(suffix)].split(",")
|
||||
function = words[1]
|
||||
rettypes = []
|
||||
argtypes = []
|
||||
argsconsumed = []
|
||||
if words[0] != "void":
|
||||
rettypes.append(trim_argtype(words[0]))
|
||||
for arg in words[2:]:
|
||||
if arg.startswith(outprefix):
|
||||
rettypes.append(trim_argtype(arg[len(outprefix):]))
|
||||
else:
|
||||
consumed = False
|
||||
if arg.startswith(consprefix):
|
||||
arg = arg[len(consprefix):]
|
||||
consumed = True
|
||||
arg = trim_argtype(arg)
|
||||
argtypes.append((arg, consumed))
|
||||
scope[function] = Function(function, rettypes, argtypes)
|
||||
|
||||
_setup(globals())
|
||||
del _setup
|
Reference in New Issue
Block a user