diff --git a/integraality/pages_processor.py b/integraality/pages_processor.py
index 6bd01da..c2cbbb9 100644
--- a/integraality/pages_processor.py
+++ b/integraality/pages_processor.py
@@ -1,161 +1,170 @@
 #!/usr/bin/python
 # -*- coding: utf-8 -*-
 """
 Bot to generate statistics
 
 """
 import re
 
 from ww import f
 
 import pywikibot
 from pywikibot import pagegenerators
 
 from property_statistics import (
     PropertyConfig,
     PropertyStatistics,
     QueryException
 )
 
 REQUIRED_CONFIG_FIELDS = ['selector_sparql', 'grouping_property', 'properties']
 
 
 class ProcessingException(Exception):
     pass
 
 
 class ConfigException(ProcessingException):
     pass
 
 
 class NoEndTemplateException(ProcessingException):
     pass
 
 
 class PagesProcessor:
 
     def __init__(self):
         site = pywikibot.Site('en', 'wikipedia')
         self.repo = site.data_repository()
         self.template_name = 'Property dashboard'
         self.end_template_name = 'Property dashboard end'
         self.summary = u'Update property usage stats'
         self.outputs = []
 
     def get_all_pages(self):
         template = pywikibot.Page(self.repo, self.template_name, ns=10)
         return pagegenerators.ReferringPageGenerator(template, onlyTemplateInclusion=True)
 
     @staticmethod
     def extract_elements_from_template_param(template_param):
         """Extract and sanitize the contents of a parsed template param."""
         (field, _, value) = template_param.partition(u'=')
-        return (field.strip(), value)
+        return (field.strip(), value.replace('{{!}}', '|'))
 
     def parse_config_from_params(self, params):
         return {
             key: value for (key, value) in
             [self.extract_elements_from_template_param(param) for param in params]
             if key
         }
 
     def make_stats_object_for_page(self, page):
         all_templates_with_params = page.templatesWithParams()
 
         if self.end_template_name not in [template.title(with_ns=False) for (template, _) in all_templates_with_params]:
             raise NoEndTemplateException("No end template '%s' provided" % self.end_template_name)
 
         start_templates_with_params = [
             (template, params) for (template, params) in all_templates_with_params if
             template.title(with_ns=False) == self.template_name
         ]
 
+        if not start_templates_with_params:
+            raise ConfigException("No start template '%s' provided" % self.template_name)
+
         if len(start_templates_with_params) > 1:
             pywikibot.warn("More than one template on the page %s" % page.title())
 
         (template, params) = start_templates_with_params[0]
         parsed_config = self.parse_config_from_params(params)
         config = self.parse_config(parsed_config)
         try:
             return PropertyStatistics(**config)
         except TypeError:
             raise ConfigException("The template parameters are incorrect.")
 
     def process_page(self, page):
         stats = self.make_stats_object_for_page(page)
         try:
             output = stats.retrieve_and_process_data()
         except QueryException as e:
             raise ConfigException(e)
         new_text = self.replace_in_page(output, page.get())
         page.put(new_text, self.summary)
 
     def parse_config(self, config):
         for field in REQUIRED_CONFIG_FIELDS:
             if field not in config:
                 pywikibot.output("Missing required field %s" % field)
                 raise ConfigException("A required field is missing: %s" % field)
         config['properties'] = self.parse_config_properties(config['properties'])
         config['stats_for_no_group'] = bool(config.get('stats_for_no_group', False))
         return config
 
     @staticmethod
     def parse_config_properties(properties_string):
         properties = properties_string.split(',')
         properties_data = []
         for prop in properties:
             try:
                 (key, title) = prop.split(':')
             except ValueError:
                 (key, title) = (prop, None)
             if key:
                 splitted = key.split('/')
                 if len(splitted) == 3:
                     (property_name, value, qualifier) = splitted
                 elif len(splitted) == 2:
                     (property_name, value, qualifier) = (splitted[0], None, splitted[1])
                 else:
                     (property_name, value, qualifier) = (key, None, None)
                 entry = PropertyConfig(property=property_name, title=title, qualifier=qualifier, value=value)
                 properties_data.append(entry)
         return properties_data
 
     def replace_in_page(self, output, page_text):
         regex_text = f('({{{{{self.template_name}.*?}}}}).*?({{{{{self.end_template_name}}}}})')
         regex = re.compile(regex_text, re.MULTILINE | re.DOTALL)
