diff options
author | Nguyễn Gia Phong <cnx@loang.net> | 2025-06-05 17:09:25 +0900 |
---|---|---|
committer | Nguyễn Gia Phong <cnx@loang.net> | 2025-06-05 17:09:25 +0900 |
commit | 9508ef9fa8ae23610048bff9f03d592c3d1855fb (patch) | |
tree | d5c748b5ae08b355d21645cf5cb6b2d762707dc9 /src | |
parent | 8021b9db75807fb89f5bc2497cbf309ac606f18d (diff) | |
download | scadere-9508ef9fa8ae23610048bff9f03d592c3d1855fb.tar.gz |
Strengthen typing
Diffstat (limited to 'src')
-rw-r--r-- | src/scadere/check.py | 9 | ||||
-rw-r--r-- | src/scadere/listen.py | 79 |
2 files changed, 44 insertions, 44 deletions
diff --git a/src/scadere/check.py b/src/scadere/check.py index 288e599..0937e75 100644 --- a/src/scadere/check.py +++ b/src/scadere/check.py @@ -25,6 +25,7 @@ from socket import AF_INET, socket from ssl import create_default_context as tls_context from sys import argv, stderr, stdout from unicodedata import category as unicode_category +from uuid import uuid4 from . import __version__, GNUHelpFormatter, NetLoc @@ -56,7 +57,7 @@ def check(netlocs, after, output, fake_ca=None): fake_ca.configure_trust(ctx) for hostname, port in netlocs: - now = datetime.now(tz=timezone.utc).isoformat(timespec='seconds') + now = datetime.now(timezone.utc).isoformat() netloc = f'{hostname}:{port}' stderr.write(f'TLS certificate for {netloc} ') try: @@ -66,7 +67,7 @@ def check(netlocs, after, output, fake_ca=None): cert = conn.getpeercert() except Exception as exception: stderr.write(f'cannot be retrieved: {exception}\n') - print(now, 'N/A', hostname, port, 'N/A', + print('N/A', now, hostname, port, uuid4().int, base64_from_str(str(exception)), file=output) continue @@ -79,7 +80,7 @@ def check(netlocs, after, output, fake_ca=None): serial = int(cert['serialNumber'], 16) except Exception as exception: stderr.write(f'cannot be parsed: {exception}\n') - print(now, 'N/A', hostname, port, 'N/A', + print('N/A', now, hostname, port, uuid4().int, base64_from_str(str(exception)), file=output) else: if after < not_after: @@ -111,7 +112,7 @@ def main(arguments=argv[1:]): help='output file (default to stdout)') args = parser.parse_args(arguments) with args.output: # pragma: no cover - after = datetime.now(tz=timezone.utc) + timedelta(days=args.days) + after = datetime.now(timezone.utc) + timedelta(days=args.days) check(args.netloc, after, args.output) diff --git a/src/scadere/listen.py b/src/scadere/listen.py index 59dbef6..add4b05 100644 --- a/src/scadere/listen.py +++ b/src/scadere/listen.py @@ -22,6 +22,7 @@ from base64 import urlsafe_b64decode as from_base64 from datetime import datetime, timezone from functools import partial from http import HTTPStatus +from operator import call from pathlib import Path from string import digits from typing import assert_never @@ -32,20 +33,34 @@ from xml.etree.ElementTree import (Element as xml_element, from sys import argv from . import __version__, GNUHelpFormatter, NetLoc +from .check import base64_from_str __all__ = ['main'] +def datetime_from_str(string, unavailable_ok=False): + """Parse datetime from string in ISO 8601 format.""" + if string == 'N/A' and unavailable_ok: + return None + return datetime.fromisoformat(string) + + +def str_from_base64(string64): + """Decode string in base64 format.""" + return from_base64(string64.encode()).decode() + + def parse_summary(line): """Parse TLS certificate into a summary tuple.""" - return tuple(line.rstrip('\r\n').split(' ', maxsplit=5)) + return tuple(map(call, + (partial(datetime_from_str, unavailable_ok=True), + datetime_from_str, str, int, int, str_from_base64), + line.rstrip('\r\n').split(' ', maxsplit=5))) -def path(hostname, port, issuer, serial): +def path(hostname, port, number, string): """Return the relative URL for the given certificate's details.""" - if serial == 'N/A': - return f'{hostname}/{port}' - return f'{hostname}/{port}/{issuer}/{serial}' + return f'{hostname}/{port}/{base64_from_str(string)}/{number}' def supported_http_version(version): @@ -60,13 +75,6 @@ def supported_http_version(version): return False -def datetime_from_str(string, unavailable_ok=False): - """Parse datetime from string in ISO 8601 format.""" - if string == 'N/A' and unavailable_ok: - return None - return datetime.fromisoformat(string) - - async def write_status(writer, http_version, status): """Write the given HTTP status line.""" status = f'HTTP/{http_version} {status.value} {status.phrase}\r\n' @@ -90,39 +98,33 @@ async def describe_status(writer, status, http_version='1.1'): await writer.drain() -def str_from_base64(string): - """Decode string in base64 format.""" - return from_base64(string.encode()).decode() - - -def body(not_before, not_after, hostname, port, serial, string64): +def body(not_before, not_after, hostname, port, number, string): """Describe the given certificate in XHTML.""" - string = str_from_base64(string64) - if not_after is None: + if not_before is None: return (('h1', 'TLS certificate problem'), ('dl', ('dt', 'Domain'), ('dd', hostname), ('dt', 'Port'), ('dd', port), - ('dt', 'Time'), ('dd', not_before), + ('dt', 'Time'), ('dd', not_after), ('dt', 'Error'), ('dd', string))) return (('h1', 'TLS certificate information'), ('dl', ('dt', 'Domain'), ('dd', hostname), ('dt', 'Port'), ('dd', port), ('dt', 'Issuer'), ('dd', string), - ('dt', 'Serial number'), ('dd', serial), + ('dt', 'Serial number'), ('dd', number), ('dt', 'Valid from'), ('dd', not_before), ('dt', 'Valid until'), ('dd', not_after))) def entry(base_url, cert): """Construct Atom entry for the given TLS certificate.""" - not_before, not_after, hostname, port, serial, issuer = cert - url = urljoin(base_url, path(hostname, port, issuer, serial)) + not_before, not_after, hostname, port, number, string = cert + url = urljoin(base_url, path(hostname, port, number, string)) title = (f'TLS cert for {hostname} cannot be retrieved' - if not_after is None + if not_before is None else f'TLS cert for {hostname} will expire at {not_after}') - author = 'Scadere' if not_after is None else str_from_base64(issuer) + author = 'Scadere' if not_before is None else string return ('entry', ('author', ('name', author)), ('content', {'type': 'xhtml'}, @@ -132,7 +134,7 @@ def entry(base_url, cert): 'type': 'application/xhtml+xml', 'href': url}), ('title', title), - ('updated', not_before)) + ('updated', not_after)) def split_domain(domain): @@ -155,7 +157,7 @@ def feed(base_url, filename, certificates, domains): ('id', base_url), ('link', {'rel': 'self', 'href': base_url}), ('title', filename), - ('updated', datetime.now(tz=timezone.utc).isoformat()), + ('updated', datetime.now(timezone.utc).isoformat()), ('generator', {'uri': 'https://trong.loang.net/scadere/about', 'version': __version__}, @@ -166,7 +168,7 @@ def feed(base_url, filename, certificates, domains): def page(certificate): """Construct an XHTML page for the given TLS certificate.""" - not_before, not_after, hostname, port, serial, issuer = certificate + hostname, port = certificate[2:4] return ('html', {'xmlns': 'http://www.w3.org/1999/xhtml', 'lang': 'en'}, ('head', @@ -177,8 +179,7 @@ def page(certificate): 'initial-scale=1.0')}), ('link', {'rel': 'icon', 'href': 'data:,'}), ('title', f'TLS certificate - {hostname}:{port}')), - ('body', *body(not_before, not_after, - hostname, port, serial, issuer))) + ('body', *body(*certificate))) def xml(tree, parent=None): @@ -196,6 +197,8 @@ def xml(tree, parent=None): xml(child, elem) case str(): elem.text = child + case int(): + elem.text = str(child) case datetime(): elem.text = child.isoformat() case _: @@ -251,17 +254,13 @@ async def handle(certs, base_url, reader, writer): return try: + summaries = tuple(map(parse_summary, + certs.read_text().splitlines())) + paths = tuple(urlsplit(urljoin(base_url, path(*s[-4:]))).path + for s in summaries) + lookup = dict(map(tuple, zip(paths, summaries))) url_parts = urlsplit(urljoin(base_url, url.strip().decode())) domains = tuple(parse_qs(url_parts.query).get('domain', [])) - summaries = map(parse_summary, certs.read_text().splitlines()) - lookup = {urlsplit(urljoin(base_url, - path(hostname, port, - issuer, serial))).path: - (datetime_from_str(not_before), - datetime_from_str(not_after, unavailable_ok=True), - hostname, port, serial, issuer) - for not_before, not_after, hostname, port, serial, issuer - in summaries} except Exception: # pragma: no cover await describe_status(writer, HTTPStatus.INTERNAL_SERVER_ERROR, http_version) |