diff --git a/test/ca.py b/test/ca.py new file mode 100755 index 00000000..2ec24a31 --- /dev/null +++ b/test/ca.py @@ -0,0 +1,198 @@ +#!/usr/bin/env python3 +# +# Implementation of OpenSSH certificate creation. Used in +# cryptsuite.py to construct certificates for test purposes. +# +# Can also be run standalone to function as an actual CA, though I +# don't currently know of any reason you'd want to use it in place of +# ssh-keygen. In that mode, it depends on having an SSH agent +# available to do the signing. + +import argparse +import base64 +import enum +import hashlib +import io +import os + +import ssh + +class Container: + pass + +class CertType(enum.Enum): + user = 1 + host = 2 + +def maybe_encode(s): + if isinstance(s, bytes): + return s + return s.encode('UTF-8') + +def make_signature_preimage( + key_to_certify, ca_key, certtype, keyid, serial, principals, + valid_after=0, valid_before=0xFFFFFFFFFFFFFFFF, + critical_options={}, extensions={}, + reserved=b'', nonce=None): + + alg, pubkeydata = ssh.ssh_decode_string(key_to_certify, True) + + if nonce is None: + nonce = os.urandom(32) + + buf = io.BytesIO() + buf.write(ssh.ssh_string(alg + b"-cert-v01@openssh.com")) + buf.write(ssh.ssh_string(nonce)) + buf.write(pubkeydata) + buf.write(ssh.ssh_uint64(serial)) + buf.write(ssh.ssh_uint32(certtype.value if isinstance(certtype, CertType) + else certtype)) + buf.write(ssh.ssh_string(maybe_encode(keyid))) + buf.write(ssh.ssh_string(b''.join( + ssh.ssh_string(maybe_encode(principal)) + for principal in principals))) + buf.write(ssh.ssh_uint64(valid_after)) + buf.write(ssh.ssh_uint64(valid_before)) + buf.write(ssh.ssh_string(b''.join( + ssh.ssh_string(opt) + ssh.ssh_string(val) + for opt, val in sorted([(maybe_encode(opt), maybe_encode(val)) + for opt, val in critical_options.items()])))) + buf.write(ssh.ssh_string(b''.join( + ssh.ssh_string(opt) + ssh.ssh_string(val) + for opt, val in sorted([(maybe_encode(opt), maybe_encode(val)) + for opt, val in extensions.items()])))) + buf.write(ssh.ssh_string(reserved)) + # The CA key here can be a raw 'bytes', or an ssh_key object + # exposed via testcrypt + if type(ca_key) != bytes: + ca_key = ca_key.public_blob() + buf.write(ssh.ssh_string(ca_key)) + + return buf.getvalue() + +def make_full_cert(preimage, signature): + return preimage + ssh.ssh_string(signature) + +def sign_cert_via_testcrypt(preimage, ca_key, signflags=None): + # Expects ca_key to be a testcrypt ssh_key object + signature = ca_key.sign(preimage, 0 if signflags is None else signflags) + return make_full_cert(preimage, signature) + +def sign_cert_via_agent(preimage, ca_key, signflags=None): + # Expects ca_key to be a binary public key blob, and for a + # currently running SSH agent to contain the corresponding private + # key. + import agenttest + sign_request = (ssh.ssh_byte(ssh.SSH2_AGENTC_SIGN_REQUEST) + + ssh.ssh_string(ca_key) + ssh.ssh_string(preimage)) + if signflags is not None: + sign_request += ssh.ssh_uint32(signflags) + sign_response = agenttest.agent_query(sign_request) + msgtype, sign_response = ssh.ssh_decode_byte(sign_response, True) + if msgtype == ssh.SSH2_AGENT_SIGN_RESPONSE: + signature, sign_response = ssh.ssh_decode_string(sign_response, True) + return make_full_cert(preimage, signature) + elif msgtype == ssh.SSH2_AGENT_FAILURE: + raise IOError("Agent refused to return a signature") + else: + raise IOError("Agent returned unexpecteed message type {:d}" + .format(msgtype)) + +def read_pubkey_file(fh): + b64buf = io.StringIO() + comment = None + + lines = (line.rstrip("\r\n") for line in iter(fh.readline, "")) + line = next(lines) + + if line == "---- BEGIN SSH2 PUBLIC KEY ----": + # RFC 4716 public key. Read headers like Comment: + line = next(lines) + while ":" in line: + key, val = line.split(":", 1) + if key == "Comment": + comment = val.strip("\r\n") + line = next(lines) + # Now expect lines of base64 data. + while line != "---- BEGIN SSH2 PUBLIC KEY ----": + b64buf.write(line) + line = next(lines) + + else: + # OpenSSH public key. Expect the b64buf blob to be the second word. + fields = line.split(" ", 2) + b64buf.write(fields[1]) + if len(fields) > 1: + comment = fields[2] + + return base64.b64decode(b64buf.getvalue()), comment + +def write_pubkey_file(fh, key, comment=None): + alg = ssh.ssh_decode_string(key) + fh.write(alg.decode('ASCII')) + fh.write(" " + base64.b64encode(key).decode('ASCII')) + if comment is not None: + fh.write(" " + comment) + fh.write("\n") + +def default_signflags(key): + alg = ssh.ssh_decode_string(key) + if alg == b'ssh-rsa': + return 4 # RSA-SHA-512 + +def main(): + parser = argparse.ArgumentParser( + description='Create and sign OpenSSH certificates.') + parser.add_argument("key_to_certify", help="Public key to be certified.") + parser.add_argument("--ca-key", required=True, + help="Public key of the CA. Must be present in a " + "currently accessible SSH agent.") + parser.add_argument("-o", "--output", required=True, + help="File to write output OpenSSH key to.") + parser.add_argument("--type", required=True, choices={'user', 'host'}, + help="Type of certificate to make.") + parser.add_argument("--principal", "--user", "--host", + required=True, action="append", + help="User names or host names to authorise.") + parser.add_argument("--key-id", "--keyid", required=True, + help="Human-readable key ID string for log files.") + parser.add_argument("--serial", type=int, required=True, + help="Serial number to write into certificate.") + parser.add_argument("--signflags", type=int, help="Signature flags " + "(e.g. 2 = RSA-SHA-256, 4 = RSA-SHA-512).") + args = parser.parse_args() + + with open(args.key_to_certify) as fh: + key_to_certify, comment = read_pubkey_file(fh) + with open(args.ca_key) as fh: + ca_key, _ = read_pubkey_file(fh) + + extensions = { + 'permit-X11-forwarding': '', + 'permit-agent-forwarding': '', + 'permit-port-forwarding': '', + 'permit-pty': '', + 'permit-user-rc': '', + } + + # FIXME: for a full-featured command-line CA we'd need to add + # command-line options for crit opts, extensions and validity + # period + preimage = make_signature_preimage( + key_to_certify = key_to_certify, + ca_key = ca_key, + certtype = getattr(CertType, args.type), + keyid = args.key_id, + serial = args.serial, + principals = args.principal, + extensions = extensions) + + signflags = (args.signflags if args.signflags is not None + else default_signflags(ca_key)) + cert = sign_cert_via_agent(preimage, ca_key, signflags) + + with open(args.output, "w") as fh: + write_pubkey_file(fh, cert, comment) + +if __name__ == '__main__': + main() diff --git a/test/ssh.py b/test/ssh.py index c4f2531f..53b15183 100644 --- a/test/ssh.py +++ b/test/ssh.py @@ -24,6 +24,9 @@ def ssh_byte(n): def ssh_uint32(n): return struct.pack(">L", n) +def ssh_uint64(n): + return struct.pack(">Q", n) + def ssh_string(s): return ssh_uint32(len(s)) + s @@ -53,6 +56,10 @@ def ssh_decode_byte(s): def ssh_decode_uint32(s): return struct.unpack_from(">L", s, 0)[0], 4 +@decoder +def ssh_decode_uint64(s): + return struct.unpack_from(">Q", s, 0)[0], 8 + @decoder def ssh_decode_string(s): length = ssh_decode_uint32(s)