-        new_text = re.sub(regex, r'\1\n%s\n\2' % output, page_text, count=1)
+
+        def insert_output(match):
+            # A callable replacement is used verbatim, whereas a template string
+            # would have any backslash in the output parsed as a regex escape.
+            return '%s\n%s\n%s' % (match.group(1), output, match.group(2))
+
+        new_text = regex.sub(insert_output, page_text, count=1)
         return new_text
 
     def process_all(self):
         self.summary = u'Weekly update of property usage stats'
         for page in self.get_all_pages():
             pywikibot.output("Processing page %s" % page.title())
             try:
                 self.process_page(page)
             except NoEndTemplateException:
                 pywikibot.output("No end template on page %s, skipping" % page.title())
             except ConfigException:
                 pywikibot.output("Bad configuration on page %s, skipping" % page.title())
             except Exception as e:
                 pywikibot.output("Unknown error with page %s: %s" % (page.title(), e))
 
     def process_one_page(self, page_title):
         page = pywikibot.Page(self.repo, page_title)
         pywikibot.output("Processing page %s" % page.title())
         self.process_page(page)
 
     def make_stats_object_for_page_title(self, page_title):
         page = pywikibot.Page(self.repo, page_title)
         return self.make_stats_object_for_page(page)
 
 
 def main(*args):
     """
     Main function. Bot does all the work.
     """
     processor = PagesProcessor()
     processor.process_all()
 
 
 if __name__ == "__main__":
     main()
diff --git a/integraality/tests/test_pages_processor.py b/integraality/tests/test_pages_processor.py
index 58a278c..a6eb32e 100644
--- a/integraality/tests/test_pages_processor.py
+++ b/integraality/tests/test_pages_processor.py
@@ -1,214 +1,231 @@
 # -*- coding: utf-8 -*-
-"""Unit tests for functions.py."""
+"""Unit tests for pages_processor.py."""
 
 import unittest
 
 from integraality.pages_processor import (
     ConfigException,
     PagesProcessor,
     PropertyConfig
 )
 
 
 class ProcessortTest(unittest.TestCase):
 
     def setUp(self):
         self.processor = PagesProcessor()
 
 
 class Test(ProcessortTest):
 
     def setUp(self):
         self.text = """
 Head
 {{Property dashboard start
 |properties=P136:genre,P404
 |grouping_property=P400
 |stats_for_no_group=1
 |selector_sparql=wdt:P31/wdt:P279* wd:Q7889
 |target_page_title=Wikidata:WikiProject Video games/Statistics/Platform
 |grouping_link=Wikidata::WikiProject Video games/Reports/Platform
 }}
 foo
 {{Property dashboard end}}
 Bottom
 """
         self.final_text = """
 Head
 {{Property dashboard start
 |properties=P136:genre,P404
 |grouping_property=P400
 |stats_for_no_group=1
 |selector_sparql=wdt:P31/wdt:P279* wd:Q7889
 |target_page_title=Wikidata:WikiProject Video games/Statistics/Platform
 |grouping_link=Wikidata::WikiProject Video games/Reports/Platform
 }}
 bar
 {{Property dashboard end}}
 Bottom
 """
 
     def test(self):
         processor = PagesProcessor()
         result = processor.replace_in_page("bar", self.text)
         self.assertEqual(result, self.final_text)
 
