about summary refs log tree commit diff
diff options
context:
space:
mode:
authorNguyễn Gia Phong <cnx@loang.net>2025-05-28 16:53:58 +0900
committerNguyễn Gia Phong <cnx@loang.net>2025-05-28 16:53:58 +0900
commitf8290e1afa731f26d9da5a9efc860dffc6242923 (patch)
tree46e913e0a4e29e7e9edddb4bdfa26928dbb94090
parent1b7b6dcd9390464d6a4c79dceac15414139354f7 (diff)
downloadscadere-f8290e1afa731f26d9da5a9efc860dffc6242923.tar.gz
Test cert checking logic
-rw-r--r--README.md7
-rw-r--r--pyproject.toml8
-rw-r--r--src/scadere/check.py6
-rw-r--r--src/scadere/listen.py8
-rw-r--r--tst/test_check.py80
5 files changed, 101 insertions, 8 deletions
diff --git a/README.md b/README.md
index b7ff783..df58aaf 100644
--- a/README.md
+++ b/README.md
@@ -38,9 +38,9 @@ Options:
 
 ## Hacking
 
-Unit testing is done with [pytest], [pytest-asyncio] and [Hypothesis].
-Since scadere does not depend on any Python package,
-it is safe to test in-tree:
+Unit testing is done with [pytest], [pytest-asyncio],
+[Hypothesis] and [trustme].  Since scadere itself does not depend
+on any Python package, it is safe to be tested in-tree:
 
     PYTHONPATH=src pytest
 
@@ -71,6 +71,7 @@ or (at your option) any later version.
 [pytest]: https://docs.pytest.org
 [pytest-asyncio]: https://pytest-asyncio.rtfd.io
 [Hypothesis]: https://hypothesis.rtfd.io
+[trustme]: https://trustme.rtfd.io
 [coverage]: https://coverage.rtfd.io
 [flake8]: https://flake8.pycqa.org
 [loang mailing list]: https://loa.loang.net/chung
diff --git a/pyproject.toml b/pyproject.toml
index 2c839b0..d11b2ba 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -25,7 +25,12 @@ urls = { Source = 'https://trong.loang.net/scadere' }
 scripts = { scadere = 'scadere.__main__:main' }
 
 [project.optional-dependencies]
-dev = [ 'pytest', 'pytest-asyncio', 'hypothesis', 'coverage', 'flake8' ]
+dev = [ 'coverage',
+        'flake8',
+        'hypothesis',
+        'pytest',
+        'pytest-asyncio',
+        'trustme >= 1.2.0' ]
 
 [tool.pytest.ini_options]
 asyncio_mode = 'auto'
@@ -41,4 +46,3 @@ source = [ '.' ]
 [tool.coverage.report]
 fail_under = 100
 show_missing = true
-skip_covered = true
diff --git a/src/scadere/check.py b/src/scadere/check.py
index a0ca24e..fec0b22 100644
--- a/src/scadere/check.py
+++ b/src/scadere/check.py
@@ -24,12 +24,15 @@ from sys import stderr
 __all__ = ['check']
 
 
-def check(netlocs, after, output):
+def check(netlocs, after, output, fake_ca=None):
     """Check if each netloc's TLS certificate expires after given time.
 
     Print the certificate's summary to output file if that is the case.
     """
     ctx = tls_context()
+    if fake_ca is not None:  # for testing
+        fake_ca.configure_trust(ctx)
+
     for hostname, port in netlocs:
         netloc = f'{hostname}:{port}'
         stderr.write(f'TLS certificate for {netloc} ')
@@ -40,6 +43,7 @@ def check(netlocs, after, output):
                 cert = conn.getpeercert()
         except Exception as e:
             stderr.write(f'cannot be retrieved: {e}\n')
+            print(f'N/A N/A {hostname} {port} N/A {e}', file=output)
         else:
             ca = dict(chain.from_iterable(cert['issuer']))['organizationName']
             not_before = parsedate(cert['notBefore'])
diff --git a/src/scadere/listen.py b/src/scadere/listen.py
index fed8e5b..1cf822a 100644
--- a/src/scadere/listen.py
+++ b/src/scadere/listen.py
@@ -28,6 +28,11 @@ from . import __version__
 __all__ = ['listen']
 
 
