author     Nguyễn Gia Phong <mcsinyx@disroot.org>  2022-11-09 03:10:02 +0900
committer  Nguyễn Gia Phong <mcsinyx@disroot.org>  2022-11-09 03:10:02 +0900
commit     f5906a926b80717691ef55bbaf02c9e06c257cbf (patch)
tree       e4b34f48fdd8163ef1de5df8568508c0c6f1d3bd /src
parent     c24337da6add85b3d1786d6326169f979088e490 (diff)
download   fead-f5906a926b80717691ef55bbaf02c9e06c257cbf.tar.gz
Achieve feature parity with openring
Diffstat (limited to 'src')
-rwxr-xr-x  src/fead.py  108
1 file changed, 83 insertions(+), 25 deletions(-)
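The command-line interface added below renders each selected Advert through str.format, so a template may reference any Advert field: source_title, source_link, title, link, time, and summary. As a minimal sketch (the file names and markup here are hypothetical, not part of this commit), a template file template.html could contain:

    <h3><a href="{link}">{title}</a></h3>
    <p>{summary}</p>
    <p>via <a href="{source_link}">{source_title}</a>, {time:%Y-%m-%d}</p>

and, with feeds.txt holding newline-separated feed URLs, be rendered via:

    src/fead.py -F feeds.txt -n 3 -p 1 -l 256 -t template.html -o ads.html

The {time:%Y-%m-%d} placeholder works because str.format delegates to datetime.__format__.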
diff --git a/src/fead.py b/src/fead.py
index 4fd2b1b..ac1fcd7 100755
--- a/src/fead.py
+++ b/src/fead.py
@@ -15,6 +15,9 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
+__version__ = '0.0.1'
+
+from argparse import ArgumentParser, FileType
from asyncio import gather, open_connection, run
from collections import namedtuple
from datetime import datetime
@@ -22,18 +25,27 @@ from email.utils import parsedate_to_datetime
from http.client import HTTPResponse
from io import BytesIO
from operator import attrgetter
-from sys import stdin
+from pathlib import Path
+from re import compile as regex
+from sys import stdin, stdout
+from textwrap import shorten
from urllib.error import HTTPError
from urllib.parse import urljoin, urlsplit
from warnings import warn
from xml.etree.ElementTree import fromstring as parse_xml
REQUEST = 'GET {} HTTP/1.0\r\nHost: {}\r\n\r\n'
-Item = namedtuple('Item', ('title', 'link', 'time', 'summary'))
+HTML_TAG = regex('<.+?>')
+
Advert = namedtuple('Advert', ('source_title', 'source_link',
'title', 'link', 'time', 'summary'))
+def read_urls(path):
+ """Read newline-separated URLs from given file path."""
+ return Path(path).read_text().splitlines()
+
+
class BytesSocket:
"""Duck socket for HTTPResponse."""
def __init__(self, response):
@@ -47,7 +59,7 @@ class BytesSocket:
def parse_rss_item(xml):
"""Parse given RSS item."""
- time = datetime.fromtimestamp(0).astimezone(None)
+ time = datetime.fromtimestamp(0)
description = ''
for child in xml:
if child.tag == 'title':
@@ -55,14 +67,14 @@ def parse_rss_item(xml):
elif child.tag == 'link':
link = child.text
elif child.tag == 'pubDate':
- time = parsedate_to_datetime(child.text).astimezone(None)
+ time = parsedate_to_datetime(child.text)
elif child.tag == 'description':
description = child.text
elif child.tag == 'content:encoded' and not description:
description = child.text
if not description:
description = xml.text
- return Item(title, link, time, description)
+ return title, link, time, description
def parse_rss(xml, title):
@@ -75,12 +87,12 @@ def parse_rss(xml, title):
link = child.text
elif child.tag == 'item':
items.append(parse_rss_item(child))
- return Advert(title, link, *max(items, key=attrgetter('time')))
+ return title, link, items
def parse_atom_entry(xml):
"""Parse given Atom entry."""
- time = datetime.fromtimestamp(0).astimezone(None)
+ time = datetime.fromtimestamp(0)
summary = ''
for child in xml:
if child.tag.endswith('Atom}title'):
@@ -90,12 +102,12 @@ def parse_atom_entry(xml):
if rel == 'alternate' or not rel: link = child.attrib['href']
elif child.tag.endswith('Atom}published'):
iso = child.text.replace('Z', '+00:00') # fromisoformat() rejects 'Z' before Python 3.11
- time = datetime.fromisoformat(iso).astimezone(None)
+ time = datetime.fromisoformat(iso)
elif child.tag.endswith('Atom}summary'):
summary = child.text
elif child.tag.endswith('Atom}content') and not summary:
summary = child.text
- return Item(title, link, time, summary)
+ return title, link, time, summary
def parse_atom(xml, title):
@@ -109,11 +121,12 @@ def parse_atom(xml, title):
if rel == 'alternate' or not rel: link = child.attrib['href']
elif child.tag.endswith('Atom}entry'):
entries.append(parse_atom_entry(child))
- return Advert(title, link, *max(entries, key=attrgetter('time')))
+ return title, link, entries
-async def fetch(url):
+async def fetch(raw_url):
"""Fetch web feed from given URL and return it parsed."""
+ url = urlsplit(raw_url)
if url.scheme == 'https':
reader, writer = await open_connection(url.hostname, 443, ssl=True)
elif url.scheme == 'http':
@@ -127,29 +140,35 @@ async def fetch(url):
response.begin()
with response:
if response.status >= 400:
- raise HTTPError(url.geturl(), response.status,
- f'{response.reason}: {url.geturl()}',
+ raise HTTPError(raw_url, response.status,
+ f'{response.reason}: {raw_url}',
response.getheaders(), response)
if response.status >= 300:
- location = urljoin(url.geturl(), response.getheader('Location'))
- warn(f'{url.geturl()} redirects to {location}')
- return await fetch(urlsplit(location))
+ location = urljoin(raw_url, response.getheader('Location'))
+ warn(f'{raw_url} -> {location}',
+ type('RedirectWarning', (Warning,), {}))
+ return await fetch(location)
if response.status >= 200:
xml = parse_xml(response.read())
if xml.tag == 'rss':
assert xml[0].tag == 'channel'
- return parse_rss(xml[0], url.hostname)
- if xml.tag.endswith('Atom}feed'):
- return parse_atom(xml, url.hostname)
- raise ValueError(f'unsupported feed format at {url.geturl()}')
- raise HTTPError(url.geturl(), response.status,
- f'{response.reason}: {url.geturl()}',
+ src_title, src_link, items = parse_rss(xml[0], url.hostname)
+ elif xml.tag.endswith('Atom}feed'):
+ src_title, src_link, items = parse_atom(xml, url.hostname)
+ else:
+ raise ValueError(f'unsupported feed format at {raw_url}')
+ return (Advert(src_title, urljoin(raw_url, src_link),
+ title, urljoin(raw_url, link),
+ time.astimezone(None), summary)
+ for title, link, time, summary in items)
+ raise HTTPError(raw_url, response.status,
+ f'{response.reason}: {raw_url}',
response.getheaders(), response)
async def fetch_all(urls):
"""Fetch all given URLs asynchronously and return them parsed."""
- tasks = gather(*(fetch(urlsplit(url)) for url in urls))
+ tasks = gather(*map(fetch, urls))
try:
return await tasks
except:
@@ -157,6 +176,45 @@ async def fetch_all(urls):
raise
+def select(n, ads):
+ """Return n most recent ads from given iterable."""
+ return sorted(ads, key=attrgetter('time'), reverse=True)[:n]
+
+
+def truncate(ad, summary_length):
+ """Return ad with truncated summary, whose HTML tags a stripped."""
+ return ad._replace(summary=shorten(HTML_TAG.sub(ad.summary, ''),
+ summary_length, placeholder='…'))
+
+
if __name__ == '__main__':
- feeds = sorted(run(fetch_all(stdin.readlines())),
- key=attrgetter('time'), reverse=True)
+ parser = ArgumentParser(description='generate advert from web feeds')
+ parser.add_argument('-v', '--version', action='version',
+ version=f'fead {__version__}')
+ parser.add_argument('-F', '--feeds', metavar='PATH',
+ type=read_urls, default=[],
+ help='file containing newline-separated web feed URLs')
+ parser.add_argument('-f', '--feed', metavar='URL',
+ action='append', dest='feeds',
+ help='additional web feed URL (may be used multiple times)')
+ parser.add_argument('-n', '--count', metavar='N', type=int, default=3,
+ help='maximum number of ads in total (default: 3)')
+ parser.add_argument('-p', '--per-feed', metavar='N', type=int, default=1,
+ help='maximum number of ads per feed (default: 1)')
+ parser.add_argument('-l', '--length', metavar='N',
+ dest='len', type=int, default=256,
+ help='maximum summary length (default: 256)')
+ parser.add_argument('-t', '--template', metavar='PATH',
+ type=FileType('r'), default=stdin,
+ help='template file (default: stdin)')
+ parser.add_argument('-o', '--output', metavar='PATH',
+ type=FileType('w'), default=stdout,
+ help='output file (default: stdout)')
+ args = parser.parse_args()
+
+ template = args.template.read()
+ args.template.close()
+ for ad in select(args.count, (ad for feed in run(fetch_all(args.feeds))
+ for ad in select(args.per_feed, feed))):
+ args.output.write(template.format(**truncate(ad, args.len)._asdict()))
+ args.output.close()
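For reference, re.Pattern.sub(repl, string) takes the replacement first and the subject second, which is why truncate strips markup with HTML_TAG.sub('', ad.summary) before shortening; with the arguments swapped, the pattern would be applied to the empty string and the summary would always come out empty. A self-contained sketch of the tag-stripping step, using only the standard library:

    from re import compile as regex
    from textwrap import shorten

    HTML_TAG = regex('<.+?>')  # same non-greedy tag pattern as in fead.py
    summary = '<p>hi <b>there</b>, this is a fairly long summary</p>'
    text = HTML_TAG.sub('', summary)  # replacement first, subject second
    print(shorten(text, 24, placeholder='…'))  # hi there, this is a…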