about summary refs log tree commit diff
diff options
context:
space:
mode:
authorNguyễn Gia Phong <cnx@loang.net>2025-06-20 16:43:30 +0900
committerNguyễn Gia Phong <cnx@loang.net>2025-06-20 17:54:35 +0900
commitb102f8e660c40f074419a03c7b0615b5cbead0bb (patch)
treefaddefbfc4b806f59a66f1ae22ceb9f16aecee3f
parent01b0e90c621606071e20534c93f5299cbfcacda3 (diff)
downloadscadere-b102f8e660c40f074419a03c7b0615b5cbead0bb.tar.gz
Add Last-Modified header to HTTP responses
-rw-r--r--src/scadere/check.py4
-rw-r--r--src/scadere/listen.py47
-rw-r--r--tst/test_check.py6
-rw-r--r--tst/test_listen.py12
4 files changed, 40 insertions, 29 deletions
diff --git a/src/scadere/check.py b/src/scadere/check.py
index 8c3e60b..97b98b2 100644
--- a/src/scadere/check.py
+++ b/src/scadere/check.py
@@ -55,7 +55,7 @@ def check(netlocs, after, output, fake_ca=None):
                 cert = conn.getpeercert()
         except Exception as exception:
             stderr.write(f'cannot be retrieved: {exception}\n')
-            print('N/A', now, hostname, port, uuid4().int,
+            print(now, 'N/A', hostname, port, uuid4().int,
                   base64_from_str(str(exception)), file=output)
             continue
 
@@ -68,7 +68,7 @@ def check(netlocs, after, output, fake_ca=None):
             serial = int(cert['serialNumber'], 16)
         except Exception as exception:
             stderr.write(f'cannot be parsed: {exception}\n')
-            print('N/A', now, hostname, port, uuid4().int,
+            print(now, 'N/A', hostname, port, uuid4().int,
                   base64_from_str(str(exception)), file=output)
         else:
             if after < not_after:
diff --git a/src/scadere/listen.py b/src/scadere/listen.py
index cc08cd1..151a108 100644
--- a/src/scadere/listen.py
+++ b/src/scadere/listen.py
@@ -9,16 +9,17 @@ from base64 import urlsafe_b64decode as from_base64
 from datetime import datetime, timezone
 from functools import lru_cache, partial
 from http import HTTPStatus
+from locale import LC_TIME, setlocale
 from operator import call
 from os.path import basename
 from pathlib import Path
 from string import digits
+from sys import argv
 from typing import assert_never
 from urllib.parse import parse_qs, urljoin, urlsplit
 from xml.etree.ElementTree import (Element as xml_element,
                                    SubElement as xml_subelement,
                                    indent, tostring as string_from_xml)
-from sys import argv
 
 from . import (__version__, GNUHelpFormatter, NetLoc,
                format_epilog, format_version)
@@ -37,7 +38,7 @@ def datetime_from_str(string, unavailable_ok=False):
     """Parse datetime from string in ISO 8601 format."""
     if string == 'N/A' and unavailable_ok:
         return None
-    return datetime.fromisoformat(string)
+    return datetime.fromisoformat(string).astimezone(timezone.utc)
 
 
 def str_from_base64(string64):
@@ -48,8 +49,9 @@ def str_from_base64(string64):
 def parse_summary(line):
     """Parse TLS certificate into a summary tuple."""
     return tuple(map(call,
-                     (partial(datetime_from_str, unavailable_ok=True),
-                      datetime_from_str, str, int, int, str_from_base64),
+                     (datetime_from_str,
+                      partial(datetime_from_str, unavailable_ok=True),
+                      str, int, int, str_from_base64),
                      line.rstrip('\r\n').split(' ', maxsplit=5)))
 
 
@@ -92,12 +94,12 @@ def describe_status(writer, status, http_version='1.1'):
 
 def body(not_before, not_after, hostname, port, number, string):
     """Describe the given certificate in XHTML."""
-    if not_before is None:
+    if not_after is None:
         return (('h1', 'TLS certificate problem'),
                 ('dl',
                  ('dt', 'Domain'), ('dd', hostname),
                  ('dt', 'Port'), ('dd', port),
-                 ('dt', 'Time'), ('dd', not_after),
+                 ('dt', 'Time'), ('dd', not_before),
                  ('dt', 'Error'), ('dd', string)))
     return (('h1', 'TLS certificate information'),
             ('dl',
@@ -114,10 +116,9 @@ def entry(base_url, cert):
     not_before, not_after, hostname, port, number, string = cert
     url = urljoin(base_url, path(hostname, port, number, string))
     title = (f'TLS cert for {hostname} cannot be retrieved'
-             if not_before is None
+             if not_after is None
              else f'TLS cert for {hostname} will expire at {not_after}')
-    author = 'Scadere' if not_before is None else string
-    updated = not_after if not_before is None else not_before
+    author = 'Scadere' if not_after is None else string
     return ('entry',
             ('author', ('name', author)),
             ('content', {'type': 'xhtml'},
@@ -127,7 +128,7 @@ def entry(base_url, cert):
                       'type': 'application/xhtml+xml',
                       'href': url}),
             ('title', title),
-            ('updated', updated))
+            ('updated', not_before))
 
 
 def split_domain(domain):
@@ -144,7 +145,7 @@ def is_subdomain(subject, objects):
                for obj_parts in map(split_domain, objects))
 
 
-def feed(base_url, name, mtime, certificates, domains):
+def feed(mtime, base_url, name, certificates, domains):
     """Construct an Atom feed based on the given information."""
     return ('feed', {'xmlns': 'http://www.w3.org/2005/Atom'},
             ('id', base_url),
@@ -159,9 +160,8 @@ def feed(base_url, name, mtime, certificates, domains):
               if is_subdomain(cert[2], domains)))
 
 
