diff options
author | Nguyễn Gia Phong <cnx@loang.net> | 2025-02-07 11:36:02 +0900 |
---|---|---|
committer | Nguyễn Gia Phong <cnx@loang.net> | 2025-02-07 11:47:41 +0900 |
commit | c3ec223b35e71a6d44e5ec01c55d4aa9746bc3a5 (patch) | |
tree | 1e7379e6efe8248ef9ffd225068b195b877c3d52 /src | |
download | scadere-c3ec223b35e71a6d44e5ec01c55d4aa9746bc3a5.tar.gz |
Draft cert checker and feed server
Diffstat (limited to 'src')
-rw-r--r-- | src/scadere/__init__.py | 2 | ||||
-rw-r--r-- | src/scadere/__main__.py | 111 | ||||
-rw-r--r-- | src/scadere/check.py | 54 | ||||
-rw-r--r-- | src/scadere/listen.py | 118 |
4 files changed, 285 insertions, 0 deletions
diff --git a/src/scadere/__init__.py b/src/scadere/__init__.py new file mode 100644 index 0000000..2a73e5d --- /dev/null +++ b/src/scadere/__init__.py @@ -0,0 +1,2 @@ +__all__ = ['__version__'] +__version__ = '0.0.1' diff --git a/src/scadere/__main__.py b/src/scadere/__main__.py new file mode 100644 index 0000000..b33d4d7 --- /dev/null +++ b/src/scadere/__main__.py @@ -0,0 +1,111 @@ +# Executable entry point +# Copyright (C) 2025 Nguyễn Gia Phong +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +from argparse import ArgumentParser, FileType, HelpFormatter +from asyncio import run +from datetime import datetime, timedelta, timezone +from pathlib import Path +from sys import stdout + +from . import __version__ +from .check import check +from .listen import listen + + +class GNUHelpFormatter(HelpFormatter): + """Help formatter for ArgumentParser following GNU Coding Standards.""" + + def add_usage(self, usage, actions, groups, prefix='Usage: '): + """Substitute 'Usage:' for 'usage:'.""" + super().add_usage(usage, actions, groups, prefix) + + def start_section(self, heading): + """Substitute 'Options:' for 'options:'.""" + super().start_section(heading.capitalize()) + + def _format_action_invocation(self, action): + """Format --long-option=argument.""" + if not action.option_strings or action.nargs is not None: + return super()._format_action_invocation(action) + arg = self._format_args(action, + self._get_default_metavar_for_optional(action)) + return ', '.join(f"{opt}{'=' if opt.startswith('--') else ' '}{arg}" + for opt in action.option_strings) + + def add_argument(self, action): + """Suppress positional arguments.""" + if action.option_strings: + super().add_argument(action) + + +class NetLoc: + def __init__(self, default_port): + self.default_port = default_port + + def __call__(self, string): + """Return hostname and port from given netloc.""" + if ':' not in string: + return string, self.default_port + hostname, port = string.rsplit(':', 1) + return hostname, int(port) # ValueError to be handled by argparse + + +def main(): + """Parse arguments and launch subprogram.""" + parser = ArgumentParser(prog='scadere', allow_abbrev=False, + formatter_class=GNUHelpFormatter) + parser.add_argument('-v', '--version', action='version', + version=f'%(prog)s {__version__}') + subparsers = parser.add_subparsers(help='subcommand help', required=True) + + check_description = ('Check TLS certificate expiration of HOST,' + ' where PORT defaults to 443.') + check_parser = subparsers.add_parser('check', + description=check_description, + formatter_class=GNUHelpFormatter) + check_parser.set_defaults(subcommand='check') + check_parser.add_argument('netloc', metavar='HOST[:PORT]', + nargs='+', type=NetLoc(443)) + check_parser.add_argument('-d', '--days', type=float, default=7, + help='days before expiration (default to 7)') + check_parser.add_argument('-o', '--output', metavar='PATH', + type=FileType('w'), default=stdout, + help='output file (default to stdout)') + + listen_description = ('Serve the TLS certificate expiration feed' + ' from INPUT file for base URL at HOST:PORT,' + ' where HOST defaults to localhost and PORT' + ' is selected randomly if not specified.') + listen_parser = subparsers.add_parser('listen', + description=listen_description, + formatter_class=GNUHelpFormatter) + listen_parser.add_argument('certs', metavar='INPUT', type=Path) + listen_parser.add_argument('base_url', metavar='URL') + listen_parser.add_argument('netloc', metavar='HOST[:PORT]', nargs='?', + type=NetLoc(None), default=('localhost', None)) + listen_parser.set_defaults(subcommand='listen') + + args = parser.parse_args() + if args.subcommand == 'check': + with args.output: + after = datetime.now(tz=timezone.utc) + timedelta(days=args.days) + check(args.netloc, after, args.output) + elif args.subcommand == 'listen': + run(listen(args.certs, args.base_url, *args.netloc)) + + +if __name__ == '__main__': + main() diff --git a/src/scadere/check.py b/src/scadere/check.py new file mode 100644 index 0000000..ee230bb --- /dev/null +++ b/src/scadere/check.py @@ -0,0 +1,54 @@ +# TLS certificate expiration checker +# Copyright (C) 2025 Nguyễn Gia Phong +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +from email.utils import parsedate_to_datetime as parsedate +from itertools import chain +from socket import AF_INET, socket +from ssl import create_default_context as tls_context +from sys import stderr + +__all__ = ['check'] + + +def check(netlocs, after, output): + """Check if each netloc's TLS certificate expires after given time. + + Print the certificate's summary to output file if that is the case. + """ + ctx = tls_context() + for hostname, port in netlocs: + netloc = f'{hostname}:{port}' + stderr.write(f'TLS certificate for {netloc} ') + try: + with ctx.wrap_socket(socket(AF_INET), + server_hostname=hostname) as conn: + conn.connect((hostname, port)) + cert = conn.getpeercert() + except Exception as e: + stderr.write(f'cannot be retrieved: {e}\n') + else: + ca = dict(chain.from_iterable(cert['issuer']))['organizationName'] + not_before = parsedate(cert['notBefore']) + not_after = parsedate(cert['notAfter']) + if after < not_after: + after_seconds = after.isoformat(timespec='seconds') + stderr.write(f'will not expire at {after_seconds}\n') + else: + stderr.write(f'will expire at {not_after.isoformat()}\n') + print(not_before.isoformat(), not_after.isoformat(), + # As unique identifier + hostname, port, cert['serialNumber'], ca, + file=output) diff --git a/src/scadere/listen.py b/src/scadere/listen.py new file mode 100644 index 0000000..5576986 --- /dev/null +++ b/src/scadere/listen.py @@ -0,0 +1,118 @@ +# HTTP server for Atom feed of TLS certificate expirations +# Copyright (C) 2025 Nguyễn Gia Phong +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +from asyncio import start_server +from datetime import datetime +from functools import partial +from urllib.parse import parse_qs, quote, urljoin, urlsplit +from xml.etree.ElementTree import (Element as xml_element, + SubElement as xml_subelement, + indent, tostring as xml_to_string) + +__all__ = ['listen'] + + +def entry(base_url, cert): + """Construct Atom entry for the given TLS certificate.""" + not_before, not_after, hostname, port, serial, issuer = cert + url = urljoin(base_url, quote(f'{hostname}/{port}/{issuer}/{serial}')) + return ('entry', + ('author', ('name', issuer)), + ('content', {'type': 'text'}, + (f'TLS certificate for {hostname}:{port}' + f' issued by {issuer} will expire at {not_after}')), + ('id', url), + ('link', {'rel': 'alternate', 'type': 'text/plain', 'href': url}), + ('title', (f'TLS certificate for {hostname}:{port}' + f' will expire at {not_after}')), + ('updated', not_before)) + + +def xml(tree, parent=None): + """Construct XML element from the given tree.""" + tag, attrs, children = ((tree[0], tree[1], tree[2:]) + if isinstance(tree[1], dict) + else (tree[0], {}, tree[1:])) + if parent is None: + elem = xml_element(tag, attrs) + else: + elem = xml_subelement(parent, tag, attrs) + for child in children: + if isinstance(child, str): + elem.text = child + else: + xml(child, elem) + if parent is None: + indent(elem) + return elem + + +async def handle(certs, base_url, reader, writer): + """Handle HTTP request.""" + summaries = tuple(cert.rstrip().split(maxsplit=5) + for cert in certs.read_text().splitlines()) + lookup = {quote(f'/{hostname}/{port}/{issuer}/{serial}'): + (not_before, not_after) + for not_before, not_after, hostname, port, serial, issuer + in summaries} + request = await reader.readuntil(b'\r\n') + url = request.removeprefix(b'GET ').rsplit(b' HTTP/', 1)[0] + url_parts = urlsplit(url.decode()) + path = url_parts.path + domains = tuple(parse_qs(url_parts.query).get('domain', [''])) + + if not request.startswith(b'GET '): + writer.write(b'HTTP/1.1 405 Method Not Allowed\r\n') + await writer.drain() + writer.close() + await writer.wait_closed() + return + elif path == '/': # Atom feed + writer.write(b'HTTP/1.1 200 OK\r\n') + writer.write(b'Content-Type: application/atom+xml\r\n') + feed = xml(('feed', {'xmlns': 'http://www.w3.org/2005/Atom'}, + ('id', base_url), + ('link', {'rel': 'self', 'href': base_url}), + ('title', certs.name), + ('updated', datetime.now().isoformat()), + *(entry(base_url, cert) + for cert in summaries if cert[2].endswith(domains)))) + content = xml_to_string(feed, 'unicode', xml_declaration=True, + default_namespace=None).encode() + writer.write(f'Content-Length: {len(content)}\r\n\r\n'.encode()) + writer.write(content) + elif path in lookup: # accessible Atom entry's link/ID + writer.write(b'HTTP/1.1 200 OK\r\n') + writer.write(b'Content-Type: text/plain\r\n') + not_before, not_after = lookup[path] + content = (f'TLS certificate is valid' + f' from {not_before} until {not_after}\n').encode() + writer.write(f'Content-Length: {len(content)}\r\n\r\n'.encode()) + writer.write(content) + else: + writer.write(b'HTTP/1.1 404 Not Found\r\n') + await writer.drain() + writer.close() + await writer.wait_closed() + + +async def listen(certs, base_url, host, port): + """Serve HTTP server for TLS certificate expirations' Atom feed.""" + server = await start_server(partial(handle, certs, base_url), host, port) + async with server: + print('Serving on', end=' ') + print(*(socket.getsockname() for socket in server.sockets), sep=', ') + await server.serve_forever() |