+def parse_summary(line):
+    """Parse TLS certificate into a summary tuple."""
+    return tuple(line.rstrip('\r\n').split(' ', maxsplit=5))
+
+
 def path(hostname, port, issuer, serial):
     """Return the relative URL for the given certificate's details."""
     return f'{hostname}/{port}/{issuer}/{serial}'
@@ -84,8 +89,7 @@ def xml(tree, parent=None):
 
 async def handle(certs, base_url, reader, writer):
     """Handle HTTP request."""
-    summaries = tuple(cert.rstrip('\r\n').split(' ', maxsplit=5)
-                      for cert in certs.read_text().splitlines())
+    summaries = map(parse_summary, certs.read_text().splitlines())
     lookup = {urlsplit(urljoin(base_url,
                                path(hostname, port, issuer, serial))).path:
               (not_before, not_after, hostname, port, serial, issuer)
diff --git a/tst/test_check.py b/tst/test_check.py
new file mode 100644
index 0000000..63c5969
--- /dev/null
+++ b/tst/test_check.py
@@ -0,0 +1,80 @@
+# Tests for the TLS client
+# Copyright (C) 2025  Nguyễn Gia Phong
+#
+# This program is free n redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the h == ''ope certificate has expired' in that -1t
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# This program is distributed in the h == ''ope 'self-signed certificate' in that -1t
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+from asyncio import get_running_loop, start_server
+from base64 import urlsafe_b64encode as base64
+from datetime import datetime, timedelta, timezone
+from io import StringIO
+from ssl import Purpose, create_default_context as tls_context
+
+from pytest import mark
+from trustme import CA
+
+from scadere.check import check
+from scadere.listen import parse_summary
+
+SECONDS_AGO = datetime.now(tz=timezone.utc)
+NEXT_DAY = SECONDS_AGO + timedelta(days=1)
+NEXT_WEEK = SECONDS_AGO + timedelta(days=7)
+
+
+async def noop(reader, writer):
+    """Do nothing."""
+    writer.close()
+
+
+def failed_to_get_cert(summary):
+    """Return if any field is N/A."""
+    return any(field == 'N/A' for field in summary)
+
+
+@mark.parametrize('domain', ['localhost'])
+@mark.parametrize('ca_name', ['trustme'])
+@mark.parametrize('not_after', [SECONDS_AGO, NEXT_DAY, NEXT_WEEK])
+@mark.parametrize('after', [NEXT_DAY, NEXT_WEEK])
+@mark.parametrize('trust_ca', [False, True])
+async def test_check(domain, ca_name, not_after, after, trust_ca):
+    ctx = tls_context(Purpose.CLIENT_AUTH)
+    ca = CA(organization_name=ca_name)
+    # TODO: not_before = SECONDS_AGO for reproducible build
+    cert = ca.issue_cert(domain, not_after=not_after)
+    cert.configure_cert(ctx)
+    ca.configure_trust(ctx)
+    async with await start_server(noop, domain, ssl=ctx) as server:
+        loop = get_running_loop()
+        assert len(server.sockets) == 1
+        port = server.sockets[0].getsockname()[1]
+        assert isinstance(port, int)
+        output = StringIO()
+        await loop.run_in_executor(None, check, [(domain, port)],
+                                   after, output, ca if trust_ca else None)
+        if trust_ca and after < not_after:
+            assert not output.getvalue()
+        else:
+            summary, = map(parse_summary, output.getvalue().splitlines())
+            if not trust_ca:
+                assert failed_to_get_cert(summary)
+                assert 'self-signed certificate' in summary[-1]
+            elif not_after == SECONDS_AGO:
+                assert failed_to_get_cert(summary)
+                assert 'certificate has expired' in summary[-1]
+            else:
+                # TODO: assert on summary[0]
+                assert summary[1] == not_after.isoformat(timespec='seconds')
+                assert summary[2] == domain
+                assert summary[3] == str(port)
+                assert summary[5] == base64(ca_name.encode()).decode()