-def page(certificate):
+def page(not_before, not_after, hostname, port, *args):
     """Construct an XHTML page for the given TLS certificate."""
-    hostname, port = certificate[2:4]
     return ('html', {'xmlns': 'http://www.w3.org/1999/xhtml',
                      'lang': 'en'},
             ('head',
@@ -172,7 +172,7 @@ def page(certificate):
                                    'initial-scale=1.0')}),
              ('link', {'rel': 'icon', 'href': 'data:,'}),
              ('title', f'TLS certificate - {hostname}:{port}')),
-            ('body', *body(*certificate)))
+            ('body', *body(not_before, not_after, hostname, port, *args)))
 
 
 def xml(tree, parent=None):
@@ -215,17 +215,26 @@ def unparsed_page(*args):
                            xml_declaration=True, default_namespace=None)
 
 
-def write_xml(writer, http_version, application, func, *args):
+@lru_cache
+def set_http_time_locale():
+    """Set LC_TIME=C exactly once."""
+    setlocale(LC_TIME, 'C')
+
+
+def write_xml(writer, http_version, application, func, mtime, *args):
     """Write given document as XML."""
     try:
-        content = func(*args).encode()
+        content = func(mtime, *args).encode()
     except Exception:  # pragma: no cover
         describe_status(writer, HTTPStatus.INTERNAL_SERVER_ERROR, http_version)
         raise
     else:
         write_status(writer, http_version, HTTPStatus.OK)
         write_content_type(writer, f'application/{application}+xml')
-        writer.write(f'Content-Length: {len(content)}\r\n\r\n'.encode())
+        writer.write(f'Content-Length: {len(content)}\r\n'.encode())
+        set_http_time_locale()
+        http_time = mtime.strftime('%a, %d %b %Y %H:%M:%S GMT')
+        writer.write(f'Last-Modified: {http_time}\r\n\r\n'.encode())
         writer.write(content)
 
 
