about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorNguyễn Gia Phong <cnx@loang.net>2025-06-04 01:30:57 +0900
committerNguyễn Gia Phong <cnx@loang.net>2025-06-04 01:30:57 +0900
commitcb1b5c48145f9c23eae95f922c6af804466bcb42 (patch)
tree24859b4e1642753e7756975cae28e4d24f9847a3 /src
parent8b83c1f04c808558a8097022466b2d4327dd62af (diff)
downloadscadere-cb1b5c48145f9c23eae95f922c6af804466bcb42.tar.gz
Handle summaries of certs not retrieved
Diffstat (limited to 'src')
-rw-r--r--src/scadere/check.py23
-rw-r--r--src/scadere/listen.py49
2 files changed, 56 insertions, 16 deletions
diff --git a/src/scadere/check.py b/src/scadere/check.py
index 23ba189..aaabe3f 100644
--- a/src/scadere/check.py
+++ b/src/scadere/check.py
@@ -24,15 +24,25 @@ from itertools import chain
 from socket import AF_INET, socket
 from ssl import create_default_context as tls_context
 from sys import argv, stderr, stdout
+from unicodedata import category as unicode_category
 
 from . import __version__, GNUHelpFormatter, NetLoc
 
 __all__ = ['main']
 
 
+class CtlChrTrans:
+    """Translator for printing Unicode control characters."""
+
+    def __getitem__(self, ordinal):
+        if unicode_category(chr(ordinal)) == 'Cc':
+            return 0xfffd  # replacement character '�'
+        raise KeyError
+
+
 def base64_from_str(string):
     """Convert string to base64 format in bytes."""
-    return base64(string.encode()).decode()
+    return base64(string.translate(CtlChrTrans()).encode()).decode()
 
 
 def check(netlocs, after, output, fake_ca=None):
@@ -52,9 +62,11 @@ def check(netlocs, after, output, fake_ca=None):
                                  server_hostname=hostname) as conn:
                 conn.connect((hostname, port))
                 cert = conn.getpeercert()
-        except Exception as e:
-            stderr.write(f'cannot be retrieved: {e}\n')
-            print(f'N/A N/A {hostname} {port} N/A {e}', file=output)
+        except Exception as exception:
+            stderr.write(f'cannot be retrieved: {exception}\n')
+            now = datetime.now(tz=timezone.utc).isoformat()
+            print(now, 'N/A', hostname, port, 'N/A',
+                  base64_from_str(str(exception)), file=output)
         else:
             ca = dict(chain.from_iterable(cert['issuer']))['organizationName']
             not_before = parsedate(cert['notBefore'])
@@ -64,9 +76,10 @@ def check(netlocs, after, output, fake_ca=None):
                 stderr.write(f'will not expire at {after_seconds}\n')
             else:
                 stderr.write(f'will expire at {not_after.isoformat()}\n')
+                serial = cert['serialNumber'].translate(CtlChrTrans())
                 print(not_before.isoformat(), not_after.isoformat(),
                       # As unique identifier
-                      hostname, port, cert['serialNumber'],
+                      hostname, port, serial,
                       base64_from_str(ca), file=output)
 
 
diff --git a/src/scadere/listen.py b/src/scadere/listen.py
index bf179e6..d8c1178 100644
--- a/src/scadere/listen.py
+++ b/src/scadere/listen.py
@@ -23,6 +23,7 @@ from datetime import datetime, timezone
 from functools import partial
 from http import HTTPStatus
 from pathlib import Path
