#!/usr/bin/env python3
# Fetch and parse RSS/Atom web feeds
# Copyright (C) 2021-2022 Nguyễn Gia Phong
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from asyncio import gather, open_connection, run
from collections import namedtuple
from datetime import datetime
from email.utils import parsedate_to_datetime
from http.client import HTTPResponse
from io import BytesIO
from sys import stdin
from urllib.error import HTTPError
from urllib.parse import urljoin, urlsplit
from xml.etree.ElementTree import fromstring as parse_xml
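
# Feeds are fetched by speaking HTTP/1.0 directly over asyncio streams;
# http.client's response parser is then reused on the buffered reply,
# avoiding blocking urllib calls and third-party HTTP dependencies.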
REQUEST = 'GET {} HTTP/1.0\r\nHost: {}\r\n\r\n'
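# e.g. REQUEST.format('/feed.xml', 'example.com') produces
# 'GET /feed.xml HTTP/1.0\r\nHost: example.com\r\n\r\n'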
Feed = namedtuple('Feed', ('title', 'items'))
Item = namedtuple('Item', ('title', 'link', 'date', 'description'))
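# Item.date stays None when a feed omits pubDate/published; the RSS
# parser falls back to the item element's own text for description.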
class BytesSocket:
"""Duck socket for HTTPResponse."""
def __init__(self, response):
self.bytes = response
def makefile(self, mode, *args, **kwargs):
"""Return a bytes stream."""
assert mode == 'rb'
return BytesIO(self.bytes)
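
# Sketch of the duck typing above (the raw bytes are illustrative):
#   resp = HTTPResponse(BytesSocket(b'HTTP/1.0 200 OK\r\n\r\nbody'))
#   resp.begin()  # parses the status line and headers from the buffer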


def parse_rss_item(xml):
    """Parse given RSS item."""
    title = link = None  # tolerate items missing either element
    date = None
    description = ''
    for child in xml:
        if child.tag == 'title':
            title = child.text
        elif child.tag == 'link':
            link = child.text
        elif child.tag == 'pubDate':
            date = parsedate_to_datetime(child.text)
        elif child.tag == 'description':
            description = child.text
        # ElementTree expands content:encoded to its namespace-qualified tag.
        elif (child.tag == '{http://purl.org/rss/1.0/modules/content/}encoded'
              and not description):
            description = child.text
    if not description:
        description = xml.text
    return Item(title, link, date, description)


def parse_rss(xml, url):
    """Parse given RSS feed."""
    title = url
    items = []
    for child in xml:
        if child.tag == 'title':
            title = child.text
        elif child.tag == 'item':
            items.append(parse_rss_item(child))
    return Feed(title, items)
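
# ElementTree expands namespace prefixes, so Atom elements arrive with
# tags like '{http://www.w3.org/2005/Atom}title'; matching on the
# 'Atom}' suffix below keeps those comparisons short.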


def parse_atom_entry(xml):
    """Parse given Atom entry."""
    title = link = None  # tolerate entries missing either element
    date = None
    summary = ''
    for child in xml:
        if child.tag.endswith('Atom}title'):
            title = child.text
        elif child.tag.endswith('Atom}link'):
            link = child.attrib['href']
        elif child.tag.endswith('Atom}published'):
            # fromisoformat rejects the trailing Z before Python 3.11
            date = datetime.fromisoformat(child.text.replace('Z', '+00:00'))
        elif child.tag.endswith('Atom}summary'):
            summary = child.text
        elif child.tag.endswith('Atom}content') and not summary:
            summary = child.text
    return Item(title, link, date, summary)


def parse_atom(xml, url):
    """Parse given Atom feed."""
    title = url
    entries = []
    for child in xml:
        if child.tag.endswith('Atom}title'):
            title = child.text
        elif child.tag.endswith('Atom}entry'):
            entries.append(parse_atom_entry(child))
    return Feed(title, entries)


async def fetch(url):
    """Fetch web feed from given URL and return it parsed."""
    if url.scheme == 'https':
        reader, writer = await open_connection(url.hostname, 443, ssl=True)
    elif url.scheme == 'http':
        reader, writer = await open_connection(url.hostname, 80)
    else:
        raise ValueError(f'unsupported URL scheme: {url.scheme}')
    path = url.path or '/'
    if url.query:  # preserve the query string, e.g. /feed?format=atom
        path = f'{path}?{url.query}'
    writer.write(REQUEST.format(path, url.hostname).encode())
    await writer.drain()
    # HTTP/1.0 closes the connection after one response,
    # so the entire reply can be buffered before parsing.
    response = HTTPResponse(BytesSocket(await reader.read()))
    writer.close()
    response.begin()
    with response:
        if response.status >= 400:
            raise HTTPError(url.geturl(), response.status, response.reason,
                            response.getheaders(), response)
        if response.status >= 300:
            # Follow redirects recursively, resolving relative Locations.
            location = urljoin(url.geturl(), response.getheader('Location'))
            print(url.geturl(), '->', location)
            return await fetch(urlsplit(location))
        if response.status >= 200:
            xml = parse_xml(response.read())
            if xml.tag == 'rss':
                assert xml[0].tag == 'channel'
                return parse_rss(xml[0], url.hostname)
            if xml.tag.endswith('Atom}feed'):
                return parse_atom(xml, url.hostname)
            raise ValueError(f'unsupported feed format at {url.geturl()}')
        # FIXME: handle informational responses


async def fetch_all(urls):
    """Fetch all given URLs asynchronously and return them parsed."""
    tasks = gather(*(fetch(urlsplit(url)) for url in urls))
    try:
        return await tasks
    except BaseException:  # including KeyboardInterrupt and CancelledError
        tasks.cancel()  # stop the remaining fetches before re-raising
        raise
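
# Usage sketch (script name and feed URL are illustrative):
#   echo https://example.com/feed.xml | python3 feeds.py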


if __name__ == '__main__':
    # One feed URL per line on standard input; skip blank lines.
    urls = [line.strip() for line in stdin if line.strip()]
    feeds = run(fetch_all(urls))
    for feed in feeds:
        print(feed)