about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorNguyễn Gia Phong <cnx@loang.net>2025-02-07 11:36:02 +0900
committerNguyễn Gia Phong <cnx@loang.net>2025-02-07 11:47:41 +0900
commitc3ec223b35e71a6d44e5ec01c55d4aa9746bc3a5 (patch)
tree1e7379e6efe8248ef9ffd225068b195b877c3d52 /src
downloadscadere-c3ec223b35e71a6d44e5ec01c55d4aa9746bc3a5.tar.gz
Draft cert checker and feed server
Diffstat (limited to 'src')
-rw-r--r--src/scadere/__init__.py2
-rw-r--r--src/scadere/__main__.py111
-rw-r--r--src/scadere/check.py54
-rw-r--r--src/scadere/listen.py118
4 files changed, 285 insertions, 0 deletions
diff --git a/src/scadere/__init__.py b/src/scadere/__init__.py
new file mode 100644
index 0000000..2a73e5d
--- /dev/null
+++ b/src/scadere/__init__.py
@@ -0,0 +1,2 @@
+__all__ = ['__version__']
+__version__ = '0.0.1'
diff --git a/src/scadere/__main__.py b/src/scadere/__main__.py
new file mode 100644
index 0000000..b33d4d7
--- /dev/null
+++ b/src/scadere/__main__.py
@@ -0,0 +1,111 @@
+# Executable entry point
+# Copyright (C) 2025  Nguyễn Gia Phong
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+from argparse import ArgumentParser, FileType, HelpFormatter
+from asyncio import run
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+from sys import stdout
+
+from . import __version__
+from .check import check
+from .listen import listen
+
+
+class GNUHelpFormatter(HelpFormatter):
+    """Help formatter for ArgumentParser following GNU Coding Standards."""
+
+    def add_usage(self, usage, actions, groups, prefix='Usage: '):
+        """Substitute 'Usage:' for 'usage:'."""
+        super().add_usage(usage, actions, groups, prefix)
+
+    def start_section(self, heading):
+        """Substitute 'Options:' for 'options:'."""
+        super().start_section(heading.capitalize())
+
+    def _format_action_invocation(self, action):
+        """Format --long-option=argument."""
+        if not action.option_strings or action.nargs is not None:
+            return super()._format_action_invocation(action)
+        arg = self._format_args(action,
+                                self._get_default_metavar_for_optional(action))
+        return ', '.join(f"{opt}{'=' if opt.startswith('--') else ' '}{arg}"
+                         for opt in action.option_strings)
+
+    def add_argument(self, action):
+        """Suppress positional arguments."""
+        if action.option_strings:
+            super().add_argument(action)
+
+
+class NetLoc:
+    def __init__(self, default_port):
+        self.default_port = default_port
+
+    def __call__(self, string):
+        """Return hostname and port from given netloc."""
+        if ':' not in string:
+            return string, self.default_port
+        hostname, port = string.rsplit(':', 1)
+        return hostname, int(port)  # ValueError to be handled by argparse
+
+
+def main():
+    """Parse arguments and launch subprogram."""
+    parser = ArgumentParser(prog='scadere', allow_abbrev=False,
+                            formatter_class=GNUHelpFormatter)
+    parser.add_argument('-v', '--version', action='version',
+                        version=f'%(prog)s {__version__}')
+    subparsers = parser.add_subparsers(help='subcommand help', required=True)
+
+    check_description = ('Check TLS certificate expiration of HOST,'
+                         ' where PORT defaults to 443.')
+    check_parser = subparsers.add_parser('check',
+                                         description=check_description,
+                                         formatter_class=GNUHelpFormatter)
+    check_parser.set_defaults(subcommand='check')
+    check_parser.add_argument('netloc', metavar='HOST[:PORT]',
+                              nargs='+', type=NetLoc(443))
+    check_parser.add_argument('-d', '--days', type=float, default=7,
+                              help='days before expiration (default to 7)')
+    check_parser.add_argument('-o', '--output', metavar='PATH',
+                              type=FileType('w'), default=stdout,
+                              help='output file (default to stdout)')
+
+    listen_description = ('Serve the TLS certificate expiration feed'
+                          ' from INPUT file for base URL at HOST:PORT,'
+                          ' where HOST defaults to localhost and PORT'
+                          ' is selected randomly if not specified.')
+    listen_parser = subparsers.add_parser('listen',
+                                          description=listen_description,
+                                          formatter_class=GNUHelpFormatter)
+    listen_parser.add_argument('certs', metavar='INPUT', type=Path)
+    listen_parser.add_argument('base_url', metavar='URL')
+    listen_parser.add_argument('netloc', metavar='HOST[:PORT]', nargs='?',
+                               type=NetLoc(None), default=('localhost', None))
+    listen_parser.set_defaults(subcommand='listen')
+
+    args = parser.parse_args()
+    if args.subcommand == 'check':
+        with args.output:
+            after = datetime.now(tz=timezone.utc) + timedelta(days=args.days)
+            check(args.netloc, after, args.output)
+    elif args.subcommand == 'listen':
+        run(listen(args.certs, args.base_url, *args.netloc))
+
+
+if __name__ == '__main__':
+    main()
diff --git a/src/scadere/check.py b/src/scadere/check.py
new file mode 100644
index 0000000..ee230bb
--- /dev/null
+++ b/src/scadere/check.py
@@ -0,0 +1,54 @@
+# TLS certificate expiration checker
+# Copyright (C) 2025  Nguyễn Gia Phong
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+from email.utils import parsedate_to_datetime as parsedate
+from itertools import chain
+from socket import AF_INET, socket
+from ssl import create_default_context as tls_context
+from sys import stderr
+
+__all__ = ['check']
+
+
+def check(netlocs, after, output):
+    """Check if each netloc's TLS certificate expires after given time.
+
+    Print the certificate's summary to output file if that is the case.
+    """
+    ctx = tls_context()
+    for hostname, port in netlocs:
+        netloc = f'{hostname}:{port}'
+        stderr.write(f'TLS certificate for {netloc} ')
+        try:
+            with ctx.wrap_socket(socket(AF_INET),
+                                 server_hostname=hostname) as conn:
+                conn.connect((hostname, port))
+                cert = conn.getpeercert()
+        except Exception as e:
+            stderr.write(f'cannot be retrieved: {e}\n')
+        else:
+            ca = dict(chain.from_iterable(cert['issuer']))['organizationName']
+            not_before = parsedate(cert['notBefore'])
+            not_after = parsedate(cert['notAfter'])
+            if after < not_after:
+                after_seconds = after.isoformat(timespec='seconds')
+                stderr.write(f'will not expire at {after_seconds}\n')
+            else:
+                stderr.write(f'will expire at {not_after.isoformat()}\n')
+                print(not_before.isoformat(), not_after.isoformat(),
+                      # As unique identifier
+                      hostname, port, cert['serialNumber'], ca,
+                      file=output)
diff --git a/src/scadere/listen.py b/src/scadere/listen.py
new file mode 100644
index 0000000..5576986
--- /dev/null
+++ b/src/scadere/listen.py
@@ -0,0 +1,118 @@
+# HTTP server for Atom feed of TLS certificate expirations
+# Copyright (C) 2025  Nguyễn Gia Phong
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+from asyncio import start_server
+from datetime import datetime
+from functools import partial
+from urllib.parse import parse_qs, quote, urljoin, urlsplit
+from xml.etree.ElementTree import (Element as xml_element,
+                                   SubElement as xml_subelement,
+                                   indent, tostring as xml_to_string)
+
+__all__ = ['listen']
+
+
+def entry(base_url, cert):
+    """Construct Atom entry for the given TLS certificate."""
+    not_before, not_after, hostname, port, serial, issuer = cert
+    url = urljoin(base_url, quote(f'{hostname}/{port}/{issuer}/{serial}'))
+    return ('entry',
+            ('author', ('name', issuer)),
+            ('content', {'type': 'text'},
+             (f'TLS certificate for {hostname}:{port}'
+              f' issued by {issuer} will expire at {not_after}')),
+            ('id', url),
+            ('link', {'rel': 'alternate', 'type': 'text/plain', 'href': url}),
+            ('title', (f'TLS certificate for {hostname}:{port}'
+                       f' will expire at {not_after}')),
+            ('updated', not_before))
+
+
+def xml(tree, parent=None):
+    """Construct XML element from the given tree."""
+    tag, attrs, children = ((tree[0], tree[1], tree[2:])
+                            if isinstance(tree[1], dict)
+                            else (tree[0], {}, tree[1:]))
+    if parent is None:
+        elem = xml_element(tag, attrs)
+    else:
+        elem = xml_subelement(parent, tag, attrs)
+    for child in children:
+        if isinstance(child, str):
+            elem.text = child
+        else:
+            xml(child, elem)
+    if parent is None:
+        indent(elem)
+    return elem
+
+
+async def handle(certs, base_url, reader, writer):
+    """Handle HTTP request."""
+    summaries = tuple(cert.rstrip().split(maxsplit=5)
+                      for cert in certs.read_text().splitlines())
+    lookup = {quote(f'/{hostname}/{port}/{issuer}/{serial}'):
+              (not_before, not_after)
+              for not_before, not_after, hostname, port, serial, issuer
+              in summaries}
+    request = await reader.readuntil(b'\r\n')
+    url = request.removeprefix(b'GET ').rsplit(b' HTTP/', 1)[0]
+    url_parts = urlsplit(url.decode())
+    path = url_parts.path
+    domains = tuple(parse_qs(url_parts.query).get('domain', ['']))
+
+    if not request.startswith(b'GET '):
+        writer.write(b'HTTP/1.1 405 Method Not Allowed\r\n')
+        await writer.drain()
+        writer.close()
+        await writer.wait_closed()
+        return
+    elif path == '/':  # Atom feed
+        writer.write(b'HTTP/1.1 200 OK\r\n')
+        writer.write(b'Content-Type: application/atom+xml\r\n')
+        feed = xml(('feed', {'xmlns': 'http://www.w3.org/2005/Atom'},
+                    ('id', base_url),
+                    ('link', {'rel': 'self', 'href': base_url}),
+                    ('title', certs.name),
+                    ('updated', datetime.now().isoformat()),
+                    *(entry(base_url, cert)
+                      for cert in summaries if cert[2].endswith(domains))))
+        content = xml_to_string(feed, 'unicode', xml_declaration=True,
+                                default_namespace=None).encode()
+        writer.write(f'Content-Length: {len(content)}\r\n\r\n'.encode())
+        writer.write(content)
+    elif path in lookup:  # accessible Atom entry's link/ID
+        writer.write(b'HTTP/1.1 200 OK\r\n')
+        writer.write(b'Content-Type: text/plain\r\n')
+        not_before, not_after = lookup[path]
+        content = (f'TLS certificate is valid'
+                   f' from {not_before} until {not_after}\n').encode()
+        writer.write(f'Content-Length: {len(content)}\r\n\r\n'.encode())
+        writer.write(content)
+    else:
+        writer.write(b'HTTP/1.1 404 Not Found\r\n')
+    await writer.drain()
+    writer.close()
+    await writer.wait_closed()
+
+
+async def listen(certs, base_url, host, port):
+    """Serve HTTP server for TLS certificate expirations' Atom feed."""
+    server = await start_server(partial(handle, certs, base_url), host, port)
+    async with server:
+        print('Serving on', end=' ')
+        print(*(socket.getsockname() for socket in server.sockets), sep=', ')
+        await server.serve_forever()