# XML processing abstractions
# Copyright (C) 2023 Nguyễn Gia Phong
#
# This file is part of rub.
#
# Rub is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Rub is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with rub. If not, see <https://www.gnu.org/licenses/>.

from copy import deepcopy
from pathlib import Path

from lxml.builder import E
from lxml.etree import (CDATA, QName, Resolver, XML, XMLParser,
                        XSLT, XSLTExtension, tostring as serialize)

__all__ = ['NS', 'Processor', 'recurse']

# Namespace URI under which the extension elements and handlers are looked up.
NS = 'https://rub.parody'
# Stylesheet source compiled by gen_omnifeed.
# NOTE(review): this literal is blank in the reviewed copy -- presumably
# the markup was lost somewhere; confirm against the canonical source.
GEN_OMNIFEED = ''' '''


def serialize_content(element) -> bytes:
    """Return the inner content of element serialized as bytes.

    The result is the element's leading text, encoded with the default
    of str.encode, followed by the serialization of a deep copy
    of each child element.
    """
    # NOTE(review): the leading text is emitted as-is while children
    # go through the serializer -- confirm that this asymmetry is intended.
    text = element.text.encode() if element.text else b''
    children = b''.join(serialize(deepcopy(child)) for child in element)
    return text + children


def recurse(extension, context, input_node, output_parent):
    """Apply template recursively on input node.

    A deep copy of input_node is emptied of its children and refilled
    with what extension.apply_templates yields for each original child:
    non-string results are appended as deep-copied children, while
    string results are concatenated to the text at the current end
    of the copy.  The rebuilt copy is then appended to output_parent.
    """
    output = deepcopy(input_node)
    # Iterate over a snapshot so that removal cannot disturb the iteration.
    for child in list(output):
        output.remove(child)
    for child in input_node:
        for result in extension.apply_templates(context, child):
            if not isinstance(result, str):
                output.append(deepcopy(result))
            elif len(output) == 0:
                # No child yet: the string belongs to the leading text.
                output.text = (output.text or '') + result
            else:
                # Otherwise it trails the most recently appended child.
                last = output[-1]
                last.tail = (last.tail or '') + result
    output_parent.append(output)


class Evaluator(XSLTExtension):
    """XSLT extension element dispatching input nodes to handlers.

    Handlers are given as keyword arguments, keyed by local tag name;
    they are matched against tags qualified with NS.  A handler
    is called as handler(extension, context, input_node, output_parent).
    """

    def __init__(self, **handlers):
        self.handlers = {QName(NS, name).text: handler
                         for name, handler in handlers.items()}
        super().__init__()

    def execute(self, context, self_node, input_node, output_parent):
        """Run the handler registered for the input node's tag.

        Nodes without a registered handler fall back to recurse.
        """
        handle = self.handlers.get(input_node.tag, recurse)
        handle(self, context, input_node, output_parent)


class Serializer(XSLTExtension):
    """XSLT extension element emitting the input's content as CDATA."""

    def execute(self, context, self_node, input_node, output_parent):
        """Set output_parent's text to the serialized content in CDATA."""
        output_parent.text = CDATA(serialize_content(input_node))


class Processor:
    """XSLT processor for one stylesheet and a set of tag handlers.

    The stylesheet at xslt is compiled with two extension elements
    in the NS namespace: 'eval', backed by an Evaluator built from
    handlers, and 'serialize', backed by a Serializer.  The arguments
    xslt and change_name are stored as attributes of the same name.
    """

    def __init__(self, xslt: Path, change_name, **handlers) -> None:
        self.xslt, self.change_name = xslt, change_name
        stylesheet = xslt.read_bytes()
        extensions = {(NS, 'eval'): Evaluator(**handlers),
                      (NS, 'serialize'): Serializer()}
        self.transform = XSLT(XML(stylesheet), extensions=extensions)

    def process(self, src: Path, dest: Path) -> None:
        """Transform the document at src and write the result to dest.

        Missing parent directories of dest are created.
        """
        dest.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
        result = self.transform(XML(src.read_bytes()))
        # Write explicitly as UTF-8 rather than in the locale's encoding,
        # so that the output does not vary with the environment.
        dest.write_text(str(result), encoding='utf-8')


def gen_metadata(sources: list[Path], pages: list[Path],
                 out_dir: Path, dest: Path) -> None:
    """Extract metadata from all source pages.

    Sources and pages are paired positionally.  Each source with
    a non-empty description contributes one entry holding its title,
    description, date, categories and the page's path relative
    to out_dir.  The entries are written to dest wrapped in a feed
    element, creating missing parent directories.
    """
    namespaces = {None: NS}
    category_tag = QName(NS, 'category').text
    entries = []
    for src, page in zip(sources, pages):
        src_root = XML(src.read_bytes())
        desc = src_root.findtext('description', '', namespaces)
        if not desc:
            # Pages without a description are left out of the metadata.
            continue
        title = src_root.findtext('title', '', namespaces)
        date = src_root.findtext('date', '', namespaces)
        categories = src_root.itertext(tag=category_tag, with_tail=False)
        path = str(page.relative_to(out_dir))
        entries.append(E.entry(E.title(title), E.description(desc),
                               E.date(date), *map(E.category, categories),
                               E.path(path)))
    dest.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
    dest.write_bytes(serialize(E.feed(*entries), pretty_print=True))


class PageResolver(Resolver):
    """URI resolver for use in XSLT document function.

    Requested paths are resolved as files under the base directory.
    """

    def __init__(self, base: Path) -> None:
        self.base = base
        super().__init__()

    def resolve(self, path, public_id, context):
        """Resolve path to the file of that name under the base."""
        return self.resolve_filename(str(self.base/path), context)


def gen_omnifeed(metadata: Path, out_dir: Path, dest: Path) -> None:
    """Generate generic global feed.

    The GEN_OMNIFEED stylesheet is parsed with a PageResolver rooted
    at out_dir, applied to the document at metadata, and the result
    is written to dest, creating missing parent directories.
    """
    parser = XMLParser()
    parser.resolvers.add(PageResolver(out_dir))
    transform = XSLT(XML(GEN_OMNIFEED, parser))
    omnifeed = transform(XML(metadata.read_bytes()))
    dest.parent.mkdir(mode=0o755, parents=True, exist_ok=True)
    dest.write_bytes(serialize(omnifeed, pretty_print=True))