about summary refs log tree commit diff
path: root/tst/test_check.py
blob: 989020997f1efc38de4094b61c0f2bf5474a301b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# Tests for the TLS client
# Copyright (C) 2025  Nguyễn Gia Phong
#
# This file is part of scadere.
#
# Scadere is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Scadere is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with scadere.  If not, see <https://www.gnu.org/licenses/>.

from asyncio import get_running_loop, start_server
from base64 import urlsafe_b64encode as base64
from datetime import datetime, timedelta, timezone
from io import StringIO
from ssl import Purpose, create_default_context as tls_context

from pytest import mark
from trustme import CA

from scadere.check import check
from scadere.listen import parse_summary

# Reference instant taken once at import time, so it already lies
# in the past whenever a test body gets to compare against it.
SECONDS_AGO = datetime.now(timezone.utc)
NEXT_DAY = timedelta(days=1) + SECONDS_AGO
NEXT_WEEK = timedelta(weeks=1) + SECONDS_AGO


async def noop(reader, writer):
    """Close the connection right away without any data exchange."""
    # The reader is never touched: nothing is read from the peer.
    writer.close()
    closing = writer.wait_closed()
    await closing


def failed_to_get_cert(summary):
    """Return whether at least one field of the summary equals N/A."""
    for value in summary:
        if value == 'N/A':
            return True
    return False


@mark.parametrize('domain', ['localhost'])
@mark.parametrize('ca_name', ['trustme'])
@mark.parametrize('not_after', [SECONDS_AGO, NEXT_DAY, NEXT_WEEK])
@mark.parametrize('after', [NEXT_DAY, NEXT_WEEK])
@mark.parametrize('trust_ca', [False, True])
async def test_check(domain, ca_name, not_after, after, trust_ca):
    """Run check against a local TLS server and verify what it writes.

    A throwaway CA named ca_name issues a certificate for domain
    that expires at not_after.  A do-nothing server presents it,
    and check is called with after and an in-memory output stream.
    The CA is handed over to check only if trust_ca is set.
    """
    server_ctx = tls_context(Purpose.CLIENT_AUTH)
    authority = CA(organization_name=ca_name)
    # TODO: not_before = SECONDS_AGO for reproducible build
    leaf = authority.issue_cert(domain, not_after=not_after)
    leaf.configure_cert(server_ctx)
    authority.configure_trust(server_ctx)
    trusted = authority if trust_ca else None
    async with await start_server(noop, domain, ssl=server_ctx) as server:
        port = server.sockets[0].getsockname()[1]
        assert isinstance(port, int)
        report = StringIO()
        # check is a plain callable (presumably blocking), so it runs
        # in an executor while the event loop keeps serving.
        await get_running_loop().run_in_executor(
            None, check, [(domain, port)], after, report, trusted)
        text = report.getvalue()

        # A trusted certificate outliving the threshold yields no output.
        if trust_ca and after < not_after:
            assert not text
            return

        # Every remaining case must produce exactly one summary line.
        summary, = map(parse_summary, text.splitlines())
        if not trust_ca:
            assert failed_to_get_cert(summary)
            assert 'self-signed certificate' in summary[-1]
        elif not_after == SECONDS_AGO:
            assert failed_to_get_cert(summary)
            assert 'certificate has expired' in summary[-1]
        else:
            # TODO: assert on summary[0]
            assert summary[1] == not_after.isoformat(timespec='seconds')
            assert summary[2] == domain
            assert summary[3] == str(port)
            assert summary[5] == base64(ca_name.encode()).decode()