diff --git a/integraality/pages_processor.py b/integraality/pages_processor.py index 74134ec..a26b4bd 100644 --- a/integraality/pages_processor.py +++ b/integraality/pages_processor.py @@ -1,207 +1,211 @@ #!/usr/bin/python # -*- coding: utf-8 -*- """ Bot to generate statistics """ import os import re from redis import StrictRedis from ww import f import pywikibot from pywikibot import pagegenerators from cache import RedisCache from property_statistics import ( ColumnConfigMaker, + ColumnSyntaxException, PropertyStatistics, QueryException ) REQUIRED_CONFIG_FIELDS = ['selector_sparql', 'grouping_property', 'properties'] class ProcessingException(Exception): pass class ConfigException(ProcessingException): pass class NoEndTemplateException(ProcessingException): pass class NoStartTemplateException(ProcessingException): pass class PagesProcessor: def __init__(self, url="https://www.wikidata.org/wiki/", cache_client=None): self.site = pywikibot.Site(url=url) self.template_name = 'Property dashboard' self.end_template_name = 'Property dashboard end' self.summary = u'Update property usage stats' self.outputs = [] if not cache_client: host = os.getenv("REDIS_HOST", 'tools-redis.svc.eqiad.wmflabs') cache_client = StrictRedis(host=host, decode_responses=False) self.cache = RedisCache(cache_client=cache_client) def make_cache_key(self, page_title): return ":".join([self.site.code, page_title]).replace(" ", "_") def get_all_pages(self): template = pywikibot.Page(self.site, self.template_name, ns=10) return pagegenerators.ReferringPageGenerator(template, onlyTemplateInclusion=True) @staticmethod def extract_elements_from_template_param(template_param): """Extract and sanitize the contents of a parsed template param.""" (field, _, value) = template_param.partition(u'=') return (field.strip(), value.replace('{{!}}', '|')) def parse_config_from_params(self, params): return { key: value for (key, value) in [self.extract_elements_from_template_param(param) for param in params] if 
def make_stats_object_arguments_for_page(self, page):
    """Build, cache and return the dashboard configuration found on a page.

    The page must contain both the start template (``self.template_name``)
    and the end template (``self.end_template_name``). The parameters of the
    first start template are parsed with ``parse_config_from_params`` and
    ``parse_config``; the result is stored in the cache under the page's
    cache key and returned.

    :param page: page object exposing ``templatesWithParams()`` and ``title()``
    :return: (dict) the parsed configuration
    :raises NoStartTemplateException: if the start template is not on the page
    :raises NoEndTemplateException: if the end template is not on the page
    :raises ConfigException: propagated from ``parse_config`` on a bad config
    """
    all_templates_with_params = page.templatesWithParams()
    # Resolve each template title once; it is needed for three checks below.
    template_titles = [
        template.title(with_ns=False)
        for (template, _) in all_templates_with_params
    ]

    if self.template_name not in template_titles:
        msg = (
            "No start template '%s' found. "
            "The likely explanation is that inteGraality was invoked from a page that transcludes the page with the template. "
            "Please invoke inteGraality directly from the page with the template." % self.template_name
        )
        raise NoStartTemplateException(msg)

    if self.end_template_name not in template_titles:
        raise NoEndTemplateException("No end template '%s' provided" % self.end_template_name)

    start_templates_params = [
        params
        for ((_, params), title) in zip(all_templates_with_params, template_titles)
        if title == self.template_name
    ]

    if len(start_templates_params) > 1:
        pywikibot.warn("More than one template on the page %s" % page.title())

    # Only the first start template on the page is taken into account.
    params = start_templates_params[0]
    parsed_config = self.parse_config_from_params(params)
    config = self.parse_config(parsed_config)
    key = self.make_cache_key(page.title())
    self.cache.set_cache_value(key, config)
    return config
