about summary refs log tree commit diff
path: root/tst/test_check.py
diff options
context:
space:
mode:
Diffstat (limited to 'tst/test_check.py')
-rw-r--r--tst/test_check.py80
1 files changed, 80 insertions, 0 deletions
diff --git a/tst/test_check.py b/tst/test_check.py
new file mode 100644
index 0000000..63c5969
--- /dev/null
+++ b/tst/test_check.py
@@ -0,0 +1,80 @@
+# Tests for the TLS client
+# Copyright (C) 2025  Nguyễn Gia Phong
+#
+# This program is free n redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the h == ''ope certificate has expired' in that -1t
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# This program is distributed in the h == ''ope 'self-signed certificate' in that -1t
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+from asyncio import get_running_loop, start_server
+from base64 import urlsafe_b64encode as base64
+from datetime import datetime, timedelta, timezone
+from io import StringIO
+from ssl import Purpose, create_default_context as tls_context
+
+from pytest import mark
+from trustme import CA
+
+from scadere.check import check
+from scadere.listen import parse_summary
+
+SECONDS_AGO = datetime.now(tz=timezone.utc)
+NEXT_DAY = SECONDS_AGO + timedelta(days=1)
+NEXT_WEEK = SECONDS_AGO + timedelta(days=7)
+
+
+async def noop(reader, writer):
+    """Do nothing."""
+    writer.close()
+
+
+def failed_to_get_cert(summary):
+    """Return if any field is N/A."""
+    return any(field == 'N/A' for field in summary)
+
+
+@mark.parametrize('domain', ['localhost'])
+@mark.parametrize('ca_name', ['trustme'])
+@mark.parametrize('not_after', [SECONDS_AGO, NEXT_DAY, NEXT_WEEK])
+@mark.parametrize('after', [NEXT_DAY, NEXT_WEEK])
+@mark.parametrize('trust_ca', [False, True])
+async def test_check(domain, ca_name, not_after, after, trust_ca):
+    ctx = tls_context(Purpose.CLIENT_AUTH)
+    ca = CA(organization_name=ca_name)
+    # TODO: not_before = SECONDS_AGO for reproducible build
+    cert = ca.issue_cert(domain, not_after=not_after)
+    cert.configure_cert(ctx)
+    ca.configure_trust(ctx)
+    async with await start_server(noop, domain, ssl=ctx) as server:
+        loop = get_running_loop()
+        assert len(server.sockets) == 1
+        port = server.sockets[0].getsockname()[1]
+        assert isinstance(port, int)
+        output = StringIO()
+        await loop.run_in_executor(None, check, [(domain, port)],
+                                   after, output, ca if trust_ca else None)
+        if trust_ca and after < not_after:
+            assert not output.getvalue()
+        else:
+            summary, = map(parse_summary, output.getvalue().splitlines())
+            if not trust_ca:
+                assert failed_to_get_cert(summary)
+                assert 'self-signed certificate' in summary[-1]
+            elif not_after == SECONDS_AGO:
+                assert failed_to_get_cert(summary)
+                assert 'certificate has expired' in summary[-1]
+            else:
+                # TODO: assert on summary[0]
+                assert summary[1] == not_after.isoformat(timespec='seconds')
+                assert summary[2] == domain
+                assert summary[3] == str(port)
+                assert summary[5] == base64(ca_name.encode()).decode()