#!/usr/bin/env python3
# Fetch and parse web feeds
# Copyright (C) 2021-2022 Nguyễn Gia Phong
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

from asyncio import gather, open_connection, run
from collections import namedtuple
from datetime import datetime
from email.utils import parsedate_to_datetime
from http.client import HTTPResponse
from io import BytesIO
from sys import stdin
from urllib.error import HTTPError
from urllib.parse import urljoin, urlsplit
from xml.etree.ElementTree import fromstring as parse_xml

REQUEST = 'GET {} HTTP/1.0\r\nHost: {}\r\n\r\n'
Feed = namedtuple('Feed', ('title', 'items'))
Item = namedtuple('Item', ('title', 'link', 'date', 'description'))


class BytesSocket:
    """Duck socket for HTTPResponse."""

    def __init__(self, response):
        self.bytes = response

    def makefile(self, mode, *args, **kwargs):
        """Return a bytes stream."""
        assert mode == 'rb'
        return BytesIO(self.bytes)


def parse_rss_item(xml):
    """Parse the given RSS item."""
    title = link = date = None  # tolerate items with missing fields
    description = ''
    for child in xml:
        if child.tag == 'title':
            title = child.text
        elif child.tag == 'link':
            link = child.text
        elif child.tag == 'pubDate':
            date = parsedate_to_datetime(child.text)
        elif child.tag == 'description':
            description = child.text
        # ElementTree expands namespace prefixes to Clark notation,
        # so content:encoded must be matched by its namespace URI.
        elif (child.tag == '{http://purl.org/rss/1.0/modules/content/}encoded'
              and not description):
            description = child.text
    if not description:
        description = xml.text
    return Item(title, link, date, description)


def parse_rss(xml, url):
    """Parse the given RSS feed."""
    title = url
    items = []
    for child in xml:
        if child.tag == 'title':
            title = child.text
        elif child.tag == 'item':
            items.append(parse_rss_item(child))
    return Feed(title, items)


def parse_atom_entry(xml):
    """Parse the given Atom entry."""
    title = link = date = None  # tolerate entries with missing fields
    summary = ''
    for child in xml:
        if child.tag.endswith('Atom}title'):
            title = child.text
        elif child.tag.endswith('Atom}link'):
            link = child.attrib['href']
        elif child.tag.endswith('Atom}published'):
            # fromisoformat only accepts the Z suffix from Python 3.11 on
            date = datetime.fromisoformat(child.text.replace('Z', '+00:00'))
        elif child.tag.endswith('Atom}summary'):
            summary = child.text
        elif child.tag.endswith('Atom}content') and not summary:
            summary = child.text
    return Item(title, link, date, summary)


def parse_atom(xml, url):
    """Parse the given Atom feed."""
    title = url
    entries = []
    for child in xml:
        if child.tag.endswith('Atom}title'):
            title = child.text
        elif child.tag.endswith('Atom}entry'):
            entries.append(parse_atom_entry(child))
    return Feed(title, entries)
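
# The parsers above can be exercised offline, without the fetcher below.
# A minimal sketch (the document and values here are made up for
# illustration, not taken from any real feed):
#
#     feed = parse_rss(parse_xml(
#         '<rss><channel><title>Example</title>'
#         '<item><title>Hello</title><link>/hi</link></item>'
#         '</channel></rss>')[0], 'example.com')
#     assert feed.title == 'Example'
#     assert feed.items[0].link == '/hi'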


async def fetch(url):
    """Fetch the web feed at the given URL and return it parsed."""
    if url.scheme == 'https':
        reader, writer = await open_connection(url.hostname, 443, ssl=True)
    elif url.scheme == 'http':
        reader, writer = await open_connection(url.hostname, 80)
    else:
        raise ValueError(f'unsupported URL scheme: {url.scheme}')
    writer.write(REQUEST.format(url.path or '/', url.hostname).encode())
    await writer.drain()
    # Read the entire response, then let http.client parse it from memory.
    response = HTTPResponse(BytesSocket(await reader.read()))
    writer.close()

    response.begin()
    with response:
        if response.status >= 400:
            raise HTTPError(url.geturl(), response.status, response.reason,
                            response.getheaders(), response)
        if response.status >= 300:
            location = urljoin(url.geturl(), response.getheader('Location'))
            print(url.geturl(), '->', location)
            return await fetch(urlsplit(location))
        if response.status >= 200:
            xml = parse_xml(response.read())
            if xml.tag == 'rss':
                assert xml[0].tag == 'channel'
                return parse_rss(xml[0], url.hostname)
            if xml.tag.endswith('Atom}feed'):
                return parse_atom(xml, url.hostname)
            raise ValueError(f'unsupported feed format at {url.geturl()}')
        # FIXME: handle informational responses


async def fetch_all(urls):
    """Fetch all given URLs asynchronously and return them parsed."""
    tasks = gather(*(fetch(urlsplit(url)) for url in urls))
    try:
        return await tasks
    except BaseException:
        tasks.cancel()  # cancel the remaining fetches on failure
        raise


if __name__ == '__main__':
    # Strip trailing newlines so they do not end up in the request path.
    feeds = run(fetch_all(map(str.strip, stdin)))
    for feed in feeds:
        print(feed)
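
# A usage sketch, assuming the script is saved as feeds.py (both the file
# name and the feed URL below are placeholders, not part of the program):
#
#     $ echo https://example.com/feed.xml | python3 feeds.py
#     Feed(title='...', items=[Item(title='...', link='...',
#                                   date=datetime(...), description='...')])
#
# Each input line names one feed; fetch_all runs the fetches concurrently,
# and gather preserves the input order in its results.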