+from typing import assert_never
 from urllib.parse import parse_qs, urljoin, urlsplit
 from xml.etree.ElementTree import (Element as xml_element,
                                    SubElement as xml_subelement,
@@ -41,9 +42,18 @@ def parse_summary(line):
 
 def path(hostname, port, issuer, serial):
     """Return the relative URL for the given certificate's details."""
+    if serial == 'N/A':
+        return f'{hostname}/{port}'
     return f'{hostname}/{port}/{issuer}/{serial}'
 
 
+def datetime_from_str(string, unavailable_ok=False):
+    """Parse datetime from string in ISO 8601 format."""
+    if string == 'N/A' and unavailable_ok:
+        return None
+    return datetime.fromisoformat(string)
+
+
 async def write_status(writer, status):
     """Write the given HTTP/1.1 status line."""
     writer.write(f'HTTP/1.1 {status.value} {status.phrase}\r\n'.encode())
@@ -71,13 +81,21 @@ def str_from_base64(string):
     return from_base64(string.encode()).decode()
 
 
-def body(not_before, not_after, hostname, port, serial, issuer):
+def body(not_before, not_after, hostname, port, serial, string64):
     """Describe the given certificate in XHTML."""
+    string = str_from_base64(string64)
+    if not_after is None:
+        return (('h1', 'TLS certificate problem'),
+                ('dl',
+                 ('dt', 'Domain'), ('dd', hostname),
+                 ('dt', 'Port'), ('dd', port),
+                 ('dt', 'Time'), ('dd', not_before),
+                 ('dt', 'Error'), ('dd', string)))
     return (('h1', 'TLS certificate information'),
             ('dl',
              ('dt', 'Domain'), ('dd', hostname),
              ('dt', 'Port'), ('dd', port),
-             ('dt', 'Issuer'), ('dd', str_from_base64(issuer)),
+             ('dt', 'Issuer'), ('dd', string),
              ('dt', 'Serial number'), ('dd', serial),
              ('dt', 'Valid from'), ('dd', not_before),
              ('dt', 'Valid until'), ('dd', not_after)))
@@ -87,15 +105,19 @@ def entry(base_url, cert):
     """Construct Atom entry for the given TLS certificate."""
     not_before, not_after, hostname, port, serial, issuer = cert
     url = urljoin(base_url, path(hostname, port, issuer, serial))
+    title = (f'TLS cert for {hostname} cannot be retrieved'
+             if not_after is None
+             else f'TLS cert for {hostname} will expire at {not_after}')
+    author = 'Scadere' if not_after is None else str_from_base64(issuer)
     return ('entry',
-            ('author', ('name', str_from_base64(issuer))),
+            ('author', ('name', author)),
             ('content', {'type': 'xhtml'},
              ('div', {'xmlns': 'http://www.w3.org/1999/xhtml'}, *body(*cert))),
             ('id', url),
             ('link', {'rel': 'alternate',
                       'type': 'application/xhtml+xml',
                       'href': url}),
-            ('title', (f'TLS cert for {hostname} will expire at {not_after}')),
+            ('title', title),
             ('updated', not_before))
 
 
@@ -109,12 +131,15 @@ def xml(tree, parent=None):
     else:
         elem = xml_subelement(parent, tag, attrs)
     for child in children:
-        if isinstance(child, tuple):
-            xml(child, elem)
-        elif isinstance(child, datetime):
-            elem.text = child.isoformat()
-        else:
-            elem.text = str(child)
+        match child:
+            case tuple():
+                xml(child, elem)
+            case str():
+                elem.text = child
+            case datetime():
+                elem.text = child.isoformat()
+            case _:  # pragma: no cover
+                assert_never(child)
     if parent is None:
         indent(elem)
     return elem
@@ -151,7 +176,9 @@ async def handle(certs, base_url, reader, writer):
     summaries = map(parse_summary, certs.read_text().splitlines())
     lookup = {urlsplit(urljoin(base_url,
                                path(hostname, port, issuer, serial))).path:
-              (not_before, not_after, hostname, port, serial, issuer)
+              (datetime_from_str(not_before),
+               datetime_from_str(not_after, unavailable_ok=True),
+               hostname, port, serial, issuer)
               for not_before, not_after, hostname, port, serial, issuer
               in summaries}
     request = await reader.readuntil(b'\r\n')