diff options
-rw-r--r-- | src/scadere/check.py | 4 | ||||
-rw-r--r-- | src/scadere/listen.py | 47 | ||||
-rw-r--r-- | tst/test_check.py | 6 | ||||
-rw-r--r-- | tst/test_listen.py | 12 |
4 files changed, 40 insertions, 29 deletions
diff --git a/src/scadere/check.py b/src/scadere/check.py index 8c3e60b..97b98b2 100644 --- a/src/scadere/check.py +++ b/src/scadere/check.py @@ -55,7 +55,7 @@ def check(netlocs, after, output, fake_ca=None): cert = conn.getpeercert() except Exception as exception: stderr.write(f'cannot be retrieved: {exception}\n') - print('N/A', now, hostname, port, uuid4().int, + print(now, 'N/A', hostname, port, uuid4().int, base64_from_str(str(exception)), file=output) continue @@ -68,7 +68,7 @@ def check(netlocs, after, output, fake_ca=None): serial = int(cert['serialNumber'], 16) except Exception as exception: stderr.write(f'cannot be parsed: {exception}\n') - print('N/A', now, hostname, port, uuid4().int, + print(now, 'N/A', hostname, port, uuid4().int, base64_from_str(str(exception)), file=output) else: if after < not_after: diff --git a/src/scadere/listen.py b/src/scadere/listen.py index cc08cd1..151a108 100644 --- a/src/scadere/listen.py +++ b/src/scadere/listen.py @@ -9,16 +9,17 @@ from base64 import urlsafe_b64decode as from_base64 from datetime import datetime, timezone from functools import lru_cache, partial from http import HTTPStatus +from locale import LC_TIME, setlocale from operator import call from os.path import basename from pathlib import Path from string import digits +from sys import argv from typing import assert_never from urllib.parse import parse_qs, urljoin, urlsplit from xml.etree.ElementTree import (Element as xml_element, SubElement as xml_subelement, indent, tostring as string_from_xml) -from sys import argv from . import (__version__, GNUHelpFormatter, NetLoc, format_epilog, format_version) @@ -37,7 +38,7 @@ def datetime_from_str(string, unavailable_ok=False): """Parse datetime from string in ISO 8601 format.""" if string == 'N/A' and unavailable_ok: return None - return datetime.fromisoformat(string) + return datetime.fromisoformat(string).astimezone(timezone.utc) def str_from_base64(string64): @@ -48,8 +49,9 @@ def str_from_base64(string64): def parse_summary(line): """Parse TLS certificate into a summary tuple.""" return tuple(map(call, - (partial(datetime_from_str, unavailable_ok=True), - datetime_from_str, str, int, int, str_from_base64), + (datetime_from_str, + partial(datetime_from_str, unavailable_ok=True), + str, int, int, str_from_base64), line.rstrip('\r\n').split(' ', maxsplit=5))) @@ -92,12 +94,12 @@ def describe_status(writer, status, http_version='1.1'): def body(not_before, not_after, hostname, port, number, string): """Describe the given certificate in XHTML.""" - if not_before is None: + if not_after is None: return (('h1', 'TLS certificate problem'), ('dl', ('dt', 'Domain'), ('dd', hostname), ('dt', 'Port'), ('dd', port), - ('dt', 'Time'), ('dd', not_after), + ('dt', 'Time'), ('dd', not_before), ('dt', 'Error'), ('dd', string))) return (('h1', 'TLS certificate information'), ('dl', @@ -114,10 +116,9 @@ def entry(base_url, cert): not_before, not_after, hostname, port, number, string = cert url = urljoin(base_url, path(hostname, port, number, string)) title = (f'TLS cert for {hostname} cannot be retrieved' - if not_before is None + if not_after is None else f'TLS cert for {hostname} will expire at {not_after}') - author = 'Scadere' if not_before is None else string - updated = not_after if not_before is None else not_before + author = 'Scadere' if not_after is None else string return ('entry', ('author', ('name', author)), ('content', {'type': 'xhtml'}, @@ -127,7 +128,7 @@ def entry(base_url, cert): 'type': 'application/xhtml+xml', 'href': url}), ('title', title), - ('updated', updated)) + ('updated', not_before)) def split_domain(domain): @@ -144,7 +145,7 @@ def is_subdomain(subject, objects): for obj_parts in map(split_domain, objects)) -def feed(base_url, name, mtime, certificates, domains): +def feed(mtime, base_url, name, certificates, domains): """Construct an Atom feed based on the given information.""" return ('feed', {'xmlns': 'http://www.w3.org/2005/Atom'}, ('id', base_url), @@ -159,9 +160,8 @@ def feed(base_url, name, mtime, certificates, domains): if is_subdomain(cert[2], domains))) -def page(certificate): +def page(not_before, not_after, hostname, port, *args): """Construct an XHTML page for the given TLS certificate.""" - hostname, port = certificate[2:4] return ('html', {'xmlns': 'http://www.w3.org/1999/xhtml', 'lang': 'en'}, ('head', @@ -172,7 +172,7 @@ def page(certificate): 'initial-scale=1.0')}), ('link', {'rel': 'icon', 'href': 'data:,'}), ('title', f'TLS certificate - {hostname}:{port}')), - ('body', *body(*certificate))) + ('body', *body(not_before, not_after, hostname, port, *args))) def xml(tree, parent=None): @@ -215,17 +215,26 @@ def unparsed_page(*args): xml_declaration=True, default_namespace=None) -def write_xml(writer, http_version, application, func, *args): +@lru_cache +def set_http_time_locale(): + """Set LC_TIME=C exactly once.""" + setlocale(LC_TIME, 'C') + + +def write_xml(writer, http_version, application, func, mtime, *args): """Write given document as XML.""" try: - content = func(*args).encode() + content = func(mtime, *args).encode() except Exception: # pragma: no cover describe_status(writer, HTTPStatus.INTERNAL_SERVER_ERROR, http_version) raise else: write_status(writer, http_version, HTTPStatus.OK) write_content_type(writer, f'application/{application}+xml') - writer.write(f'Content-Length: {len(content)}\r\n\r\n'.encode()) + writer.write(f'Content-Length: {len(content)}\r\n'.encode()) + set_http_time_locale() + http_time = mtime.strftime('%a, %d %b %Y %H:%M:%S GMT') + writer.write(f'Last-Modified: {http_time}\r\n\r\n'.encode()) writer.write(content) @@ -269,10 +278,10 @@ async def handle(certs, base_url, reader, writer, title=''): if url_parts.path == urlsplit(base_url).path: # Atom feed write_xml(writer, http_version, 'atom', unparsed_feed, - base_url, title or certs.name, mtime, summaries, domains) + mtime, base_url, title or certs.name, summaries, domains) elif url_parts.path in lookup: # accessible Atom entry's link/ID write_xml(writer, http_version, 'xhtml', unparsed_page, - lookup.get(url_parts.path)) + *lookup.get(url_parts.path)) else: describe_status(writer, HTTPStatus.NOT_FOUND, http_version) finally: diff --git a/tst/test_check.py b/tst/test_check.py index b07259f..aeb4a9e 100644 --- a/tst/test_check.py +++ b/tst/test_check.py @@ -60,13 +60,13 @@ async def test_check(domain, ca_name, not_after, after, trust_ca): summary = await get_cert_summary((domain, port), after, ca if trust_ca else None) if not trust_ca: - assert summary[0] is None + assert summary[1] is None assert 'self-signed certificate' in summary[5] elif not_after == SECONDS_AGO: - assert summary[0] is None + assert summary[1] is None assert 'certificate has expired' in summary[5] elif not printable(ca_name): - assert summary[0] is None + assert summary[1] is None assert 'control character' in summary[5] elif not_after > after: assert summary is None diff --git a/tst/test_listen.py b/tst/test_listen.py index bab9072..61929c8 100644 --- a/tst/test_listen.py +++ b/tst/test_listen.py @@ -6,7 +6,7 @@ from asyncio import open_unix_connection, start_unix_server from contextlib import asynccontextmanager, contextmanager from copy import deepcopy -from datetime import datetime +from datetime import datetime, timezone from email.parser import BytesHeaderParser from functools import partial from http import HTTPMethod @@ -19,7 +19,7 @@ from xml.etree.ElementTree import (XML, XMLParser, indent, from hypothesis import HealthCheck, given, settings from hypothesis.strategies import (booleans, composite, data, - datetimes, from_type, integers, + datetimes, from_type, integers, just, sampled_from, sets, text, uuids) from hypothesis.provisional import domains, urls from pytest import raises @@ -30,6 +30,7 @@ from scadere.listen import (handle, is_subdomain, path, parse_summary, ATOM_NAMESPACES = {'': 'http://www.w3.org/2005/Atom'} XHTML_NAMESPACES = {'': 'http://www.w3.org/1999/xhtml'} +UTC_DATETIMES = datetimes(timezones=just(timezone.utc)) def ports(): @@ -84,8 +85,8 @@ def test_xml_unsupported_type(tag, child): def certificates(draw): """Return a Hypothesis strategy for certificate summaries.""" valid = draw(booleans()) - not_before = draw(datetimes()).isoformat() if valid else 'N/A' - not_after = draw(datetimes()).isoformat() + not_before = draw(UTC_DATETIMES).isoformat() + not_after = draw(UTC_DATETIMES).isoformat() if valid else 'N/A' hostname = draw(domains()) port = draw(ports()) number = draw(serials()) if valid else draw(uuids(version=4)).int @@ -210,7 +211,8 @@ def unique_netlocs(summaries): @given(urls().filter(is_base_url).filter(has_usual_path), sets(certificates(), min_size=1).map(unique_netlocs), text().filter(printable)) -@settings(suppress_health_check=[HealthCheck.too_slow]) +@settings(suppress_health_check=(HealthCheck.filter_too_much, + HealthCheck.too_slow)) async def test_content(base_url, certs, title): base_path = urlsplit(base_url).path with tmp_cert_file(certs) as cert_file: |