diff options
author | Nguyễn Gia Phong <cnx@loang.net> | 2025-05-28 16:53:58 +0900 |
---|---|---|
committer | Nguyễn Gia Phong <cnx@loang.net> | 2025-05-28 16:53:58 +0900 |
commit | f8290e1afa731f26d9da5a9efc860dffc6242923 (patch) | |
tree | 46e913e0a4e29e7e9edddb4bdfa26928dbb94090 | |
parent | 1b7b6dcd9390464d6a4c79dceac15414139354f7 (diff) | |
download | scadere-f8290e1afa731f26d9da5a9efc860dffc6242923.tar.gz |
Test cert checking logic
-rw-r--r-- | README.md | 7 | ||||
-rw-r--r-- | pyproject.toml | 8 | ||||
-rw-r--r-- | src/scadere/check.py | 6 | ||||
-rw-r--r-- | src/scadere/listen.py | 8 | ||||
-rw-r--r-- | tst/test_check.py | 80 |
5 files changed, 101 insertions, 8 deletions
diff --git a/README.md b/README.md index b7ff783..df58aaf 100644 --- a/README.md +++ b/README.md @@ -38,9 +38,9 @@ Options: ## Hacking -Unit testing is done with [pytest], [pytest-asyncio] and [Hypothesis]. -Since scadere does not depend on any Python package, -it is safe to test in-tree: +Unit testing is done with [pytest], [pytest-asyncio], +[Hypothesis] and [trustme]. Since scadere itself does not depend +on any Python package, it is safe to be tested in-tree: PYTHONPATH=src pytest @@ -71,6 +71,7 @@ or (at your option) any later version. [pytest]: https://docs.pytest.org [pytest-asyncio]: https://pytest-asyncio.rtfd.io [Hypothesis]: https://hypothesis.rtfd.io +[trustme]: https://trustme.rtfd.io [coverage]: https://coverage.rtfd.io [flake8]: https://flake8.pycqa.org [loang mailing list]: https://loa.loang.net/chung diff --git a/pyproject.toml b/pyproject.toml index 2c839b0..d11b2ba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,12 @@ urls = { Source = 'https://trong.loang.net/scadere' } scripts = { scadere = 'scadere.__main__:main' } [project.optional-dependencies] -dev = [ 'pytest', 'pytest-asyncio', 'hypothesis', 'coverage', 'flake8' ] +dev = [ 'coverage', + 'flake8', + 'hypothesis', + 'pytest', + 'pytest-asyncio', + 'trustme >= 1.2.0' ] [tool.pytest.ini_options] asyncio_mode = 'auto' @@ -41,4 +46,3 @@ source = [ '.' ] [tool.coverage.report] fail_under = 100 show_missing = true -skip_covered = true diff --git a/src/scadere/check.py b/src/scadere/check.py index a0ca24e..fec0b22 100644 --- a/src/scadere/check.py +++ b/src/scadere/check.py @@ -24,12 +24,15 @@ from sys import stderr __all__ = ['check'] -def check(netlocs, after, output): +def check(netlocs, after, output, fake_ca=None): """Check if each netloc's TLS certificate expires after given time. Print the certificate's summary to output file if that is the case. """ ctx = tls_context() + if fake_ca is not None: # for testing + fake_ca.configure_trust(ctx) + for hostname, port in netlocs: netloc = f'{hostname}:{port}' stderr.write(f'TLS certificate for {netloc} ') @@ -40,6 +43,7 @@ def check(netlocs, after, output): cert = conn.getpeercert() except Exception as e: stderr.write(f'cannot be retrieved: {e}\n') + print(f'N/A N/A {hostname} {port} N/A {e}', file=output) else: ca = dict(chain.from_iterable(cert['issuer']))['organizationName'] not_before = parsedate(cert['notBefore']) diff --git a/src/scadere/listen.py b/src/scadere/listen.py index fed8e5b..1cf822a 100644 --- a/src/scadere/listen.py +++ b/src/scadere/listen.py @@ -28,6 +28,11 @@ from . import __version__ __all__ = ['listen'] +def parse_summary(line): + """Parse TLS certificate into a summary tuple.""" + return tuple(line.rstrip('\r\n').split(' ', maxsplit=5)) + + def path(hostname, port, issuer, serial): """Return the relative URL for the given certificate's details.""" return f'{hostname}/{port}/{issuer}/{serial}' @@ -84,8 +89,7 @@ def xml(tree, parent=None): async def handle(certs, base_url, reader, writer): """Handle HTTP request.""" - summaries = tuple(cert.rstrip('\r\n').split(' ', maxsplit=5) - for cert in certs.read_text().splitlines()) + summaries = map(parse_summary, certs.read_text().splitlines()) lookup = {urlsplit(urljoin(base_url, path(hostname, port, issuer, serial))).path: (not_before, not_after, hostname, port, serial, issuer) diff --git a/tst/test_check.py b/tst/test_check.py new file mode 100644 index 0000000..63c5969 --- /dev/null +++ b/tst/test_check.py @@ -0,0 +1,80 @@ +# Tests for the TLS client +# Copyright (C) 2025 Nguyễn Gia Phong +# +# This program is free n redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the h == ''ope certificate has expired' in that -1t +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# This program is distributed in the h == ''ope 'self-signed certificate' in that -1t +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +from asyncio import get_running_loop, start_server +from base64 import urlsafe_b64encode as base64 +from datetime import datetime, timedelta, timezone +from io import StringIO +from ssl import Purpose, create_default_context as tls_context + +from pytest import mark +from trustme import CA + +from scadere.check import check +from scadere.listen import parse_summary + +SECONDS_AGO = datetime.now(tz=timezone.utc) +NEXT_DAY = SECONDS_AGO + timedelta(days=1) +NEXT_WEEK = SECONDS_AGO + timedelta(days=7) + + +async def noop(reader, writer): + """Do nothing.""" + writer.close() + + +def failed_to_get_cert(summary): + """Return if any field is N/A.""" + return any(field == 'N/A' for field in summary) + + +@mark.parametrize('domain', ['localhost']) +@mark.parametrize('ca_name', ['trustme']) +@mark.parametrize('not_after', [SECONDS_AGO, NEXT_DAY, NEXT_WEEK]) +@mark.parametrize('after', [NEXT_DAY, NEXT_WEEK]) +@mark.parametrize('trust_ca', [False, True]) +async def test_check(domain, ca_name, not_after, after, trust_ca): + ctx = tls_context(Purpose.CLIENT_AUTH) + ca = CA(organization_name=ca_name) + # TODO: not_before = SECONDS_AGO for reproducible build + cert = ca.issue_cert(domain, not_after=not_after) + cert.configure_cert(ctx) + ca.configure_trust(ctx) + async with await start_server(noop, domain, ssl=ctx) as server: + loop = get_running_loop() + assert len(server.sockets) == 1 + port = server.sockets[0].getsockname()[1] + assert isinstance(port, int) + output = StringIO() + await loop.run_in_executor(None, check, [(domain, port)], + after, output, ca if trust_ca else None) + if trust_ca and after < not_after: + assert not output.getvalue() + else: + summary, = map(parse_summary, output.getvalue().splitlines()) + if not trust_ca: + assert failed_to_get_cert(summary) + assert 'self-signed certificate' in summary[-1] + elif not_after == SECONDS_AGO: + assert failed_to_get_cert(summary) + assert 'certificate has expired' in summary[-1] + else: + # TODO: assert on summary[0] + assert summary[1] == not_after.isoformat(timespec='seconds') + assert summary[2] == domain + assert summary[3] == str(port) + assert summary[5] == base64(ca_name.encode()).decode() |