aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorNguyễn Gia Phong <cnx@loang.net>2025-02-07 11:36:02 +0900
committerNguyễn Gia Phong <cnx@loang.net>2025-02-07 11:47:41 +0900
commitc3ec223b35e71a6d44e5ec01c55d4aa9746bc3a5 (patch)
tree1e7379e6efe8248ef9ffd225068b195b877c3d52 /src
downloadscadere-c3ec223b35e71a6d44e5ec01c55d4aa9746bc3a5.tar.gz
Draft cert checker and feed server
Diffstat (limited to 'src')
-rw-r--r--src/scadere/__init__.py2
-rw-r--r--src/scadere/__main__.py111
-rw-r--r--src/scadere/check.py54
-rw-r--r--src/scadere/listen.py118
4 files changed, 285 insertions, 0 deletions
diff --git a/src/scadere/__init__.py b/src/scadere/__init__.py
new file mode 100644
index 0000000..2a73e5d
--- /dev/null
+++ b/src/scadere/__init__.py
@@ -0,0 +1,2 @@
+__all__ = ['__version__']
+__version__ = '0.0.1'
diff --git a/src/scadere/__main__.py b/src/scadere/__main__.py
new file mode 100644
index 0000000..b33d4d7
--- /dev/null
+++ b/src/scadere/__main__.py
@@ -0,0 +1,111 @@
+# Executable entry point
+# Copyright (C) 2025 Nguyễn Gia Phong
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+from argparse import ArgumentParser, FileType, HelpFormatter
+from asyncio import run
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+from sys import stdout
+
+from . import __version__
+from .check import check
+from .listen import listen
+
+
+class GNUHelpFormatter(HelpFormatter):
+ """Help formatter for ArgumentParser following GNU Coding Standards."""
+
+ def add_usage(self, usage, actions, groups, prefix='Usage: '):
+ """Substitute 'Usage:' for 'usage:'."""
+ super().add_usage(usage, actions, groups, prefix)
+
+ def start_section(self, heading):
+ """Substitute 'Options:' for 'options:'."""
+ super().start_section(heading.capitalize())
+
+ def _format_action_invocation(self, action):
+ """Format --long-option=argument."""
+ if not action.option_strings or action.nargs is not None:
+ return super()._format_action_invocation(action)
+ arg = self._format_args(action,
+ self._get_default_metavar_for_optional(action))
+ return ', '.join(f"{opt}{'=' if opt.startswith('--') else ' '}{arg}"
+ for opt in action.option_strings)
+
+ def add_argument(self, action):
+ """Suppress positional arguments."""
+ if action.option_strings:
+ super().add_argument(action)
+
+
+class NetLoc:
+ def __init__(self, default_port):
+ self.default_port = default_port
+
+ def __call__(self, string):
+ """Return hostname and port from given netloc."""
+ if ':' not in string:
+ return string, self.default_port
+ hostname, port = string.rsplit(':', 1)
+ return hostname, int(port) # ValueError to be handled by argparse
+
+
+def main():
+ """Parse arguments and launch subprogram."""
+ parser = ArgumentParser(prog='scadere', allow_abbrev=False,
+ formatter_class=GNUHelpFormatter)
+ parser.add_argument('-v', '--version', action='version',
+ version=f'%(prog)s {__version__}')
+ subparsers = parser.add_subparsers(help='subcommand help', required=True)
+
+ check_description = ('Check TLS certificate expiration of HOST,'
+ ' where PORT defaults to 443.')
+ check_parser = subparsers.add_parser('check',
+ description=check_description,
+ formatter_class=GNUHelpFormatter)
+ check_parser.set_defaults(subcommand='check')
+ check_parser.add_argument('netloc', metavar='HOST[:PORT]',
+ nargs='+', type=NetLoc(443))
+ check_parser.add_argument('-d', '--days', type=float, default=7,
+ help='days before expiration (default to 7)')
+ check_parser.add_argument('-o', '--output', metavar='PATH',
+ type=FileType('w'), default=stdout,
+ help='output file (default to stdout)')
+
+ listen_description = ('Serve the TLS certificate expiration feed'
+ ' from INPUT file for base URL at HOST:PORT,'
+ ' where HOST defaults to localhost and PORT'
+ ' is selected randomly if not specified.')
+ listen_parser = subparsers.add_parser('listen',
+ description=listen_description,
+ formatter_class=GNUHelpFormatter)
+ listen_parser.add_argument('certs', metavar='INPUT', type=Path)
+ listen_parser.add_argument('base_url', metavar='URL')
+ listen_parser.add_argument('netloc', metavar='HOST[:PORT]', nargs='?',
+ type=NetLoc(None), default=('localhost', None))
+ listen_parser.set_defaults(subcommand='listen')
+
+ args = parser.parse_args()
+ if args.subcommand == 'check':
+ with args.output:
+ after = datetime.now(tz=timezone.utc) + timedelta(days=args.days)
+ check(args.netloc, after, args.output)
+ elif args.subcommand == 'listen':
+ run(listen(args.certs, args.base_url, *args.netloc))
+
+
+if __name__ == '__main__':
+ main()
diff --git a/src/scadere/check.py b/src/scadere/check.py
new file mode 100644
index 0000000..ee230bb
--- /dev/null
+++ b/src/scadere/check.py
@@ -0,0 +1,54 @@
+# TLS certificate expiration checker
+# Copyright (C) 2025 Nguyễn Gia Phong
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+from email.utils import parsedate_to_datetime as parsedate
+from itertools import chain
+from socket import AF_INET, socket
+from ssl import create_default_context as tls_context
+from sys import stderr
+
+__all__ = ['check']
+
+
+def check(netlocs, after, output):
+ """Check if each netloc's TLS certificate expires after given time.
+
+ Print the certificate's summary to output file if that is the case.
+ """
+ ctx = tls_context()
+ for hostname, port in netlocs:
+ netloc = f'{hostname}:{port}'
+ stderr.write(f'TLS certificate for {netloc} ')
+ try:
+ with ctx.wrap_socket(socket(AF_INET),
+ server_hostname=hostname) as conn:
+ conn.connect((hostname, port))
+ cert = conn.getpeercert()
+ except Exception as e:
+ stderr.write(f'cannot be retrieved: {e}\n')
+ else:
+ ca = dict(chain.from_iterable(cert['issuer']))['organizationName']
+ not_before = parsedate(cert['notBefore'])
+ not_after = parsedate(cert['notAfter'])
+ if after < not_after:
+ after_seconds = after.isoformat(timespec='seconds')
+ stderr.write(f'will not expire at {after_seconds}\n')
+ else:
+ stderr.write(f'will expire at {not_after.isoformat()}\n')
+ print(not_before.isoformat(), not_after.isoformat(),
+ # As unique identifier
+ hostname, port, cert['serialNumber'], ca,
+ file=output)
diff --git a/src/scadere/listen.py b/src/scadere/listen.py
new file mode 100644
index 0000000..5576986
--- /dev/null
+++ b/src/scadere/listen.py
@@ -0,0 +1,118 @@
+# HTTP server for Atom feed of TLS certificate expirations
+# Copyright (C) 2025 Nguyễn Gia Phong
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+from asyncio import start_server
+from datetime import datetime
+from functools import partial
+from urllib.parse import parse_qs, quote, urljoin, urlsplit
+from xml.etree.ElementTree import (Element as xml_element,
+ SubElement as xml_subelement,
+ indent, tostring as xml_to_string)
+
+__all__ = ['listen']
+
+
+def entry(base_url, cert):
+ """Construct Atom entry for the given TLS certificate."""
+ not_before, not_after, hostname, port, serial, issuer = cert
+ url = urljoin(base_url, quote(f'{hostname}/{port}/{issuer}/{serial}'))
+ return ('entry',
+ ('author', ('name', issuer)),
+ ('content', {'type': 'text'},
+ (f'TLS certificate for {hostname}:{port}'
+ f' issued by {issuer} will expire at {not_after}')),
+ ('id', url),
+ ('link', {'rel': 'alternate', 'type': 'text/plain', 'href': url}),
+ ('title', (f'TLS certificate for {hostname}:{port}'
+ f' will expire at {not_after}')),
+ ('updated', not_before))
+
+
+def xml(tree, parent=None):
+ """Construct XML element from the given tree."""
+ tag, attrs, children = ((tree[0], tree[1], tree[2:])
+ if isinstance(tree[1], dict)
+ else (tree[0], {}, tree[1:]))
+ if parent is None:
+ elem = xml_element(tag, attrs)
+ else:
+ elem = xml_subelement(parent, tag, attrs)
+ for child in children:
+ if isinstance(child, str):
+ elem.text = child
+ else:
+ xml(child, elem)
+ if parent is None:
+ indent(elem)
+ return elem
+
+
+async def handle(certs, base_url, reader, writer):
+ """Handle HTTP request."""
+ summaries = tuple(cert.rstrip().split(maxsplit=5)
+ for cert in certs.read_text().splitlines())
+ lookup = {quote(f'/{hostname}/{port}/{issuer}/{serial}'):
+ (not_before, not_after)
+ for not_before, not_after, hostname, port, serial, issuer
+ in summaries}
+ request = await reader.readuntil(b'\r\n')
+ url = request.removeprefix(b'GET ').rsplit(b' HTTP/', 1)[0]
+ url_parts = urlsplit(url.decode())
+ path = url_parts.path
+ domains = tuple(parse_qs(url_parts.query).get('domain', ['']))
+
+ if not request.startswith(b'GET '):
+ writer.write(b'HTTP/1.1 405 Method Not Allowed\r\n')
+ await writer.drain()
+ writer.close()
+ await writer.wait_closed()
+ return
+ elif path == '/': # Atom feed
+ writer.write(b'HTTP/1.1 200 OK\r\n')
+ writer.write(b'Content-Type: application/atom+xml\r\n')
+ feed = xml(('feed', {'xmlns': 'http://www.w3.org/2005/Atom'},
+ ('id', base_url),
+ ('link', {'rel': 'self', 'href': base_url}),
+ ('title', certs.name),
+ ('updated', datetime.now().isoformat()),
+ *(entry(base_url, cert)
+ for cert in summaries if cert[2].endswith(domains))))
+ content = xml_to_string(feed, 'unicode', xml_declaration=True,
+ default_namespace=None).encode()
+ writer.write(f'Content-Length: {len(content)}\r\n\r\n'.encode())
+ writer.write(content)
+ elif path in lookup: # accessible Atom entry's link/ID
+ writer.write(b'HTTP/1.1 200 OK\r\n')
+ writer.write(b'Content-Type: text/plain\r\n')
+ not_before, not_after = lookup[path]
+ content = (f'TLS certificate is valid'
+ f' from {not_before} until {not_after}\n').encode()
+ writer.write(f'Content-Length: {len(content)}\r\n\r\n'.encode())
+ writer.write(content)
+ else:
+ writer.write(b'HTTP/1.1 404 Not Found\r\n')
+ await writer.drain()
+ writer.close()
+ await writer.wait_closed()
+
+
+async def listen(certs, base_url, host, port):
+ """Serve HTTP server for TLS certificate expirations' Atom feed."""
+ server = await start_server(partial(handle, certs, base_url), host, port)
+ async with server:
+ print('Serving on', end=' ')
+ print(*(socket.getsockname() for socket in server.sockets), sep=', ')
+ await server.serve_forever()