#!/usr/bin/env python3
# Generate adverts from web feeds
# Copyright (C) 2021-2022 Nguyễn Gia Phong
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from asyncio import gather, open_connection, run
from collections import namedtuple
from datetime import datetime
from email.utils import parsedate_to_datetime
from http.client import HTTPResponse
from io import BytesIO
from operator import attrgetter
from sys import stdin
from urllib.error import HTTPError
from urllib.parse import urljoin, urlsplit
from warnings import warn
from xml.etree.ElementTree import fromstring as parse_xml
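
# An HTTP/1.0 request implies Connection: close, so the whole
# response can simply be read until EOF (see fetch below).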
REQUEST = 'GET {} HTTP/1.0\r\nHost: {}\r\n\r\n'
Item = namedtuple('Item', ('title', 'link', 'time', 'summary'))
Advert = namedtuple('Advert', ('source_title', 'source_link',
'title', 'link', 'time', 'summary'))


class BytesSocket:
"""Duck socket for HTTPResponse."""
def __init__(self, response):
self.bytes = response

    def makefile(self, mode, *args, **kwargs):
"""Return a bytes stream."""
assert mode == 'rb'
return BytesIO(self.bytes)
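

# How BytesSocket pairs with HTTPResponse, with hypothetical wire bytes:
#     raw = b'HTTP/1.0 200 OK\r\n\r\nhello'
#     response = HTTPResponse(BytesSocket(raw))
#     response.begin()  # parse the status line and headers
#     response.read()   # returns b'hello'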


def parse_rss_item(xml):
    """Parse given RSS item."""
    # Fall back to the Unix epoch so undated items are considered oldest.
    time = datetime.fromtimestamp(0).astimezone(None)
description = ''
for child in xml:
if child.tag == 'title':
title = child.text
elif child.tag == 'link':
link = child.text
elif child.tag == 'pubDate':
time = parsedate_to_datetime(child.text).astimezone(None)
elif child.tag == 'description':
description = child.text
        # ElementTree expands content:encoded into its namespaced form.
        elif child.tag.endswith('}encoded') and not description:
            description = child.text
if not description:
description = xml.text
return Item(title, link, time, description)
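
# For reference, a typical RSS 2.0 <item> handled above:
#     <item>
#       <title>Post title</title>
#       <link>https://example.com/post</link>
#       <pubDate>Sat, 01 Jan 2022 00:00:00 +0000</pubDate>
#       <description>Plain text or HTML summary</description>
#     </item>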


def parse_rss(xml, title):
"""Parse given RSS feed."""
items = []
for child in xml:
if child.tag == 'title':
title = child.text
elif child.tag == 'link':
link = child.text
elif child.tag == 'item':
items.append(parse_rss_item(child))
    # Advertise only the newest item from the feed.
    return Advert(title, link, *max(items, key=attrgetter('time')))


def parse_atom_entry(xml):
"""Parse given Atom entry."""
time = datetime.fromtimestamp(0).astimezone(None)
summary = ''
for child in xml:
if child.tag.endswith('Atom}title'):
title = child.text
elif child.tag.endswith('Atom}link'):
rel = child.attrib.get('rel')
            if rel == 'alternate' or not rel:
                link = child.attrib['href']
elif child.tag.endswith('Atom}published'):
iso = child.text.replace('Z', '+00:00') # normalized
time = datetime.fromisoformat(iso).astimezone(None)
elif child.tag.endswith('Atom}summary'):
summary = child.text
elif child.tag.endswith('Atom}content') and not summary:
summary = child.text
return Item(title, link, time, summary)
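
# Likewise, a typical Atom <entry> (namespace prefixes are expanded
# by ElementTree, hence the endswith() checks above):
#     <entry>
#       <title>Post title</title>
#       <link rel="alternate" href="https://example.com/post"/>
#       <published>2022-01-01T00:00:00Z</published>
#       <summary>Plain text or HTML summary</summary>
#     </entry>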


def parse_atom(xml, title):
"""Parse given Atom feed."""
entries = []
for child in xml:
if child.tag.endswith('Atom}title'):
title = child.text
elif child.tag.endswith('Atom}link'):
rel = child.attrib.get('rel')
            if rel == 'alternate' or not rel:
                link = child.attrib['href']
elif child.tag.endswith('Atom}entry'):
entries.append(parse_atom_entry(child))
return Advert(title, link, *max(entries, key=attrgetter('time')))


async def fetch(url):
"""Fetch web feed from given URL and return it parsed."""
    if url.scheme == 'https':
        reader, writer = await open_connection(url.hostname, url.port or 443,
                                               ssl=True)
    elif url.scheme == 'http':
        reader, writer = await open_connection(url.hostname, url.port or 80)
else:
raise ValueError(f'unsupported URL scheme: {url.scheme}')
    # Preserve the query string, which some feed URLs rely on.
    target = url.path or '/'
    if url.query:
        target = f'{target}?{url.query}'
    writer.write(REQUEST.format(target, url.hostname).encode())
response = HTTPResponse(BytesSocket(await reader.read()))
writer.close()
response.begin()
with response:
if response.status >= 400:
raise HTTPError(url.geturl(), response.status,
f'{response.reason}: {url.geturl()}',
response.getheaders(), response)
if response.status >= 300:
location = urljoin(url.geturl(), response.getheader('Location'))
warn(f'{url.geturl()} redirects to {location}')
return await fetch(urlsplit(location))
if response.status >= 200:
xml = parse_xml(response.read())
if xml.tag == 'rss':
assert xml[0].tag == 'channel'
return parse_rss(xml[0], url.hostname)
if xml.tag.endswith('Atom}feed'):
return parse_atom(xml, url.hostname)
raise ValueError(f'unsupported feed format at {url.geturl()}')
raise HTTPError(url.geturl(), response.status,
f'{response.reason}: {url.geturl()}',
response.getheaders(), response)
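
# For instance, assuming example.com served an Atom feed:
#     advert = run(fetch(urlsplit('https://example.com/feed.atom')))
#     print(advert.title, advert.link)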


async def fetch_all(urls):
"""Fetch all given URLs asynchronously and return them parsed."""
tasks = gather(*(fetch(urlsplit(url)) for url in urls))
    try:
        return await tasks
    except BaseException:
        # gather() does not cancel the remaining tasks when one of
        # them raises, so cancel them explicitly before re-raising.
        tasks.cancel()
        raise


if __name__ == '__main__':
    # Feed URLs are read from standard input, one per line.
    feeds = sorted(run(fetch_all(map(str.strip, stdin))),
                   key=attrgetter('time'), reverse=True)