mirror of
https://git.tartarus.org/simon/putty.git
synced 2025-01-25 01:02:24 +00:00
47ca2e98a5
When testcrypt.h lists a function argument as 'opt_val_foo', it means that the argument is optional in the sense that the C function can take a null pointer in place of a valid foo, and so the Python wrapper module should accept None in the corresponding argument slot from the client code and translate it into the special string "NULL" in the wire protocol. This works fine at argument translation time, but the code that reads testcrypt.h wasn't looking at it, so if you said 'opt_val_foo_suffix' in place of 'opt_val_foo' (indicating that that argument is optional _and_ the C function expects it in a translated form), then the initial pass over testcrypt.h wouldn't strip the _suffix, and would set up data structures with mismatched type names.
231 lines
8.5 KiB
Python
231 lines
8.5 KiB
Python
import sys
|
|
import os
|
|
import numbers
|
|
import subprocess
|
|
import re
|
|
from binascii import hexlify
|
|
|
|
# Expect to be run from the 'test' subdirectory, one level down from
|
|
# the main source
|
|
putty_srcdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
def unicode_to_bytes(arg):
    """Return text strings encoded as UTF-8; pass anything else through.

    The type checks are spelled via literals so that the same code works
    in both Python 2 (where str and bytes are the same type) and Python 3.
    """
    text_type = type(u'a')
    bytes_type = type(b'a')
    # Anything that is not text, or that already counts as bytes, is
    # returned untouched.
    if not isinstance(arg, text_type) or isinstance(arg, bytes_type):
        return arg
    return arg.encode("UTF-8")
|
|
|
|
# Another pair of P2/P3 compatibility shims, to give a stream of
|
|
# integers corresponding to the byte values in a bytes object, and to
|
|
# take an integer and return a bytes object containing a byte with
|
|
# that value.
|
|
if b'A'[0] == b'A':
    # Python 2: indexing a bytes object yields a one-character string,
    # so integer values have to be obtained with ord() and turned back
    # into a byte with chr().
    def bytevals(arg):
        return map(ord, arg)

    def byte2str(arg):
        return chr(arg)
else:
    # Python 3: iterating over bytes already yields integers, and a
    # single byte is built from a one-element list.
    def bytevals(arg):
        return arg

    def byte2str(arg):
        return bytes([arg])
