diff options
author | Nguyễn Gia Phong <cnx@loang.net> | 2025-06-04 15:41:28 +0900 |
---|---|---|
committer | Nguyễn Gia Phong <cnx@loang.net> | 2025-06-04 15:41:28 +0900 |
commit | 11d05505cdf25b77cfbdf09f5f1d1be79eeaa0f3 (patch) | |
tree | e0c19143ef56cdb856846a59d3f17ec323acd85a /src | |
parent | 10bffe228843ea4c59110fc9ce40663a7a144338 (diff) | |
download | scadere-11d05505cdf25b77cfbdf09f5f1d1be79eeaa0f3.tar.gz |
Handle server errors
Diffstat (limited to 'src')
-rw-r--r-- | src/scadere/listen.py | 220 |
1 files changed, 136 insertions, 84 deletions
diff --git a/src/scadere/listen.py b/src/scadere/listen.py index d8c1178..59dbef6 100644 --- a/src/scadere/listen.py +++ b/src/scadere/listen.py @@ -23,6 +23,7 @@ from datetime import datetime, timezone from functools import partial from http import HTTPStatus from pathlib import Path +from string import digits from typing import assert_never from urllib.parse import parse_qs, urljoin, urlsplit from xml.etree.ElementTree import (Element as xml_element, @@ -47,6 +48,18 @@ def path(hostname, port, issuer, serial): return f'{hostname}/{port}/{issuer}/{serial}' +def supported_http_version(version): + """Check if given HTTP version complies with section 2.5 of RFC 9110.""" + match len(version): + case 1: + return version in digits + case 3: + major, period, minor = version + return major in digits and period == '.' and minor in digits + case _: + return False + + def datetime_from_str(string, unavailable_ok=False): """Parse datetime from string in ISO 8601 format.""" if string == 'N/A' and unavailable_ok: @@ -54,9 +67,10 @@ def datetime_from_str(string, unavailable_ok=False): return datetime.fromisoformat(string) -async def write_status(writer, status): - """Write the given HTTP/1.1 status line.""" - writer.write(f'HTTP/1.1 {status.value} {status.phrase}\r\n'.encode()) +async def write_status(writer, http_version, status): + """Write the given HTTP status line.""" + status = f'HTTP/{http_version} {status.value} {status.phrase}\r\n' + writer.write(status.encode()) await writer.drain() @@ -66,9 +80,9 @@ async def write_content_type(writer, content_type): await writer.drain() -async def describe_status(writer, status): +async def describe_status(writer, status, http_version='1.1'): """Write a HTTP/1.1 response including status description.""" - await write_status(writer, status) + await write_status(writer, http_version, status) content = f'{status.description}\n'.encode() await write_content_type(writer, 'text/plain') writer.write(f'Content-Length: {len(content)}\r\n\r\n'.encode()) @@ -121,6 +135,52 @@ def entry(base_url, cert): ('updated', not_before)) +def split_domain(domain): + """Split domain and order by ascending level.""" + return tuple(domain.split('.')[::-1]) + + +def is_subdomain(subject, objects): + """Check if subject is a subdomain of any object.""" + if not objects: + return True + sbj_parts = split_domain(subject) + return any(sbj_parts[:len(obj_parts)] == obj_parts + for obj_parts in map(split_domain, objects)) + + +def feed(base_url, filename, certificates, domains): + """Construct an Atom feed based on the given information.""" + return ('feed', {'xmlns': 'http://www.w3.org/2005/Atom'}, + ('id', base_url), + ('link', {'rel': 'self', 'href': base_url}), + ('title', filename), + ('updated', datetime.now(tz=timezone.utc).isoformat()), + ('generator', + {'uri': 'https://trong.loang.net/scadere/about', + 'version': __version__}, + 'Scadere'), + *(entry(base_url, cert) for cert in certificates + if is_subdomain(cert[2], domains))) + + +def page(certificate): + """Construct an XHTML page for the given TLS certificate.""" + not_before, not_after, hostname, port, serial, issuer = certificate + return ('html', {'xmlns': 'http://www.w3.org/1999/xhtml', + 'lang': 'en'}, + ('head', + ('meta', {'name': 'color-scheme', + 'content': 'dark light'}), + ('meta', {'name': 'viewport', + 'content': ('width=device-width,' + 'initial-scale=1.0')}), + ('link', {'rel': 'icon', 'href': 'data:,'}), + ('title', f'TLS certificate - {hostname}:{port}')), + ('body', *body(not_before, not_after, + hostname, port, serial, issuer))) + + def xml(tree, parent=None): """Construct XML element from the given tree.""" tag, attrs, children = ((tree[0], tree[1], tree[2:]) @@ -138,96 +198,88 @@ def xml(tree, parent=None): elem.text = child case datetime(): elem.text = child.isoformat() - case _: # pragma: no cover + case _: assert_never(child) if parent is None: indent(elem) return elem -async def write_xml(writer, document): +async def write_xml(writer, http_version, application, func, *args): """Write given document as XML.""" - content = tuple(map(str.encode, - strings_from_xml(xml(document), 'unicode', - xml_declaration=True, - default_namespace=None))) - writer.write(f'Content-Length: {sum(map(len, content))}\r\n\r\n'.encode()) - for part in content: - writer.write(part) - await writer.drain() - - -def split_domain(domain): - """Split domain and order by ascending level.""" - return tuple(domain.split('.')[::-1]) - - -def is_subdomain(subject, objects): - """Check if subject is a subdomain of any object.""" - if not objects: - return True - sbj_parts = split_domain(subject) - return any(sbj_parts[:len(obj_parts)] == obj_parts - for obj_parts in map(split_domain, objects)) + try: + content = tuple(map(str.encode, + strings_from_xml(xml(func(*args)), 'unicode', + xml_declaration=True, + default_namespace=None))) + except Exception: # pragma: no cover + await describe_status(writer, HTTPStatus.INTERNAL_SERVER_ERROR, + http_version) + raise + else: + await write_status(writer, http_version, HTTPStatus.OK) + await write_content_type(writer, f'application/{application}+xml') + length = sum(map(len, content)) + writer.write(f'Content-Length: {length}\r\n\r\n'.encode()) + for part in content: + writer.write(part) + await writer.drain() async def handle(certs, base_url, reader, writer): """Handle HTTP request.""" - summaries = map(parse_summary, certs.read_text().splitlines()) - lookup = {urlsplit(urljoin(base_url, - path(hostname, port, issuer, serial))).path: - (datetime_from_str(not_before), - datetime_from_str(not_after, unavailable_ok=True), - hostname, port, serial, issuer) - for not_before, not_after, hostname, port, serial, issuer - in summaries} - request = await reader.readuntil(b'\r\n') - url = request.removeprefix(b'GET ').rsplit(b' HTTP/', 1)[0].strip() - url_parts = urlsplit(urljoin(base_url, url.decode())) - domains = tuple(parse_qs(url_parts.query).get('domain', [])) - - if not request.startswith(b'GET '): - await describe_status(writer, HTTPStatus.METHOD_NOT_ALLOWED) - elif url_parts.path == urlsplit(base_url).path: # Atom feed - await write_status(writer, HTTPStatus.OK) - await write_content_type(writer, 'application/atom+xml') - feed = ('feed', {'xmlns': 'http://www.w3.org/2005/Atom'}, - ('id', base_url), - ('link', {'rel': 'self', 'href': base_url}), - ('title', certs.name), - ('updated', datetime.now(tz=timezone.utc).isoformat()), - ('generator', - {'uri': 'https://trong.loang.net/scadere/about', - 'version': __version__}, - 'Scadere'), - *(entry(base_url, cert) for cert in lookup.values() - if is_subdomain(cert[2], domains))) - await write_xml(writer, feed) - elif url_parts.path in lookup: # accessible Atom entry's link/ID - await write_status(writer, HTTPStatus.OK) - await write_content_type(writer, 'application/xhtml+xml') - (not_before, not_after, - hostname, port, serial, issuer) = lookup[url_parts.path] - page = ('html', {'xmlns': 'http://www.w3.org/1999/xhtml', - 'lang': 'en'}, - ('head', - ('meta', {'name': 'color-scheme', - 'content': 'dark light'}), - ('meta', {'name': 'viewport', - 'content': ('width=device-width,' - 'initial-scale=1.0')}), - ('link', {'rel': 'icon', 'href': 'data:,'}), - ('title', f'TLS certificate - {hostname}:{port}')), - ('body', *body(not_before, not_after, - hostname, port, serial, issuer))) - await write_xml(writer, page) - else: - await describe_status(writer, HTTPStatus.NOT_FOUND) - - assert writer.can_write_eof() - writer.write_eof() - writer.close() - await writer.wait_closed() + try: + try: + request = await reader.readuntil(b'\r\n') + except Exception: + await describe_status(writer, HTTPStatus.BAD_REQUEST) + return + + if not request.startswith(b'GET '): + await describe_status(writer, HTTPStatus.METHOD_NOT_ALLOWED) + return + + try: + # Raise ValueError on the lack of b'HTTP/' + url, version = request.removeprefix(b'GET ').rsplit(b' HTTP/', 1) + http_version = version.strip().decode() + if not supported_http_version(http_version): + raise ValueError + except ValueError: + await describe_status(writer, + HTTPStatus.HTTP_VERSION_NOT_SUPPORTED) + return + + try: + url_parts = urlsplit(urljoin(base_url, url.strip().decode())) + domains = tuple(parse_qs(url_parts.query).get('domain', [])) + summaries = map(parse_summary, certs.read_text().splitlines()) + lookup = {urlsplit(urljoin(base_url, + path(hostname, port, + issuer, serial))).path: + (datetime_from_str(not_before), + datetime_from_str(not_after, unavailable_ok=True), + hostname, port, serial, issuer) + for not_before, not_after, hostname, port, serial, issuer + in summaries} + except Exception: # pragma: no cover + await describe_status(writer, HTTPStatus.INTERNAL_SERVER_ERROR, + http_version) + raise + + if url_parts.path == urlsplit(base_url).path: # Atom feed + await write_xml(writer, http_version, 'atom', feed, + base_url, certs.name, lookup.values(), domains) + elif url_parts.path in lookup: # accessible Atom entry's link/ID + await write_xml(writer, http_version, 'xhtml', page, + lookup.get(url_parts.path)) + else: + await describe_status(writer, HTTPStatus.NOT_FOUND, http_version) + finally: + assert writer.can_write_eof() + writer.write_eof() + writer.close() + await writer.wait_closed() async def listen(certs, base_url, host, port): # pragma: no cover |