@@ -269,10 +278,10 @@ async def handle(certs, base_url, reader, writer, title=''):
 
         if url_parts.path == urlsplit(base_url).path:  # Atom feed
             write_xml(writer, http_version, 'atom', unparsed_feed,
-                      base_url, title or certs.name, mtime, summaries, domains)
+                      mtime, base_url, title or certs.name, summaries, domains)
         elif url_parts.path in lookup:  # accessible Atom entry's link/ID
             write_xml(writer, http_version, 'xhtml', unparsed_page,
-                      lookup.get(url_parts.path))
+                      *lookup.get(url_parts.path))
         else:
             describe_status(writer, HTTPStatus.NOT_FOUND, http_version)
     finally:
diff --git a/tst/test_check.py b/tst/test_check.py
index b07259f..aeb4a9e 100644
--- a/tst/test_check.py
+++ b/tst/test_check.py
@@ -60,13 +60,13 @@ async def test_check(domain, ca_name, not_after, after, trust_ca):
         summary = await get_cert_summary((domain, port), after,
                                          ca if trust_ca else None)
         if not trust_ca:
-            assert summary[0] is None
+            assert summary[1] is None
             assert 'self-signed certificate' in summary[5]
         elif not_after == SECONDS_AGO:
-            assert summary[0] is None
+            assert summary[1] is None
             assert 'certificate has expired' in summary[5]
         elif not printable(ca_name):
-            assert summary[0] is None
+            assert summary[1] is None
             assert 'control character' in summary[5]
         elif not_after > after:
             assert summary is None
diff --git a/tst/test_listen.py b/tst/test_listen.py
index bab9072..61929c8 100644
--- a/tst/test_listen.py
+++ b/tst/test_listen.py
@@ -6,7 +6,7 @@
 from asyncio import open_unix_connection, start_unix_server
 from contextlib import asynccontextmanager, contextmanager
 from copy import deepcopy
-from datetime import datetime
+from datetime import datetime, timezone
 from email.parser import BytesHeaderParser
 from functools import partial
 from http import HTTPMethod
@@ -19,7 +19,7 @@ from xml.etree.ElementTree import (XML, XMLParser, indent,
 
 from hypothesis import HealthCheck, given, settings
 from hypothesis.strategies import (booleans, composite, data,
-                                   datetimes, from_type, integers,
+                                   datetimes, from_type, integers, just,
                                    sampled_from, sets, text, uuids)
 from hypothesis.provisional import domains, urls
 from pytest import raises
@@ -30,6 +30,7 @@ from scadere.listen import (handle, is_subdomain, path, parse_summary,
 
 ATOM_NAMESPACES = {'': 'http://www.w3.org/2005/Atom'}
 XHTML_NAMESPACES = {'': 'http://www.w3.org/1999/xhtml'}
+UTC_DATETIMES = datetimes(timezones=just(timezone.utc))
 
 
 def ports():
@@ -84,8 +85,8 @@ def test_xml_unsupported_type(tag, child):
 def certificates(draw):
     """Return a Hypothesis strategy for certificate summaries."""
     valid = draw(booleans())
-    not_before = draw(datetimes()).isoformat() if valid else 'N/A'
-    not_after = draw(datetimes()).isoformat()
+    not_before = draw(UTC_DATETIMES).isoformat()
+    not_after = draw(UTC_DATETIMES).isoformat() if valid else 'N/A'
     hostname = draw(domains())
     port = draw(ports())
     number = draw(serials()) if valid else draw(uuids(version=4)).int
@@ -210,7 +211,8 @@ def unique_netlocs(summaries):
 @given(urls().filter(is_base_url).filter(has_usual_path),
        sets(certificates(), min_size=1).map(unique_netlocs),
        text().filter(printable))
-@settings(suppress_health_check=[HealthCheck.too_slow])
+@settings(suppress_health_check=(HealthCheck.filter_too_much,
+                                 HealthCheck.too_slow))
 async def test_content(base_url, certs, title):
     base_path = urlsplit(base_url).path
     with tmp_cert_file(certs) as cert_file: