From abe85863371957151701c2f41739495d02611c6f Mon Sep 17 00:00:00 2001 From: Nguyễn Gia Phong Date: Thu, 23 Mar 2023 00:12:56 +0900 Subject: Implement feed generation --- src/rub/__init__.py | 78 +++++++++++++++++++++++++++++++++++------------------ src/rub/xml.py | 48 +++++++++++++++++++++++++-------- 2 files changed, 89 insertions(+), 37 deletions(-) (limited to 'src') diff --git a/src/rub/__init__.py b/src/rub/__init__.py index 904f47a..3be303b 100644 --- a/src/rub/__init__.py +++ b/src/rub/__init__.py @@ -20,44 +20,49 @@ from functools import cached_property from os import walk from pathlib import Path from shutil import copytree, rmtree +from typing import Iterator -from rub import xml +from doit import run as do -__all__ = ['Rubber', 'xml'] +from rub.xml import Processor, gen_omnifeed +__all__ = ['rub'] -def glob_files(root: Path, suffix=''): +OMNIFEED = Path('feed.xml') + + +def glob_files(root: Path, suffix='') -> list[Path]: """Return the list of all files in given directory, recursively.""" return [Path(path).relative_to(root)/file for path, dirs, files in walk(root) for file in files if file.endswith(suffix)] -def replace(source: Path, destination: Path): +def replace(source: Path, destination: Path) -> None: """Replace destination with source directory.""" rmtree(destination, ignore_errors=True) copytree(source, destination, dirs_exist_ok=True) +def processing_task(proc: Processor, path: Path, + src_dir: Path, dest_dir: Path, doc: str) -> dict: + source, dest = src_dir / path, proc.change_name(dest_dir/path) + return {'name': f'/{proc.change_name(path)}', 'doc': doc, + 'file_dep': [proc.xslt, source], + 'actions': [(proc.process, [source, dest])], + 'targets': [dest], 'clean': True} + + class Rubber: """Static generator.""" - def __init__(self, generate_article, base, src, cache, out): - self.generate_article = generate_article + def __init__(self, page_proc: Processor, feed_proc: Processor, + base: Path, src: Path, cache: Path, out: Path) -> None: + 
self.page_proc, self.feed_proc = page_proc, feed_proc self.base, self.src = base, src self.cache, self.out = cache, out - @cached_property - def tasks(self): - def assox(): - for k in dir(self): - if not k.startswith('task_'): continue - v = getattr(self, k) - if callable(v): yield k, v - - return dict(assox()) - - def task_base(self): + def task_base(self) -> dict: paths = glob_files(self.base) return {'doc': 'copy base directory', 'file_dep': [self.base/path for path in paths], @@ -65,12 +70,33 @@ class Rubber: 'targets': [self.out/path for path in paths], 'clean': True} - def task_articles(self): - """process articles into XHTML""" - for path in glob_files(self.src, '.xml'): - source = self.src / path - destination = self.out / path - yield {'name': path, 'doc': f'process {path} into XHTML', - 'file_dep': [source], - 'actions': [(self.generate_article, [source, destination])], - 'targets': [destination], 'clean': True} + @cached_property + def sources(self) -> list[Path]: + return glob_files(self.src, '.xml') + + def task_pages(self) -> Iterator[dict]: + yield {'name': None, 'doc': 'process sources into web pages'} + for path in self.sources: + yield processing_task(self.page_proc, path, self.src, self.out, + f'process {path} into a web page') + + def task_feeds(self) -> Iterator[dict]: + yield {'name': None, 'doc': 'generate web feeds'} + feed_src = self.cache / OMNIFEED + sources = [self.src/path for path in self.sources] + pages = [self.page_proc.change_name(self.out/path) + for path in self.sources] + yield {'name': 'source', 'doc': 'generate generic global feed', + 'file_dep': sources+pages, + 'actions': [(gen_omnifeed, + [sources, pages, self.out, feed_src])], + 'targets': [feed_src], 'clean': True} + yield processing_task(self.feed_proc, OMNIFEED, self.cache, self.out, + 'generate global feed') + + +def rub(page_proc: Processor, feed_proc: Processor, + base: Path, src: Path, cache: Path, out: Path) -> None: + """Generate static website.""" + rubber = 
Rubber(page_proc, feed_proc, base, src, cache, out) + do({k: getattr(rubber, k) for k in dir(rubber)}) diff --git a/src/rub/xml.py b/src/rub/xml.py index ed61a8b..4c3a2ae 100644 --- a/src/rub/xml.py +++ b/src/rub/xml.py @@ -17,12 +17,13 @@ # along with rub. If not, see <https://www.gnu.org/licenses/>. from copy import deepcopy -from functools import partial from pathlib import Path -from lxml.etree import QName, XML, XSLT, XSLTExtension +from lxml.builder import E +from lxml.html import document_fromstring as from_html +from lxml.etree import QName, XML, XSLT, XSLTExtension, tostring as serialize -__all__ = ['NS', 'generator', 'recurse'] +__all__ = ['NS', 'Processor', 'recurse'] NS = 'https://rub.parody' @@ -57,14 +58,39 @@ class Evaluator(XSLTExtension): handle(self, context, input_node, output_parent) -def generator(xslt, **handlers): - """Return a function taking an XML file and apply given XSLT.""" - stylesheet = xslt.read_bytes() - extensions = {(NS, 'eval'): Evaluator(**handlers)} - transform = XSLT(XML(stylesheet), extensions=extensions) +class Serializer(XSLTExtension): + def execute(self, context, self_node, input_node, output_parent): + output_parent.text = serialize(deepcopy(input_node)) + + +class Processor: + """Callable XSLT processor.""" - def make(src, dest): + def __init__(self, xslt: Path, change_name, **handlers) -> None: + self.xslt, self.change_name = xslt, change_name + stylesheet = xslt.read_bytes() + extensions = {(NS, 'eval'): Evaluator(**handlers), + (NS, 'serialize'): Serializer()} + self.transform = XSLT(XML(stylesheet), extensions=extensions) + + def process(self, src: Path, dest: Path) -> None: dest.parent.mkdir(mode=0o755, parents=True, exist_ok=True) - dest.write_text(str(transform(XML(src.read_bytes())))) + dest.write_text(str(self.transform(XML(src.read_bytes())))) + - return make +def gen_omnifeed(sources: list[Path], pages: list[Path], + out_dir: Path, dest: Path) -> None: + """Generate generic global feed.""" + entries = [] + for src, page in zip(sources, 
pages): + src_root = XML(src.read_bytes()) + desc = src_root.findtext('description', '', {None: NS}) + if not desc: continue + title = src_root.findtext('title', '', {None: NS}) + date = src_root.findtext('date', '', {None: NS}) + page_root = from_html(page.read_bytes()) + path = str(page.relative_to(out_dir)) + entries.append(E.entry(E.title(title), E.description(desc), + E.date(date), E.path(path), page_root)) + dest.parent.mkdir(mode=0o755, parents=True, exist_ok=True) + dest.write_bytes(serialize(E.feed(*entries), pretty_print=True)) -- cgit 1.4.1