diff options
author | Nguyễn Gia Phong <cnx@loang.net> | 2025-06-04 01:30:57 +0900 |
---|---|---|
committer | Nguyễn Gia Phong <cnx@loang.net> | 2025-06-04 01:30:57 +0900 |
commit | cb1b5c48145f9c23eae95f922c6af804466bcb42 (patch) | |
tree | 24859b4e1642753e7756975cae28e4d24f9847a3 /src | |
parent | 8b83c1f04c808558a8097022466b2d4327dd62af (diff) | |
download | scadere-cb1b5c48145f9c23eae95f922c6af804466bcb42.tar.gz |
Handle summaries of certs not retrieved
Diffstat (limited to 'src')
-rw-r--r-- | src/scadere/check.py | 23 | ||||
-rw-r--r-- | src/scadere/listen.py | 49 |
2 files changed, 56 insertions, 16 deletions
diff --git a/src/scadere/check.py b/src/scadere/check.py index 23ba189..aaabe3f 100644 --- a/src/scadere/check.py +++ b/src/scadere/check.py @@ -24,15 +24,25 @@ from itertools import chain from socket import AF_INET, socket from ssl import create_default_context as tls_context from sys import argv, stderr, stdout +from unicodedata import category as unicode_category from . import __version__, GNUHelpFormatter, NetLoc __all__ = ['main'] +class CtlChrTrans: + """Translator for printing Unicode control characters.""" + + def __getitem__(self, ordinal): + if unicode_category(chr(ordinal)) == 'Cc': + return 0xfffd # replacement character '�' + raise KeyError + + def base64_from_str(string): """Convert string to base64 format in bytes.""" - return base64(string.encode()).decode() + return base64(string.translate(CtlChrTrans()).encode()).decode() def check(netlocs, after, output, fake_ca=None): @@ -52,9 +62,11 @@ def check(netlocs, after, output, fake_ca=None): server_hostname=hostname) as conn: conn.connect((hostname, port)) cert = conn.getpeercert() - except Exception as e: - stderr.write(f'cannot be retrieved: {e}\n') - print(f'N/A N/A {hostname} {port} N/A {e}', file=output) + except Exception as exception: + stderr.write(f'cannot be retrieved: {exception}\n') + now = datetime.now(tz=timezone.utc).isoformat() + print(now, 'N/A', hostname, port, 'N/A', + base64_from_str(str(exception)), file=output) else: ca = dict(chain.from_iterable(cert['issuer']))['organizationName'] not_before = parsedate(cert['notBefore']) @@ -64,9 +76,10 @@ def check(netlocs, after, output, fake_ca=None): stderr.write(f'will not expire at {after_seconds}\n') else: stderr.write(f'will expire at {not_after.isoformat()}\n') + serial = cert['serialNumber'].translate(CtlChrTrans()) print(not_before.isoformat(), not_after.isoformat(), # As unique identifier - hostname, port, cert['serialNumber'], + hostname, port, serial, base64_from_str(ca), file=output) diff --git a/src/scadere/listen.py b/src/scadere/listen.py index bf179e6..d8c1178 100644 --- a/src/scadere/listen.py +++ b/src/scadere/listen.py @@ -23,6 +23,7 @@ from datetime import datetime, timezone from functools import partial from http import HTTPStatus from pathlib import Path +from typing import assert_never from urllib.parse import parse_qs, urljoin, urlsplit from xml.etree.ElementTree import (Element as xml_element, SubElement as xml_subelement, @@ -41,9 +42,18 @@ def parse_summary(line): def path(hostname, port, issuer, serial): """Return the relative URL for the given certificate's details.""" + if serial == 'N/A': + return f'{hostname}/{port}' return f'{hostname}/{port}/{issuer}/{serial}' +def datetime_from_str(string, unavailable_ok=False): + """Parse datetime from string in ISO 8601 format.""" + if string == 'N/A' and unavailable_ok: + return None + return datetime.fromisoformat(string) + + async def write_status(writer, status): """Write the given HTTP/1.1 status line.""" writer.write(f'HTTP/1.1 {status.value} {status.phrase}\r\n'.encode()) @@ -71,13 +81,21 @@ def str_from_base64(string): return from_base64(string.encode()).decode() -def body(not_before, not_after, hostname, port, serial, issuer): +def body(not_before, not_after, hostname, port, serial, string64): """Describe the given certificate in XHTML.""" + string = str_from_base64(string64) + if not_after is None: + return (('h1', 'TLS certificate problem'), + ('dl', + ('dt', 'Domain'), ('dd', hostname), + ('dt', 'Port'), ('dd', port), + ('dt', 'Time'), ('dd', not_before), + ('dt', 'Error'), ('dd', string))) return (('h1', 'TLS certificate information'), ('dl', ('dt', 'Domain'), ('dd', hostname), ('dt', 'Port'), ('dd', port), - ('dt', 'Issuer'), ('dd', str_from_base64(issuer)), + ('dt', 'Issuer'), ('dd', string), ('dt', 'Serial number'), ('dd', serial), ('dt', 'Valid from'), ('dd', not_before), ('dt', 'Valid until'), ('dd', not_after))) @@ -87,15 +105,19 @@ def entry(base_url, cert): """Construct Atom entry for the given TLS certificate.""" not_before, not_after, hostname, port, serial, issuer = cert url = urljoin(base_url, path(hostname, port, issuer, serial)) + title = (f'TLS cert for {hostname} cannot be retrieved' + if not_after is None + else f'TLS cert for {hostname} will expire at {not_after}') + author = 'Scadere' if not_after is None else str_from_base64(issuer) return ('entry', - ('author', ('name', str_from_base64(issuer))), + ('author', ('name', author)), ('content', {'type': 'xhtml'}, ('div', {'xmlns': 'http://www.w3.org/1999/xhtml'}, *body(*cert))), ('id', url), ('link', {'rel': 'alternate', 'type': 'application/xhtml+xml', 'href': url}), - ('title', (f'TLS cert for {hostname} will expire at {not_after}')), + ('title', title), ('updated', not_before)) @@ -109,12 +131,15 @@ def xml(tree, parent=None): else: elem = xml_subelement(parent, tag, attrs) for child in children: - if isinstance(child, tuple): - xml(child, elem) - elif isinstance(child, datetime): - elem.text = child.isoformat() - else: - elem.text = str(child) + match child: + case tuple(): + xml(child, elem) + case str(): + elem.text = child + case datetime(): + elem.text = child.isoformat() + case _: # pragma: no cover + assert_never(child) if parent is None: indent(elem) return elem @@ -151,7 +176,9 @@ async def handle(certs, base_url, reader, writer): summaries = map(parse_summary, certs.read_text().splitlines()) lookup = {urlsplit(urljoin(base_url, path(hostname, port, issuer, serial))).path: - (not_before, not_after, hostname, port, serial, issuer) + (datetime_from_str(not_before), + datetime_from_str(not_after, unavailable_ok=True), + hostname, port, serial, issuer) for not_before, not_after, hostname, port, serial, issuer in summaries} request = await reader.readuntil(b'\r\n') |