|
|
|
|
class ChildProcess(object):
    """Wrapper around a testcrypt subprocess spoken to over stdin/stdout.

    The subprocess is started lazily by the first funcall(). If the
    environment variable PUTTY_TESTCRYPT_DEBUG is set to "stderr", every
    line sent to and received from the subprocess is echoed to sys.stderr.
    """

    def __init__(self):
        self.sp = None      # subprocess.Popen object, once started
        self.debug = None   # file-like object for protocol tracing, or None

        dbg = os.environ.get("PUTTY_TESTCRYPT_DEBUG")
        if dbg is not None:
            if dbg == "stderr":
                self.debug = sys.stderr
            else:
                # Report the unrecognised value itself, so the user can
                # see what they actually set.
                sys.stderr.write("Unknown value '{}' for PUTTY_TESTCRYPT_DEBUG"
                                 " (try 'stderr')\n".format(dbg))

    def start(self):
        """Launch the subprocess. Must not be called if already started.

        The command is the 'testcrypt' binary in putty_srcdir, unless the
        environment variable PUTTY_TESTCRYPT is set, in which case its
        value is run as a shell command instead.
        """
        assert self.sp is None
        override_command = os.environ.get("PUTTY_TESTCRYPT")
        if override_command is None:
            cmd = [os.path.join(putty_srcdir, "testcrypt")]
            shell = False
        else:
            cmd = override_command
            shell = True
        self.sp = subprocess.Popen(
            cmd, shell=shell, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    def write_line(self, line):
        """Send one line (a bytes object, without newline) to the child."""
        if self.debug is not None:
            self.debug.write("send: {}\n".format(line))
        self.sp.stdin.write(line + b"\n")
        self.sp.stdin.flush()

    def read_line(self):
        """Read one line from the child, with trailing CR/LF stripped."""
        line = self.sp.stdout.readline().rstrip(b"\r\n")
        if self.debug is not None:
            self.debug.write("recv: {}\n".format(line))
        return line

    def funcall(self, cmd, args):
        """Send a command with arguments and return the list of reply lines.

        The command and arguments are joined with spaces into one line.
        The first line of the reply is parsed as the number of further
        lines to read; those lines are returned as a list of bytes.
        """
        if self.sp is None:
            self.start()
        self.write_line(unicode_to_bytes(cmd) + b" " + b" ".join(
            unicode_to_bytes(arg) for arg in args))
        argcount = int(self.read_line())
        return [self.read_line() for _ in range(argcount)]

    def check_return_status(self):
        """Close the child's stdin, wait for it, and check its exit status.

        Raises Exception if the exit status is nonzero.
        """
        assert self.sp is not None
        self.sp.stdin.close()
        status = self.sp.wait()
        if status != 0:
            raise Exception("testcrypt returned exit status {}".format(status))
|
|
|
|
# Single module-level ChildProcess instance through which the rest of
# this module issues every funcall(). Creating it does not launch
# anything: ChildProcess.funcall starts the subprocess on first use.
childprocess = ChildProcess()
|
|
|
|
class Value(object):
    """Handle on an object living inside the testcrypt subprocess.

    typename is the testcrypt type name (e.g. "val_mpint") and ident is
    the identifier by which the subprocess knows the object. When a
    Value that still has an ident is garbage-collected, a "free" command
    for that ident is sent to the subprocess.
    """

    def __init__(self, typename, ident):
        self.typename = typename
        self.ident = ident

    def consumed(self):
        """Forget the ident, so that __del__ will not send "free" for it."""
        self.ident = None

    def __repr__(self):
        return "Value({!r}, {!r})".format(self.typename, self.ident)

    def __del__(self):
        if self.ident is None:
            # Already consumed: nothing left to free.
            return
        childprocess.funcall("free", [self.ident])

    def __long__(self):
        """Convert a val_mpint to a Python integer via "mp_dump".

        Raises TypeError for any other type of Value.
        """
        if self.typename != "val_mpint":
            raise TypeError("testcrypt values of types other than mpint"
                            " cannot be converted to integer")
        reply = childprocess.funcall("mp_dump", [self.ident])
        hex_digits = reply[0]
        # An empty reply word is treated as zero.
        if len(hex_digits) == 0:
            return 0
        return int(hex_digits, 16)

    def __int__(self):
        return int(self.__long__())
|
|
|
|
def make_argword(arg, argtype, fnname, argindex, to_preserve):
    """Translate one Python argument into a word for the wire protocol.

    argtype is a (typename, consumed) pair. fnname and argindex are used
    only to build error messages. to_preserve is a list to which any
    temporary Value created here is appended, so that the caller can
    keep it referenced for as long as it needs.

    Returns the word to send. Raises TypeError if the argument cannot be
    converted to the requested type.
    """
    typename, consumed = argtype

    # Optional arguments: None becomes the special word "NULL";
    # otherwise carry on with the underlying type.
    if typename.startswith("opt_"):
        if arg is None:
            return "NULL"
        typename = typename[len("opt_"):]

    if typename == "val_string":
        # Python strings are turned into a temporary string object in
        # the subprocess, sent as %-escaped hex bytes.
        arg = unicode_to_bytes(arg)
        if isinstance(arg, bytes):
            escaped = "".join(
                "%{:02x}".format(byteval) for byteval in bytevals(arg))
            words = childprocess.funcall("newstring", [escaped])
            arg = make_retvals([typename], words, unpack_strings=False)[0]
            to_preserve.append(arg)
    elif typename == "val_mpint" and isinstance(arg, numbers.Integral):
        # Python integers are turned into a temporary mpint.
        words = childprocess.funcall("mp_literal", ["0x{:x}".format(arg)])
        arg = make_retvals([typename], words)[0]
        to_preserve.append(arg)

    if isinstance(arg, Value):
        if arg.typename != typename:
            raise TypeError(
                "{}() argument {:d} should be {} ({} given)".format(
                    fnname, argindex, typename, arg.typename))
        word = arg.ident
        if consumed:
            # Mark the Value as consumed so that it will not send
            # "free" for this ident later.
            arg.consumed()
        return word

    if typename == "uint" and isinstance(arg, numbers.Integral):
        return "0x{:x}".format(arg)

    name_types = {
        "hashalg", "macalg", "keyalg", "ssh1_cipheralg", "ssh2_cipheralg",
        "dh_group", "ecdh_alg", "rsaorder"}
    if typename in name_types:
        # These are passed by name, as a single space-free word.
        arg = unicode_to_bytes(arg)
        if isinstance(arg, bytes) and b" " not in arg:
            return arg

    raise TypeError(
        "Can't convert {}() argument {:d} to {} (value was {!r})".format(
            fnname, argindex, typename, arg))
|
|
|
|
def make_retval(rettype, word, unpack_strings):
    """Translate one word of a reply into a Python value.

    rettype is the testcrypt type name expected for this return value
    and word is the bytes object read from the subprocess.

    - "val_string" with unpack_strings true: the string's contents are
      fetched with "getstring", the object is freed, and the %XX escapes
      in the reply are decoded, giving a bytes object.
    - any other type starting "val_": wrapped in a Value.
    - "uint": parsed as an integer, base taken from the word's prefix.
    - "boolean": b"true" or b"false", returned as a bool.

    Raises TypeError for any other rettype.
    """
    if rettype == "val_string" and unpack_strings:
        retwords = childprocess.funcall("getstring", [word])
        childprocess.funcall("free", [word])
        return re.sub(b"%[0-9A-F][0-9A-F]",
                      lambda m: byte2str(int(m.group(0)[1:], 16)),
                      retwords[0])
    if rettype.startswith("val_"):
        return Value(rettype, word)
    elif rettype == "uint":
        return int(word, 0)
    elif rettype == "boolean":
        assert word == b"true" or word == b"false"
        return word == b"true"
    # The message names the value first and the type second, so the
    # format arguments must be given in that order.
    raise TypeError("Can't deal with return value {!r} of type {!r}"
                    .format(word, rettype))
|
|
|
|
def make_retvals(rettypes, retwords, unpack_strings=True):
    """Translate a list of reply words into a list of Python values.

    rettypes and retwords must have the same length; each word is
    converted by make_retval according to the type in the same position.
    """
    assert len(rettypes) == len(retwords) # FIXME: better exception
    converted = []
    for rettype, word in zip(rettypes, retwords):
        converted.append(make_retval(rettype, word, unpack_strings))
    return converted
|
|
|
|
class Function(object):
    """Callable wrapper for one function exported by testcrypt.

    rettypes is the list of return type names; argtypes is the list of
    (typename, consumed) pairs, one per argument.
    """

    def __init__(self, fnname, rettypes, argtypes):
        self.fnname = fnname
        self.rettypes = rettypes
        self.argtypes = argtypes

    def __repr__(self):
        return "<Function {}>".format(self.fnname)

    def __call__(self, *args):
        """Call the function in the subprocess and return its result.

        Returns None if there are no return values, the value itself if
        there is exactly one, and a tuple otherwise. Raises TypeError if
        the wrong number of arguments is given.
        """
        expected = len(self.argtypes)
        if len(args) != expected:
            raise TypeError(
                "{}() takes exactly {} arguments ({} given)".format(
                    self.fnname, expected, len(args)))

        # Temporary Values made by make_argword are collected here so
        # that they stay referenced until this call has returned.
        to_preserve = []
        retwords = childprocess.funcall(
            self.fnname,
            [make_argword(arg, argtype, self.fnname, index, to_preserve)
             for index, (arg, argtype)
             in enumerate(zip(args, self.argtypes))])
        retvals = make_retvals(self.rettypes, retwords)

        count = len(retvals)
        if count == 0:
            return None
        if count == 1:
            return retvals[0]
        return tuple(retvals)
|
|
|
|
def _setup(scope):
    """Read testcrypt.h and add a Function to scope for each FUNC() line.

    scope is a dictionary (in practice the module globals). Each line of
    the header which, after removing spaces, has the form
    FUNC(rettype,name,arg,...) produces scope[name] = Function(...).

    Argument words starting "out_" become extra return types; a
    "consumed_" prefix marks the argument as consumed by the call; and
    type names are normalised by trim_argtype.
    """
    header_file = os.path.join(putty_srcdir, "testcrypt.h")

    prefix, suffix = "FUNC(", ")"
    valprefix = "val_"
    outprefix = "out_"
    optprefix = "opt_"
    consprefix = "consumed_"

    def trim_argtype(arg):
        # Look through any opt_ prefix, so that opt_val_foo_suffix is
        # trimmed to opt_val_foo in the same way as the non-optional
        # version.
        if arg.startswith(optprefix):
            return optprefix + trim_argtype(arg[len(optprefix):])

        if (arg.startswith(valprefix) and
                "_" in arg[len(valprefix):]):
            # Strip suffixes like val_string_asciz
            arg = arg[:arg.index("_", len(valprefix))]
        return arg

    with open(header_file) as f:
        for line in f:
            line = line.rstrip("\r\n").replace(" ", "")
            if not (line.startswith(prefix) and line.endswith(suffix)):
                continue

            words = line[len(prefix):-len(suffix)].split(",")
            function = words[1]
            rettypes = []
            argtypes = []
            if words[0] != "void":
                rettypes.append(trim_argtype(words[0]))
            for arg in words[2:]:
                if arg.startswith(outprefix):
                    # Output parameter: reported as a return value.
                    rettypes.append(trim_argtype(arg[len(outprefix):]))
                else:
                    consumed = arg.startswith(consprefix)
                    if consumed:
                        arg = arg[len(consprefix):]
                    argtypes.append((trim_argtype(arg), consumed))
            scope[function] = Function(function, rettypes, argtypes)
|
|
|
|
# Populate this module's global namespace with one Function wrapper for
# each FUNC(...) line found in testcrypt.h.
_setup(globals())
|
|
# Remove the _setup name from the module namespace.
del _setup
|