about summary refs log tree commit diff
path: root/tst/test_check.py
diff options
context:
space:
mode:
Diffstat (limited to 'tst/test_check.py')
-rw-r--r--tst/test_check.py49
1 files changed, 30 insertions, 19 deletions
diff --git a/tst/test_check.py b/tst/test_check.py
index 9890209..a87788e 100644
--- a/tst/test_check.py
+++ b/tst/test_check.py
@@ -44,6 +44,21 @@ def failed_to_get_cert(summary):
     return any(field == 'N/A' for field in summary)
 
 
+async def get_cert_summary(netloc, after, ca):
+    """Fetch TLS certificate expiration summary for netloc."""
+    loop = get_running_loop()
+    while True:
+        output = StringIO()
+        await loop.run_in_executor(None, check, [netloc], after, output, ca)
+        if not output.getvalue():
+            return None
+        summary, = map(parse_summary, output.getvalue().splitlines())
+        if 'Connection refused' in summary[-1]:  # pragma: no cover
+            assert failed_to_get_cert(summary)
+            continue
+        return summary
+
+
 @mark.parametrize('domain', ['localhost'])
 @mark.parametrize('ca_name', ['trustme'])
 @mark.parametrize('not_after', [SECONDS_AGO, NEXT_DAY, NEXT_WEEK])
@@ -57,25 +72,21 @@ async def test_check(domain, ca_name, not_after, after, trust_ca):
     cert.configure_cert(ctx)
     ca.configure_trust(ctx)
     async with await start_server(noop, domain, ssl=ctx) as server:
-        loop = get_running_loop()
         port = server.sockets[0].getsockname()[1]
         assert isinstance(port, int)
-        output = StringIO()
-        await loop.run_in_executor(None, check, [(domain, port)],
-                                   after, output, ca if trust_ca else None)
-        if trust_ca and after < not_after:
-            assert not output.getvalue()
+        summary = await get_cert_summary((domain, port), after,
+                                         ca if trust_ca else None)
+        if not trust_ca:
+            assert failed_to_get_cert(summary)
+            assert 'self-signed certificate' in summary[-1]
+        elif not_after == SECONDS_AGO:
+            assert failed_to_get_cert(summary)
+            assert 'certificate has expired' in summary[-1]
+        elif not_after > after:
+            assert summary is None
         else:
-            summary, = map(parse_summary, output.getvalue().splitlines())
-            if not trust_ca:
-                assert failed_to_get_cert(summary)
-                assert 'self-signed certificate' in summary[-1]
-            elif not_after == SECONDS_AGO:
-                assert failed_to_get_cert(summary)
-                assert 'certificate has expired' in summary[-1]
-            else:
-                # TODO: assert on summary[0]
-                assert summary[1] == not_after.isoformat(timespec='seconds')
-                assert summary[2] == domain
-                assert summary[3] == str(port)
-                assert summary[5] == base64(ca_name.encode()).decode()
+            # TODO: assert on summary[0]
+            assert summary[1] == not_after.isoformat(timespec='seconds')
+            assert summary[2] == domain
+            assert summary[3] == str(port)
+            assert summary[5] == base64(ca_name.encode()).decode()