self.parse_config_properties(config['properties']) del config['properties'] config['stats_for_no_group'] = bool(config.get('stats_for_no_group', False)) return config @staticmethod def parse_config_properties(properties_string): properties = [x.strip() for x in properties_string.split(',')] properties_data = [] for prop in properties: try: (key, title) = prop.split(':') except ValueError: (key, title) = (prop, None) if key: - properties_data.append(ColumnConfigMaker.make(key, title)) + try: + properties_data.append(ColumnConfigMaker.make(key, title)) + except ColumnSyntaxException as e: + raise ConfigException(e) return properties_data def replace_in_page(self, output, page_text): regex_text = f('({{{{{self.template_name}.*?(?= {property_statistics.property_threshold}) ORDER BY DESC(?count) LIMIT 1000 """) return query def get_totals_query(self, property_statistics): """ Get the totals of entities with the column set. :return: (str) SPARQL query """ query = f(""" SELECT (COUNT(*) as ?count) WHERE {{ ?entity {property_statistics.selector_sparql} FILTER(EXISTS {{{self.get_filter_for_info()} }}) }} """) return query def get_info_no_grouping_query(self, property_statistics): """ Get the usage counts for a column without a grouping :return: (str) SPARQL query """ query = f(""" SELECT (COUNT(*) AS ?count) WHERE {{ ?entity {property_statistics.selector_sparql} . MINUS {{ ?entity wdt:{property_statistics.grouping_property} _:b28. 
def __eq__(self, other):
    """Return whether *other* describes the same property column.

    Two columns are equal when their ``property``, ``title``, ``value`` and
    ``qualifier`` all match. When *other* lacks one of these attributes the
    comparison is not supported and ``NotImplemented`` is returned, so that
    ``==`` evaluates to ``False`` instead of raising ``AttributeError``.
    """
    try:
        return (
            self.property == other.property
            and self.title == other.title
            and self.value == other.value
            and self.qualifier == other.qualifier
        )
    except AttributeError:
        # *other* is not shaped like a property column; let Python fall back
        # to its default handling of unsupported comparisons.
        return NotImplemented
FILTER((LANG(?lang_label)) = '{self.language}').""") class LabelConfig(TextConfig): def get_key(self): return 'L%s' % self.language def get_selector(self): return 'rdfs:label' class DescriptionConfig(TextConfig): def get_key(self): return 'D%s' % self.language def get_selector(self): return 'schema:description' class QueryException(Exception): pass class PropertyStatistics: """ Generate statitics """ GROUP_MAPPING = Enum('GROUP_MAPPING', {'NO_GROUPING': 'None', 'TOTALS': ''}) TEXT_SELECTOR_MAPPING = {'L': 'rdfs:label', 'D': 'schema:description'} def __init__(self, selector_sparql, columns, grouping_property, higher_grouping=None, higher_grouping_type=None, stats_for_no_group=False, grouping_link=None, grouping_threshold=20, property_threshold=0): # noqa """ Set what to work on and other variables here. """ site = pywikibot.Site('en', 'wikipedia') self.repo = site.data_repository() self.columns = columns self.grouping_property = grouping_property self.higher_grouping = higher_grouping self.higher_grouping_type = higher_grouping_type self.selector_sparql = selector_sparql self.stats_for_no_group = stats_for_no_group self.grouping_threshold = grouping_threshold self.property_threshold = property_threshold self.grouping_link = grouping_link self.column_data = {} self.cell_template = 'Integraality cell' @statsd.timer('property_statistics.sparql.groupings') def get_grouping_information(self): """ Get the information for a single grouping. :return: Tuple of two (ordered) dictionaries. """ if self.higher_grouping: query = f(""" SELECT ?grouping (SAMPLE(?_higher_grouping) as ?higher_grouping) (COUNT(DISTINCT *) as ?count) WHERE {{ ?entity {self.selector_sparql} . ?entity wdt:{self.grouping_property} ?grouping . OPTIONAL {{ ?grouping {self.higher_grouping} ?_higher_grouping }}. 
}} GROUP BY ?grouping ?higher_grouping HAVING (?count >= {self.grouping_threshold}) ORDER BY DESC(?count) LIMIT 1000 """) else: query = f(""" SELECT ?grouping (COUNT(DISTINCT *) as ?count) WHERE {{ ?entity {self.selector_sparql} . ?entity wdt:{self.grouping_property} ?grouping . }} GROUP BY ?grouping HAVING (?count >= {self.grouping_threshold}) ORDER BY DESC(?count) LIMIT 1000 """) grouping_counts = collections.OrderedDict() grouping_groupings = collections.OrderedDict() sq = pywikibot.data.sparql.SparqlQuery() queryresult = sq.select(query) if not queryresult: raise QueryException("No result when querying groupings.") for resultitem in queryresult: qid = resultitem.get('grouping').replace(u'http://www.wikidata.org/entity/', u'') grouping_counts[qid] = int(resultitem.get('count')) if self.higher_grouping: value = resultitem.get('higher_grouping') if value: value = value.replace(u'http://www.wikidata.org/entity/', u'') grouping_groupings[qid] = value return (grouping_counts, grouping_groupings) def get_query_for_items_for_property_positive(self, column, grouping): query = f(""" SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE {{ ?entity {self.selector_sparql} .""") if grouping == self.GROUP_MAPPING.TOTALS: pass elif grouping == self.GROUP_MAPPING.NO_GROUPING: query += f(""" MINUS {{ ?entity wdt:{self.grouping_property} [] . }}""") else: query += f(""" ?entity wdt:{self.grouping_property} wd:{grouping} .""") if column.startswith('P'): query += f(""" ?entity p:{column} ?prop . OPTIONAL {{ ?prop ps:{column} ?value }} SERVICE wikibase:label {{ bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }} }} """) elif column.startswith('L') or column.startswith('D'): query += f(""" FILTER(EXISTS {{ ?entity {self.TEXT_SELECTOR_MAPPING[column[:1]]} ?lang_label. FILTER((LANG(?lang_label)) = "{column[1:]}"). }}) SERVICE wikibase:label {{ bd:serviceParam wikibase:language "{column[1:]}". 
}} }} """) return query def get_query_for_items_for_property_negative(self, column, grouping): query = f(""" SELECT DISTINCT ?entity ?entityLabel WHERE {{ ?entity {self.selector_sparql} .""") if grouping == self.GROUP_MAPPING.TOTALS: query += f(""" MINUS {{""") elif grouping == self.GROUP_MAPPING.NO_GROUPING: query += f(""" MINUS {{ {{?entity wdt:{self.grouping_property} [] .}} UNION""") else: query += f(""" ?entity wdt:{self.grouping_property} wd:{grouping} . MINUS {{""") if column.startswith('P'): query += f(""" {{?entity a wdno:{column} .}} UNION {{?entity wdt:{column} ?prop .}} }} SERVICE wikibase:label {{ bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }} }} """) elif column.startswith('L') or column.startswith('D'): query += f(""" {{ ?entity {self.TEXT_SELECTOR_MAPPING[column[:1]]} ?lang_label. FILTER((LANG(?lang_label)) = "{column[1:]}") }} }} SERVICE wikibase:label {{ bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }} }} """) return query def get_totals_no_grouping(self): query = f(""" SELECT (COUNT(*) as ?count) WHERE {{ ?entity {self.selector_sparql} MINUS {{ ?entity wdt:{self.grouping_property} _:b28. 
@staticmethod
def _get_percentage(count, total):
    """Return ``count`` as a percentage of ``total``, rounded to 2 decimals.

    A falsy ``count`` (``0`` or ``None``) gives ``0``. A ``total`` below 1 is
    treated as 1 to avoid dividing by zero; a missing ``total`` (``None``, as
    returned by ``_get_count_from_sparql`` for an empty result) is handled
    the same way instead of raising ``TypeError``.

    :param count: (int or None) number of matching items
    :param total: (int or None) number of items overall
    :return: (int or float) the percentage
    """
    if not count:
        return 0
    denominator = max(total or 0, 1)
    return round(1.0 * count / denominator * 100, 2)
def format_higher_grouping_text(self, higher_grouping_value):
    """Return the wikitext table cell for a higher-grouping value.

    The displayed text depends on the value, checked in this order:

    * a value starting with an item id (``Q`` followed by digits) is
      rendered with the ``{{Q|...}}`` template;
    * a Commons ``Special:FilePath`` URL is rendered as a centered 100px
      image, and the file name also becomes the sort value;
    * otherwise, if ``self.higher_grouping_type`` is ``"country"``, the
      value is rendered with the ``{{Flag|...}}`` template;
    * anything else is shown as is.

    :param higher_grouping_value: (str) raw value from the query result
    :return: (str) one table cell, newline-terminated
    """
    # Text templates per higher_grouping_type; formatted only when used.
    type_mapping = {
        "country": "{{Flag|%s}}",
    }
    if re.match(r"Q\d+", higher_grouping_value):
        higher_grouping_text = '{{Q|%s}}' % higher_grouping_value
    else:
        # Match the file URL once and reuse the match for the file name.
        file_match = re.match(
            r"http://commons.wikimedia.org/wiki/Special:FilePath/(.*?)$",
            higher_grouping_value
        )
        if file_match:
            image_name = file_match.groups()[0]
            higher_grouping_text = '[[File:%s|center|100px]]' % image_name
            higher_grouping_value = image_name
        elif self.higher_grouping_type in type_mapping:
            higher_grouping_text = type_mapping[self.higher_grouping_type] % higher_grouping_value
        else:
            higher_grouping_text = higher_grouping_value
    return '| data-sort-value="%s"| %s\n' % (higher_grouping_value, higher_grouping_text)
""" text = u'|-\n' if self.higher_grouping: if higher_grouping: text += self.format_higher_grouping_text(higher_grouping) else: text += u'|\n' text += u'| {{Q|%s}}\n' % (grouping,) if self.grouping_link: try: group_item = pywikibot.ItemPage(self.repo, grouping) group_item.get() label = group_item.labels["en"] except (pywikibot.exceptions.InvalidTitle, KeyError): logging.info(f("Could not retrieve label for {grouping}")) label = grouping text += f('| [[{self.grouping_link}/{label}|{item_count}]] \n') else: text += f('| {item_count} \n') for column_entry in self.columns: column_entry_key = column_entry.get_key() try: column_count = self.column_data.get(column_entry_key).get(grouping) except AttributeError: column_count = 0 if not column_count: column_count = 0 percentage = self._get_percentage(column_count, item_count) text += f('| {{{{{self.cell_template}|{percentage}|{column_count}|column={column_entry.get_title()}|grouping={grouping}}}}}\n') # noqa return text def make_footer(self): total_items = self.get_totals() text = u'|- class="sortbottom"\n|' if self.higher_grouping: text += u"|\n|" text += f('\'\'\'Totals\'\'\' (all items):\n| {total_items}\n') for column_entry in self.columns: totalprop = self._get_count_from_sparql(column_entry.get_totals_query(self)) percentage = self._get_percentage(totalprop, total_items) text += f('| {{{{{self.cell_template}|{percentage}|{totalprop}|column={column_entry.get_title()}}}}}\n') text += u'|}\n' return text @statsd.timer('property_statistics.processing') def retrieve_and_process_data(self): """ Query the data, output wikitext """ logging.info("Retrieving grouping information...") try: (groupings_counts, groupings_groupings) = self.get_grouping_information() except QueryException as e: logging.error(f('No groupings found.')) raise e logging.info(f('Grouping retrieved: {len(groupings_counts)}')) for column_entry in self.columns: column_entry_key = column_entry.get_key() self.column_data[column_entry_key] = 
self._get_grouping_counts_from_sparql(column_entry.get_info_query(self)) text = self.get_header() for (grouping, item_count) in groupings_counts.items(): higher_grouping = groupings_groupings.get(grouping) text += self.make_stats_for_one_grouping(grouping, item_count, higher_grouping) if self.stats_for_no_group: text += self.make_stats_for_no_group() text += self.make_footer() return text def main(*args): """ Main function. """ columns = [ PropertyConfig('P21'), PropertyConfig('P19'), LabelConfig('de'), DescriptionConfig('de'), ] logging.info("Main function...") stats = PropertyStatistics( columns=columns, selector_sparql=u'wdt:P31 wd:Q41960', grouping_property=u'P551', stats_for_no_group=True, grouping_threshold=5, property_threshold=1, ) print(stats.retrieve_and_process_data()) if __name__ == "__main__": main() diff --git a/integraality/tests/test_pages_processor.py b/integraality/tests/test_pages_processor.py index e48ba9c..771100c 100644 --- a/integraality/tests/test_pages_processor.py +++ b/integraality/tests/test_pages_processor.py @@ -1,285 +1,290 @@ # -*- coding: utf-8 -*- """Unit tests for functions.py.""" import argparse import unittest from unittest.mock import patch import fakeredis from integraality.pages_processor import ConfigException, PagesProcessor, main from integraality.property_statistics import ( DescriptionConfig, LabelConfig, PropertyConfig ) class ProcessortTest(unittest.TestCase): def setUp(self): fake_cache_client = fakeredis.FakeStrictRedis() self.processor = PagesProcessor(cache_client=fake_cache_client) class TestReplaceInPage(ProcessortTest): def setUp(self): self.processor = PagesProcessor() self.text = """ Head {{Property dashboard start |properties=P136:genre,P404 |grouping_property=P400 |stats_for_no_group=1 |selector_sparql=wdt:P31/wdt:P279* wd:Q7889 |target_page_title=Wikidata:WikiProject Video games/Statistics/Platform |grouping_link=Wikidata::WikiProject Video games/Reports/Platform }} foo {{Property dashboard end}} Bottom """ 
self.final_text = """ Head {{Property dashboard start |properties=P136:genre,P404 |grouping_property=P400 |stats_for_no_group=1 |selector_sparql=wdt:P31/wdt:P279* wd:Q7889 |target_page_title=Wikidata:WikiProject Video games/Statistics/Platform |grouping_link=Wikidata::WikiProject Video games/Reports/Platform }} bar {{Property dashboard end}} Bottom """ def test_replace_in_page(self): result = self.processor.replace_in_page("bar", self.text) self.assertEqual(result, self.final_text) def test_replace_in_page_escaped_pipe(self): text = self.text.replace('wd:Q7889', '{{!}}') final_text = self.final_text.replace('wd:Q7889', '{{!}}') result = self.processor.replace_in_page("bar", text) self.assertEqual(result, final_text) class TestParseConfig(ProcessortTest): def setUp(self): self.processor = PagesProcessor() def test_normal_config(self): input_config = { 'grouping_link': 'Wikidata:WikiProject Video games/Reports/Platform', 'grouping_property': 'P400', 'stats_for_no_group': '1', 'properties': 'P136:genre,P404', 'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889', } result = self.processor.parse_config(input_config) expected = { 'grouping_link': 'Wikidata:WikiProject Video games/Reports/Platform', 'grouping_property': 'P400', 'stats_for_no_group': True, 'columns': [ PropertyConfig(property='P136', title='genre'), PropertyConfig(property='P404'), ], 'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889' } self.assertEqual(result, expected) def test_minimal_config(self): input_config = { 'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889', 'grouping_property': 'P400', 'properties': 'P136:genre,P404', } result = self.processor.parse_config(input_config) expected = { 'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889', 'grouping_property': 'P400', 'columns': [ PropertyConfig(property='P136', title='genre'), PropertyConfig(property='P404'), ], 'stats_for_no_group': False, } self.assertEqual(result, expected) def test_full_config(self): input_config = { 'grouping_link': 'Wikidata:WikiProject 
Video games/Reports/Platform', 'grouping_property': 'P400', 'stats_for_no_group': '1', 'properties': 'P136:genre,P404', 'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889', 'grouping_threshold': '1', 'property_threshold': '2', } result = self.processor.parse_config(input_config) expected = { 'grouping_link': 'Wikidata:WikiProject Video games/Reports/Platform', 'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889', 'grouping_property': 'P400', 'columns': [ PropertyConfig(property='P136', title='genre'), PropertyConfig(property='P404'), ], 'stats_for_no_group': True, 'grouping_threshold': '1', 'property_threshold': '2', } self.assertEqual(result, expected) def test_empty_config(self): input_config = {} with(self.assertRaises(ConfigException)): self.processor.parse_config(input_config) def test_insufficient_config(self): input_config = { 'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889', } with(self.assertRaises(ConfigException)): self.processor.parse_config(input_config) class TestParseParams(ProcessortTest): def test_parse_config_from_params_minimal(self): params = ['grouping_property=P195', 'properties=P170:creator,P276', 'selector_sparql=wdt:P31 wd:Q3305213'] expected = { 'grouping_property': 'P195', 'properties': 'P170:creator,P276', 'selector_sparql': 'wdt:P31 wd:Q3305213' } result = self.processor.parse_config_from_params(params) self.assertEqual(result, expected) def test_parse_config_from_params_with_empty_param(self): params = ['', 'grouping_property=P195', 'properties=P170:creator,P276', 'selector_sparql=wdt:P31 wd:Q3305213'] expected = { 'grouping_property': 'P195', 'properties': 'P170:creator,P276', 'selector_sparql': 'wdt:P31 wd:Q3305213' } result = self.processor.parse_config_from_params(params) self.assertEqual(result, expected) def test_parse_config_from_params_with_escaped_pipe(self): params = ['grouping_property=P195', 'properties=P170:creator,P276', 'selector_sparql=REGEX(?id, "^(a{{!}}b)")'] expected = { 'grouping_property': 'P195', 'properties': 
'P170:creator,P276', 'selector_sparql': 'REGEX(?id, "^(a|b)")' } result = self.processor.parse_config_from_params(params) self.assertEqual(result, expected) class TestParseConfigProperties(ProcessortTest): def test(self): properties = 'P136:genre,P404' result = self.processor.parse_config_properties(properties) expected = [ PropertyConfig(property='P136', title='genre'), PropertyConfig(property='P404'), ] self.assertEqual(result, expected) def test_with_trail_comma(self): properties = 'P136:genre,P404,' result = self.processor.parse_config_properties(properties) expected = [ PropertyConfig(property='P136', title='genre'), PropertyConfig(property='P404'), ] self.assertEqual(result, expected) def test_more_properties(self): properties = 'P136,P178,P123,P495,P577,P404,P437' result = self.processor.parse_config_properties(properties) expected = [ PropertyConfig(property='P136'), PropertyConfig(property='P178'), PropertyConfig(property='P123'), PropertyConfig(property='P495'), PropertyConfig(property='P577'), PropertyConfig(property='P404'), PropertyConfig(property='P437'), ] self.assertEqual(result, expected) def test_with_qualifier(self): properties = 'P136:genre,P404,P669/P670' result = self.processor.parse_config_properties(properties) expected = [ PropertyConfig(property='P136', title='genre'), PropertyConfig(property='P404'), PropertyConfig(property='P669', qualifier='P670'), ] self.assertEqual(result, expected) def test_with_qualifier_and_value(self): properties = 'P136:genre,P404,P553/Q17459/P670' result = self.processor.parse_config_properties(properties) expected = [ PropertyConfig(property='P136', title='genre'), PropertyConfig(property='P404'), PropertyConfig(property='P553', value='Q17459', qualifier='P670') ] self.assertEqual(result, expected) def test_with_label(self): properties = 'P136:genre,Lbr,P553' result = self.processor.parse_config_properties(properties) expected = [ PropertyConfig(property='P136', title='genre'), LabelConfig(language='br'), 
def test_with_description(self):
    """A ``D<language>`` entry is parsed into a description column."""
    # 'Dxy' (not 'Lxy') is the description syntax; 'Lxy' builds a label column.
    properties = 'P136:genre,Dxy,P553'
    result = self.processor.parse_config_properties(properties)
    expected = [
        PropertyConfig(property='P136', title='genre'),
        DescriptionConfig(language='xy'),
        PropertyConfig(property='P553')
    ]
    self.assertEqual(result, expected)
PropertyConfig(property='P21'), PropertyConfig(property='P19'), PropertyConfig(property='P1', qualifier='P2'), PropertyConfig(property='P3', value='Q4', qualifier='P5'), LabelConfig(language='br'), DescriptionConfig(language='xy'), ] self.stats = PropertyStatistics( columns=columns, selector_sparql=u'wdt:P31 wd:Q41960', grouping_property=u'P551', property_threshold=10 ) class TestPropertyConfig(PropertyStatisticsTest): def setUp(self): super().setUp() self.column = PropertyConfig('P19') def test_make_column_header(self): result = self.column.make_column_header() expected = u'! data-sort-type="number"|{{Property|P19}}\n' self.assertEqual(result, expected) def test_get_totals_query(self): result = self.column.get_totals_query(self.stats) expected = ( "\n" "SELECT (COUNT(*) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960\n" " FILTER(EXISTS {\n" " ?entity p:P19[]\n" " })\n" "}\n" ) self.assertEqual(result, expected) def test_get_info_no_grouping_query(self): result = self.column.get_info_no_grouping_query(self.stats) expected = ( "\n" "SELECT (COUNT(*) AS ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " MINUS { ?entity wdt:P551 _:b28. }\n" " FILTER(EXISTS {\n" " ?entity p:P19[]\n" " })\n" "}\n" "GROUP BY ?grouping\n" "ORDER BY DESC (?count)\n" "LIMIT 10\n" ) self.assertEqual(result, expected) def test_get_info_query(self): result = self.column.get_info_query(self.stats) expected = ( "\n" "SELECT ?grouping (COUNT(DISTINCT *) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " ?entity wdt:P551 ?grouping .\n" " FILTER(EXISTS {\n" " ?entity p:P19[]\n" " })\n" "}\n" "GROUP BY ?grouping\n" "HAVING (?count >= 10)\n" "ORDER BY DESC(?count)\n" "LIMIT 1000\n" ) print(result) print(expected) self.assertEqual(result, expected) class TestPropertyConfigWithTitle(PropertyStatisticsTest): def setUp(self): super().setUp() self.column = PropertyConfig('P19', title="birth") def test_make_column_header(self): result = self.column.make_column_header() expected = u'! 
data-sort-type="number"|[[Property:P19|birth]]\n' self.assertEqual(result, expected) class TestPropertyConfigWithQualifier(PropertyStatisticsTest): def setUp(self): super().setUp() self.column = PropertyConfig('P669', qualifier='P670') def test_make_column_header(self): result = self.column.make_column_header() expected = u'! data-sort-type="number"|{{Property|P670}}\n' self.assertEqual(result, expected) def test_get_totals_query(self): result = self.column.get_totals_query(self.stats) expected = ( "\n" "SELECT (COUNT(*) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960\n" " FILTER(EXISTS {\n" " ?entity p:P669 [ ps:P669 [] ; pq:P670 [] ]\n" " })\n" "}\n" ) self.assertEqual(result, expected) def test_get_info_no_grouping_query(self): result = self.column.get_info_no_grouping_query(self.stats) expected = ( "\n" "SELECT (COUNT(*) AS ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " MINUS { ?entity wdt:P551 _:b28. }\n" " FILTER(EXISTS {\n" " ?entity p:P669 [ ps:P669 [] ; pq:P670 [] ]\n" " })\n" "}\n" "GROUP BY ?grouping\n" "ORDER BY DESC (?count)\n" "LIMIT 10\n" ) self.assertEqual(result, expected) def test_get_info_query(self): result = self.column.get_info_query(self.stats) expected = ( "\n" "SELECT ?grouping (COUNT(DISTINCT *) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " ?entity wdt:P551 ?grouping .\n" " FILTER(EXISTS {\n" " ?entity p:P669 [ ps:P669 [] ; pq:P670 [] ]\n" " })\n" "}\n" "GROUP BY ?grouping\n" "HAVING (?count >= 10)\n" "ORDER BY DESC(?count)\n" "LIMIT 1000\n" ) print(result) print(expected) self.assertEqual(result, expected) class TestPropertyConfigWithQualifierAndLabel(PropertyStatisticsTest): def setUp(self): super().setUp() self.column = PropertyConfig('P669', title="street", qualifier='P670') def test_make_column_header(self): result = self.column.make_column_header() expected = u'! 
data-sort-type="number"|[[Property:P670|street]]\n' self.assertEqual(result, expected) class TestPropertyConfigWithQualifierAndValue(PropertyStatisticsTest): def setUp(self): super().setUp() self.column = PropertyConfig(property='P3', value='Q4', qualifier='P5') def test_make_column_header(self): result = self.column.make_column_header() expected = u'! data-sort-type="number"|{{Property|P5}}\n' self.assertEqual(result, expected) def test_get_totals_query(self): result = self.column.get_totals_query(self.stats) expected = ( "\n" "SELECT (COUNT(*) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960\n" " FILTER(EXISTS {\n" " ?entity p:P3 [ ps:P3 Q4 ; pq:P5 [] ]\n" " })\n" "}\n" ) self.assertEqual(result, expected) def test_get_info_no_grouping_query(self): result = self.column.get_info_no_grouping_query(self.stats) expected = ( "\n" "SELECT (COUNT(*) AS ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " MINUS { ?entity wdt:P551 _:b28. }\n" " FILTER(EXISTS {\n" " ?entity p:P3 [ ps:P3 Q4 ; pq:P5 [] ]\n" " })\n" "}\n" "GROUP BY ?grouping\n" "ORDER BY DESC (?count)\n" "LIMIT 10\n" ) print(result) print(expected) self.assertEqual(result, expected) def test_get_info_query(self): result = self.column.get_info_query(self.stats) expected = ( "\n" "SELECT ?grouping (COUNT(DISTINCT *) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " ?entity wdt:P551 ?grouping .\n" " FILTER(EXISTS {\n" " ?entity p:P3 [ ps:P3 Q4 ; pq:P5 [] ]\n" " })\n" "}\n" "GROUP BY ?grouping\n" "HAVING (?count >= 10)\n" "ORDER BY DESC(?count)\n" "LIMIT 1000\n" ) print(result) print(expected) self.assertEqual(result, expected) class TestPropertyConfigWithQualifierAndValueAndTitle(PropertyStatisticsTest): def setUp(self): super().setUp() self.column = PropertyConfig(property='P3', title="Some property", value='Q4', qualifier='P5') def test_make_column_header(self): result = self.column.make_column_header() expected = u'! 
data-sort-type="number"|[[Property:P5|Some property]]\n' self.assertEqual(result, expected) class TestColumnConfigMaker(unittest.TestCase): def test_property_without_title(self): result = ColumnConfigMaker.make('P136', None) expected = PropertyConfig(property='P136') self.assertEqual(result, expected) def test_property_with_title(self): result = ColumnConfigMaker.make('P136', 'genre') expected = PropertyConfig(property='P136', title='genre') self.assertEqual(result, expected) def test_property_with_qualifier(self): key = 'P669/P670' result = ColumnConfigMaker.make(key, None) expected = PropertyConfig(property='P669', qualifier='P670') self.assertEqual(result, expected) def test_property_with_qualifier_and_title(self): key = 'P669/P670' result = ColumnConfigMaker.make(key, 'street number') expected = PropertyConfig(property='P669', qualifier='P670', title="street number") self.assertEqual(result, expected) def test_property_with_qualifier_and_value(self): key = 'P553/Q17459/P670' result = ColumnConfigMaker.make(key, None) expected = PropertyConfig(property='P553', value='Q17459', qualifier='P670') self.assertEqual(result, expected) def test_property_with_qualifier_and_value_and_title(self): key = 'P553/Q17459/P670' result = ColumnConfigMaker.make(key, 'street number') expected = PropertyConfig(property='P553', value='Q17459', qualifier='P670', title='street number') self.assertEqual(result, expected) def test_label(self): result = ColumnConfigMaker.make('Lxy', None) expected = LabelConfig(language='xy') self.assertEqual(result, expected) def test_description(self): result = ColumnConfigMaker.make('Dxy', None) expected = DescriptionConfig(language='xy') self.assertEqual(result, expected) + def test_unknown_syntax(self): + with self.assertRaises(ColumnSyntaxException): + ColumnConfigMaker.make('SomethingSomething', None) + class SparqlQueryTest(unittest.TestCase): def setUp(self): super().setUp() patcher = patch('pywikibot.data.sparql.SparqlQuery', autospec=True) 
self.mock_sparql_query = patcher.start() self.addCleanup(patcher.stop) def assert_query_called(self, query): self.mock_sparql_query.return_value.select.assert_called_once_with(query) class TestLabelConfig(PropertyStatisticsTest): def setUp(self): super().setUp() self.column = LabelConfig('br') def test_simple(self): result = self.column.make_column_header() expected = u'! data-sort-type="number"|{{#language:br}}\n' self.assertEqual(result, expected) def test_get_key(self): result = self.column.get_key() self.assertEqual(result, 'Lbr') def test_get_totals_query(self): result = self.column.get_totals_query(self.stats) query = ( "\n" "SELECT (COUNT(*) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960\n" " FILTER(EXISTS {\n" " ?entity rdfs:label ?lang_label.\n" " FILTER((LANG(?lang_label)) = 'br').\n" " })\n" "}\n" ) self.assertEqual(result, query) def test_get_info_query(self): result = self.column.get_info_query(self.stats) query = ( "\n" "SELECT ?grouping (COUNT(DISTINCT *) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " ?entity wdt:P551 ?grouping .\n" " FILTER(EXISTS {\n" " ?entity rdfs:label ?lang_label.\n" " FILTER((LANG(?lang_label)) = 'br').\n" " })\n" "}\n" "GROUP BY ?grouping\n" "HAVING (?count >= 10)\n" "ORDER BY DESC(?count)\n" "LIMIT 1000\n" ) self.assertEqual(result, query) def test_get_info_no_grouping_query(self): result = self.column.get_info_no_grouping_query(self.stats) query = ( "\n" "SELECT (COUNT(*) AS ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " MINUS { ?entity wdt:P551 _:b28. }\n" " FILTER(EXISTS {\n" " ?entity rdfs:label ?lang_label.\n" " FILTER((LANG(?lang_label)) = 'br').\n" " })\n" "}\n" "GROUP BY ?grouping\n" "ORDER BY DESC (?count)\n" "LIMIT 10\n" ) print(result) print(query) self.assertEqual(result, query) class TestDescriptionConfig(PropertyStatisticsTest): def setUp(self): super().setUp() self.column = DescriptionConfig('br') def test_simple(self): result = self.column.make_column_header() expected = u'! 
data-sort-type="number"|{{#language:br}}\n' self.assertEqual(result, expected) def test_get_key(self): result = self.column.get_key() self.assertEqual(result, 'Dbr') def test_get_totals_query(self): result = self.column.get_totals_query(self.stats) query = ( "\n" "SELECT (COUNT(*) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960\n" " FILTER(EXISTS {\n" " ?entity schema:description ?lang_label.\n" " FILTER((LANG(?lang_label)) = 'br').\n" " })\n" "}\n" ) self.assertEqual(result, query) def test_get_info_query(self): result = self.column.get_info_query(self.stats) query = ( "\n" "SELECT ?grouping (COUNT(DISTINCT *) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " ?entity wdt:P551 ?grouping .\n" " FILTER(EXISTS {\n" " ?entity schema:description ?lang_label.\n" " FILTER((LANG(?lang_label)) = 'br').\n" " })\n" "}\n" "GROUP BY ?grouping\n" "HAVING (?count >= 10)\n" "ORDER BY DESC(?count)\n" "LIMIT 1000\n" ) self.assertEqual(result, query) def test_get_info_no_grouping_query(self): result = self.column.get_info_no_grouping_query(self.stats) query = ( "\n" "SELECT (COUNT(*) AS ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " MINUS { ?entity wdt:P551 _:b28. 
}\n" " FILTER(EXISTS {\n" " ?entity schema:description ?lang_label.\n" " FILTER((LANG(?lang_label)) = 'br').\n" " })\n" "}\n" "GROUP BY ?grouping\n" "ORDER BY DESC (?count)\n" "LIMIT 10\n" ) self.assertEqual(result, query) class FormatHigherGroupingTextTest(SparqlQueryTest, PropertyStatisticsTest): def test_format_higher_grouping_text_default_qitem(self): result = self.stats.format_higher_grouping_text("Q1") expected = '| data-sort-value="Q1"| {{Q|Q1}}\n' self.assertEqual(result, expected) def test_format_higher_grouping_text_string(self): result = self.stats.format_higher_grouping_text("foo") expected = '| data-sort-value="foo"| foo\n' self.assertEqual(result, expected) def test_format_higher_grouping_text_country(self): self.stats.higher_grouping_type = "country" result = self.stats.format_higher_grouping_text("AT") expected = '| data-sort-value="AT"| {{Flag|AT}}\n' self.assertEqual(result, expected) def test_format_higher_grouping_text_image(self): text = "http://commons.wikimedia.org/wiki/Special:FilePath/US%20CDC%20logo.svg" result = self.stats.format_higher_grouping_text(text) expected = '| data-sort-value="US%20CDC%20logo.svg"| [[File:US%20CDC%20logo.svg|center|100px]]\n' self.assertEqual(result, expected) class MakeStatsForNoGroupTest(SparqlQueryTest, PropertyStatisticsTest): def setUp(self): super().setUp() patcher1 = patch('property_statistics.PropertyStatistics.get_totals_no_grouping', autospec=True) self.mock_get_totals_no_grouping = patcher1.start() self.addCleanup(patcher1.stop) self.mock_get_totals_no_grouping.return_value = 20 self.mock_sparql_query.return_value.select.side_effect = [ [{'count': '2'}], [{'count': '10'}], [{'count': '15'}], [{'count': '5'}], [{'count': '4'}], [{'count': '8'}], ] def test_make_stats_for_no_group(self): result = self.stats.make_stats_for_no_group() expected = ( "|-\n" "| No grouping \n" "| 20 \n" "| {{Integraality cell|10.0|2|column=P21|grouping=None}}\n" "| {{Integraality cell|50.0|10|column=P19|grouping=None}}\n" "| 
{{Integraality cell|75.0|15|column=P1/P2|grouping=None}}\n" "| {{Integraality cell|25.0|5|column=P3/Q4/P5|grouping=None}}\n" "| {{Integraality cell|20.0|4|column=Lbr|grouping=None}}\n" "| {{Integraality cell|40.0|8|column=Dxy|grouping=None}}\n" ) self.assertEqual(result, expected) self.mock_get_totals_no_grouping.assert_called_once_with(self.stats) self.assertEqual(self.mock_sparql_query.call_count, 6) def test_make_stats_for_no_group_with_higher_grouping(self): self.stats.higher_grouping = 'wdt:P17/wdt:P298' result = self.stats.make_stats_for_no_group() expected = ( "|-\n" "|\n" "| No grouping \n" "| 20 \n" "| {{Integraality cell|10.0|2|column=P21|grouping=None}}\n" "| {{Integraality cell|50.0|10|column=P19|grouping=None}}\n" "| {{Integraality cell|75.0|15|column=P1/P2|grouping=None}}\n" "| {{Integraality cell|25.0|5|column=P3/Q4/P5|grouping=None}}\n" "| {{Integraality cell|20.0|4|column=Lbr|grouping=None}}\n" "| {{Integraality cell|40.0|8|column=Dxy|grouping=None}}\n" ) self.assertEqual(result, expected) self.mock_get_totals_no_grouping.assert_called_once_with(self.stats) self.assertEqual(self.mock_sparql_query.call_count, 6) class MakeStatsForOneGroupingTest(PropertyStatisticsTest): def setUp(self): super().setUp() self.stats.column_data = { 'P21': OrderedDict([('Q3115846', 10), ('Q5087901', 6)]), 'P19': OrderedDict([('Q3115846', 8), ('Q2166574', 5)]), 'P1P2': OrderedDict([('Q3115846', 2), ('Q2166574', 9)]), 'P3Q4P5': OrderedDict([('Q3115846', 7), ('Q2166574', 1)]), 'Lbr': OrderedDict([('Q3115846', 1), ('Q2166574', 2)]), 'Dxy': OrderedDict([('Q3115846', 2), ('Q2166574', 1)]), } def test_make_stats_for_one_grouping(self): result = self.stats.make_stats_for_one_grouping("Q3115846", 10, None) expected = ( '|-\n' '| {{Q|Q3115846}}\n' '| 10 \n' '| {{Integraality cell|100.0|10|column=P21|grouping=Q3115846}}\n' '| {{Integraality cell|80.0|8|column=P19|grouping=Q3115846}}\n' '| {{Integraality cell|20.0|2|column=P1/P2|grouping=Q3115846}}\n' '| {{Integraality 
cell|70.0|7|column=P3/Q4/P5|grouping=Q3115846}}\n' '| {{Integraality cell|10.0|1|column=Lbr|grouping=Q3115846}}\n' '| {{Integraality cell|20.0|2|column=Dxy|grouping=Q3115846}}\n' ) self.assertEqual(result, expected) def test_make_stats_for_one_grouping_with_higher_grouping(self): self.stats.higher_grouping = "wdt:P17/wdt:P298" result = self.stats.make_stats_for_one_grouping("Q3115846", 10, "Q1") expected = ( '|-\n' '| data-sort-value="Q1"| {{Q|Q1}}\n' '| {{Q|Q3115846}}\n' '| 10 \n' '| {{Integraality cell|100.0|10|column=P21|grouping=Q3115846}}\n' '| {{Integraality cell|80.0|8|column=P19|grouping=Q3115846}}\n' '| {{Integraality cell|20.0|2|column=P1/P2|grouping=Q3115846}}\n' '| {{Integraality cell|70.0|7|column=P3/Q4/P5|grouping=Q3115846}}\n' '| {{Integraality cell|10.0|1|column=Lbr|grouping=Q3115846}}\n' '| {{Integraality cell|20.0|2|column=Dxy|grouping=Q3115846}}\n' ) self.assertEqual(result, expected) @patch('pywikibot.ItemPage', autospec=True) def test_make_stats_for_one_grouping_with_grouping_link(self, mock_item_page): mock_item_page.return_value.labels = {'en': 'Bar'} self.stats.grouping_link = "Foo" result = self.stats.make_stats_for_one_grouping("Q3115846", 10, None) expected = ( '|-\n' '| {{Q|Q3115846}}\n' '| [[Foo/Bar|10]] \n' '| {{Integraality cell|100.0|10|column=P21|grouping=Q3115846}}\n' '| {{Integraality cell|80.0|8|column=P19|grouping=Q3115846}}\n' '| {{Integraality cell|20.0|2|column=P1/P2|grouping=Q3115846}}\n' '| {{Integraality cell|70.0|7|column=P3/Q4/P5|grouping=Q3115846}}\n' '| {{Integraality cell|10.0|1|column=Lbr|grouping=Q3115846}}\n' '| {{Integraality cell|20.0|2|column=Dxy|grouping=Q3115846}}\n' ) self.assertEqual(result, expected) class GetQueryForItemsForPropertyPositive(PropertyStatisticsTest): def test_get_query_for_items_for_property_positive(self): result = self.stats.get_query_for_items_for_property_positive('P21', 'Q3115846') expected = """ SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE { ?entity wdt:P31 wd:Q41960 . 
?entity wdt:P551 wd:Q3115846 . ?entity p:P21 ?prop . OPTIONAL { ?prop ps:P21 ?value } SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". } } """ self.assertEqual(result, expected) def test_get_query_for_items_for_property_positive_no_grouping(self): result = self.stats.get_query_for_items_for_property_positive('P21', self.stats.GROUP_MAPPING.NO_GROUPING) expected = """ SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE { ?entity wdt:P31 wd:Q41960 . MINUS { ?entity wdt:P551 [] . } ?entity p:P21 ?prop . OPTIONAL { ?prop ps:P21 ?value } SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". } } """ self.assertEqual(result, expected) def test_get_query_for_items_for_property_positive_totals(self): result = self.stats.get_query_for_items_for_property_positive('P21', self.stats.GROUP_MAPPING.TOTALS) expected = """ SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE { ?entity wdt:P31 wd:Q41960 . ?entity p:P21 ?prop . OPTIONAL { ?prop ps:P21 ?value } SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". } } """ self.assertEqual(result, expected) def test_get_query_for_items_for_property_positive_label(self): result = self.stats.get_query_for_items_for_property_positive('Lbr', 'Q3115846') expected = """ SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE { ?entity wdt:P31 wd:Q41960 . ?entity wdt:P551 wd:Q3115846 . FILTER(EXISTS { ?entity rdfs:label ?lang_label. FILTER((LANG(?lang_label)) = "br"). }) SERVICE wikibase:label { bd:serviceParam wikibase:language "br". } } """ self.assertEqual(result, expected) class GetQueryForItemsForPropertyNegative(PropertyStatisticsTest): def test_get_query_for_items_for_property_negative(self): result = self.stats.get_query_for_items_for_property_negative('P21', 'Q3115846') expected = """ SELECT DISTINCT ?entity ?entityLabel WHERE { ?entity wdt:P31 wd:Q41960 . ?entity wdt:P551 wd:Q3115846 . 
MINUS { {?entity a wdno:P21 .} UNION {?entity wdt:P21 ?prop .} } SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". } } """ self.assertEqual(result, expected) def test_get_query_for_items_for_property_negative_no_grouping(self): result = self.stats.get_query_for_items_for_property_negative('P21', self.stats.GROUP_MAPPING.NO_GROUPING) expected = """ SELECT DISTINCT ?entity ?entityLabel WHERE { ?entity wdt:P31 wd:Q41960 . MINUS { {?entity wdt:P551 [] .} UNION {?entity a wdno:P21 .} UNION {?entity wdt:P21 ?prop .} } SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". } } """ self.assertEqual(result, expected) def test_get_query_for_items_for_property_negative_totals(self): result = self.stats.get_query_for_items_for_property_negative('P21', self.stats.GROUP_MAPPING.TOTALS) expected = """ SELECT DISTINCT ?entity ?entityLabel WHERE { ?entity wdt:P31 wd:Q41960 . MINUS { {?entity a wdno:P21 .} UNION {?entity wdt:P21 ?prop .} } SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". } } """ self.assertEqual(result, expected) def test_get_query_for_items_for_property_negative_label(self): result = self.stats.get_query_for_items_for_property_negative('Lbr', 'Q3115846') expected = """ SELECT DISTINCT ?entity ?entityLabel WHERE { ?entity wdt:P31 wd:Q41960 . ?entity wdt:P551 wd:Q3115846 . MINUS { { ?entity rdfs:label ?lang_label. FILTER((LANG(?lang_label)) = "br") } } SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". 
} } """ self.assertEqual(result, expected) class GetCountFromSparqlTest(SparqlQueryTest, PropertyStatisticsTest): def test_return_count(self): self.mock_sparql_query.return_value.select.return_value = [{'count': '18'}] result = self.stats._get_count_from_sparql("SELECT X") self.assert_query_called("SELECT X") self.assertEqual(result, 18) def test_return_None(self): self.mock_sparql_query.return_value.select.return_value = None result = self.stats._get_count_from_sparql("SELECT X") self.assert_query_called("SELECT X") self.assertEqual(result, None) class GetGroupingCountsFromSparqlTest(SparqlQueryTest, PropertyStatisticsTest): def test_return_count(self): self.mock_sparql_query.return_value.select.return_value = [ {'grouping': 'http://www.wikidata.org/entity/Q1', 'count': 10}, {'grouping': 'http://www.wikidata.org/entity/Q2', 'count': 5}, ] result = self.stats._get_grouping_counts_from_sparql("SELECT X") self.assert_query_called("SELECT X") expected = OrderedDict([('Q1', 10), ('Q2', 5)]) self.assertEqual(result, expected) def test_return_None(self): self.mock_sparql_query.return_value.select.return_value = None result = self.stats._get_grouping_counts_from_sparql("SELECT X") self.assert_query_called("SELECT X") self.assertEqual(result, None) class SparqlCountTest(SparqlQueryTest, PropertyStatisticsTest): def setUp(self): super().setUp() self.mock_sparql_query.return_value.select.return_value = [{'count': '18'}] def test_get_totals_no_grouping(self): result = self.stats.get_totals_no_grouping() query = ( "\n" "SELECT (COUNT(*) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960\n" " MINUS { ?entity wdt:P551 _:b28. 
}\n" "}\n" ) self.assert_query_called(query) self.assertEqual(result, 18) def test_get_totals(self): result = self.stats.get_totals() query = ( "\n" "SELECT (COUNT(*) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960\n" "}\n" ) self.assert_query_called(query) self.assertEqual(result, 18) class GetGroupingInformationTest(SparqlQueryTest, PropertyStatisticsTest): def test_get_grouping_information(self): self.mock_sparql_query.return_value.select.return_value = [ {'grouping': 'http://www.wikidata.org/entity/Q3115846', 'count': '10'}, {'grouping': 'http://www.wikidata.org/entity/Q5087901', 'count': '6'}, {'grouping': 'http://www.wikidata.org/entity/Q623333', 'count': '6'} ] expected = ( OrderedDict([('Q3115846', 10), ('Q5087901', 6), ('Q623333', 6)]), OrderedDict() ) query = ( "\n" "SELECT ?grouping (COUNT(DISTINCT *) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " ?entity wdt:P551 ?grouping .\n" "} GROUP BY ?grouping\n" "HAVING (?count >= 20)\n" "ORDER BY DESC(?count)\n" "LIMIT 1000\n" ) result = self.stats.get_grouping_information() self.assert_query_called(query) self.assertEqual(result, expected) def test_get_grouping_information_with_grouping_threshold(self): self.mock_sparql_query.return_value.select.return_value = [ {'grouping': 'http://www.wikidata.org/entity/Q3115846', 'count': '10'}, {'grouping': 'http://www.wikidata.org/entity/Q5087901', 'count': '6'}, {'grouping': 'http://www.wikidata.org/entity/Q623333', 'count': '6'} ] expected = ( OrderedDict([('Q3115846', 10), ('Q5087901', 6), ('Q623333', 6)]), OrderedDict() ) self.stats.grouping_threshold = 5 query = ( "\n" "SELECT ?grouping (COUNT(DISTINCT *) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " ?entity wdt:P551 ?grouping .\n" "} GROUP BY ?grouping\n" "HAVING (?count >= 5)\n" "ORDER BY DESC(?count)\n" "LIMIT 1000\n" ) result = self.stats.get_grouping_information() self.assert_query_called(query) self.assertEqual(result, expected) def 
test_get_grouping_information_with_higher_grouping(self): self.mock_sparql_query.return_value.select.return_value = [ {'grouping': 'http://www.wikidata.org/entity/Q3115846', 'higher_grouping': 'NZL', 'count': '10'}, {'grouping': 'http://www.wikidata.org/entity/Q5087901', 'higher_grouping': 'USA', 'count': '6'}, {'grouping': 'http://www.wikidata.org/entity/Q623333', 'higher_grouping': 'USA', 'count': '6'} ] expected = ( OrderedDict([('Q3115846', 10), ('Q5087901', 6), ('Q623333', 6)]), OrderedDict([('Q3115846', 'NZL'), ('Q5087901', 'USA'), ('Q623333', 'USA')]) ) self.stats.higher_grouping = 'wdt:P17/wdt:P298' query = ( "\n" "SELECT ?grouping (SAMPLE(?_higher_grouping) as ?higher_grouping) " "(COUNT(DISTINCT *) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " ?entity wdt:P551 ?grouping .\n" " OPTIONAL { ?grouping wdt:P17/wdt:P298 ?_higher_grouping }.\n" "} GROUP BY ?grouping ?higher_grouping\n" "HAVING (?count >= 20)\n" "ORDER BY DESC(?count)\n" "LIMIT 1000\n" ) result = self.stats.get_grouping_information() self.assert_query_called(query) self.assertEqual(result, expected) def test_get_grouping_information_empty_result(self): self.mock_sparql_query.return_value.select.return_value = None query = ( "\n" "SELECT ?grouping (COUNT(DISTINCT *) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " ?entity wdt:P551 ?grouping .\n" "} GROUP BY ?grouping\n" "HAVING (?count >= 20)\n" "ORDER BY DESC(?count)\n" "LIMIT 1000\n" ) with self.assertRaises(QueryException): self.stats.get_grouping_information() self.assert_query_called(query) class TestGetHeader(PropertyStatisticsTest): def setUp(self): super().setUp() self.stats.grouping_threshold = 7 self.stats.property_threshold = 4 def test_get_header(self): result = self.stats.get_header() expected = ( '{| class="wikitable sortable"\n' '! colspan="2" |Top groupings (Minimum 7 items)\n' '! colspan="6"|Top Properties (used at least 4 times per grouping)\n' '|-\n' '! Name\n' '! Count\n' '! 
data-sort-type="number"|{{Property|P21}}\n' '! data-sort-type="number"|{{Property|P19}}\n' '! data-sort-type="number"|{{Property|P2}}\n' '! data-sort-type="number"|{{Property|P5}}\n' '! data-sort-type="number"|{{#language:br}}\n' '! data-sort-type="number"|{{#language:xy}}\n' ) self.assertEqual(result, expected) def test_get_header_with_higher_grouping(self): self.stats.higher_grouping = 'wdt:P17/wdt:P298' result = self.stats.get_header() expected = ( '{| class="wikitable sortable"\n' '! colspan="3" |Top groupings (Minimum 7 items)\n' '! colspan="6"|Top Properties (used at least 4 times per grouping)\n' '|-\n' '! \n' '! Name\n' '! Count\n' '! data-sort-type="number"|{{Property|P21}}\n' '! data-sort-type="number"|{{Property|P19}}\n' '! data-sort-type="number"|{{Property|P2}}\n' '! data-sort-type="number"|{{Property|P5}}\n' '! data-sort-type="number"|{{#language:br}}\n' '! data-sort-type="number"|{{#language:xy}}\n' ) self.assertEqual(result, expected) class MakeFooterTest(SparqlQueryTest, PropertyStatisticsTest): def setUp(self): super().setUp() self.mock_sparql_query.return_value.select.side_effect = [ [{'count': '120'}], [{'count': '30'}], [{'count': '80'}], [{'count': '10'}], [{'count': '12'}], [{'count': '24'}], [{'count': '36'}], ] def test_make_footer(self): result = self.stats.make_footer() expected = ( '|- class="sortbottom"\n' "|\'\'\'Totals\'\'\' (all items):\n" "| 120\n" "| {{Integraality cell|25.0|30|column=P21}}\n" "| {{Integraality cell|66.67|80|column=P19}}\n" "| {{Integraality cell|8.33|10|column=P1/P2}}\n" "| {{Integraality cell|10.0|12|column=P3/Q4/P5}}\n" "| {{Integraality cell|20.0|24|column=Lbr}}\n" "| {{Integraality cell|30.0|36|column=Dxy}}\n" "|}\n" ) self.assertEqual(result, expected)