+    def test_output_with_backslashes(self):
+        output = r"\d+ \1 \g<2>"
+        processor = PagesProcessor()
+        result = processor.replace_in_page(output, self.text)
+        self.assertEqual(result, self.final_text.replace("bar", output))
+
 
 class TestParseConfig(ProcessortTest):
 
     def setUp(self):
         self.processor = PagesProcessor()
 
     def test_normal_config(self):
         input_config = {
             'grouping_link': 'Wikidata:WikiProject Video games/Reports/Platform',
             'grouping_property': 'P400',
             'stats_for_no_group': '1',
             'properties': 'P136:genre,P404',
             'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889',
         }
         result = self.processor.parse_config(input_config)
         expected = {
             'grouping_link': 'Wikidata:WikiProject Video games/Reports/Platform',
             'grouping_property': 'P400',
             'stats_for_no_group': True,
             'properties': [
                 PropertyConfig(property='P136', title='genre'),
                 PropertyConfig(property='P404'),
             ],
             'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889'
         }
         self.assertEqual(result, expected)
 
     def test_minimal_config(self):
         input_config = {
             'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889',
             'grouping_property': 'P400',
             'properties': 'P136:genre,P404',
         }
         result = self.processor.parse_config(input_config)
         expected = {
             'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889',
             'grouping_property': 'P400',
             'properties': [
                 PropertyConfig(property='P136', title='genre'),
                 PropertyConfig(property='P404'),
             ],
             'stats_for_no_group': False,
         }
         self.assertEqual(result, expected)
 
     def test_full_config(self):
         input_config = {
             'grouping_link': 'Wikidata:WikiProject Video games/Reports/Platform',
             'grouping_property': 'P400',
             'stats_for_no_group': '1',
             'properties': 'P136:genre,P404',
             'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889',
             'grouping_threshold': '1',
             'property_threshold': '2',
         }
         result = self.processor.parse_config(input_config)
         expected = {
             'grouping_link': 'Wikidata:WikiProject Video games/Reports/Platform',
             'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889',
             'grouping_property': 'P400',
             'properties': [
                 PropertyConfig(property='P136', title='genre'),
                 PropertyConfig(property='P404'),
             ],
             'stats_for_no_group': True,
             'grouping_threshold': '1',
             'property_threshold': '2',
         }
         self.assertEqual(result, expected)
 
     def test_empty_config(self):
         input_config = {}
         with(self.assertRaises(ConfigException)):
             self.processor.parse_config(input_config)
 
     def test_insufficient_config(self):
         input_config = {
             'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889',
         }
         with(self.assertRaises(ConfigException)):
             self.processor.parse_config(input_config)
 
 
 class TestParseParams(ProcessortTest):
 
     def test_parse_config_from_params_minimal(self):
         params = ['grouping_property=P195', 'properties=P170:creator,P276', 'selector_sparql=wdt:P31 wd:Q3305213']
         expected = {
             'grouping_property': 'P195',
             'properties': 'P170:creator,P276',
             'selector_sparql': 'wdt:P31 wd:Q3305213'
         }
         result = self.processor.parse_config_from_params(params)
         self.assertEqual(result, expected)
 
     def test_parse_config_from_params_with_empty_param(self):
         params = ['', 'grouping_property=P195', 'properties=P170:creator,P276', 'selector_sparql=wdt:P31 wd:Q3305213']
         expected = {
             'grouping_property': 'P195',
             'properties': 'P170:creator,P276',
             'selector_sparql': 'wdt:P31 wd:Q3305213'
         }
         result = self.processor.parse_config_from_params(params)
         self.assertEqual(result, expected)
 
+    def test_parse_config_from_params_with_escaped_pipe(self):
+        params = ['grouping_property=P195', 'properties=P170:creator,P276',
+                  'selector_sparql=REGEX(?id, "^(a{{!}}b)")']
+        expected = {
+            'grouping_property': 'P195',
+            'properties': 'P170:creator,P276',
+            'selector_sparql': 'REGEX(?id, "^(a|b)")'
+        }
+        result = self.processor.parse_config_from_params(params)
+        self.assertEqual(result, expected)
+
 
 class TestParseConfigProperties(ProcessortTest):
 
     def test(self):
         properties = 'P136:genre,P404'
         result = self.processor.parse_config_properties(properties)
         expected = [
             PropertyConfig(property='P136', title='genre'),
             PropertyConfig(property='P404'),
         ]
         self.assertEqual(result, expected)
 
     def test_with_trail_comma(self):
         properties = 'P136:genre,P404,'
         result = self.processor.parse_config_properties(properties)
         expected = [
             PropertyConfig(property='P136', title='genre'),
             PropertyConfig(property='P404'),
         ]
         self.assertEqual(result, expected)
 
     def test_more_properties(self):
         properties = 'P136,P178,P123,P495,P577,P404,P437'
         result = self.processor.parse_config_properties(properties)
         expected = [
             PropertyConfig(property='P136'),
             PropertyConfig(property='P178'),
             PropertyConfig(property='P123'),
             PropertyConfig(property='P495'),
             PropertyConfig(property='P577'),
             PropertyConfig(property='P404'),
             PropertyConfig(property='P437'),
         ]
         self.assertEqual(result, expected)
 
     def test_with_qualifier(self):
         properties = 'P136:genre,P404,P669/P670'
         result = self.processor.parse_config_properties(properties)
         expected = [
             PropertyConfig(property='P136', title='genre'),
             PropertyConfig(property='P404'),
             PropertyConfig(property='P669', qualifier='P670'),
         ]
         self.assertEqual(result, expected)
 
     def test_with_qualifier_and_value(self):
         properties = 'P136:genre,P404,P553/Q17459/P670'
         result = self.processor.parse_config_properties(properties)
         expected = [
             PropertyConfig(property='P136', title='genre'),
             PropertyConfig(property='P404'),
             PropertyConfig(property='P553', value='Q17459', qualifier='P670')
         ]
         self.assertEqual(result, expected)