class PropertyConfig:
    """
    Configuration of one statistics column.

    :param property: Wikidata Pid of the property
    :param title: Optional label; when set it replaces the default
        property template in the column header
    :param value: Optional statement value; it only contributes to the key
        when a qualifier is set as well
    :param qualifier: Optional Wikidata Pid of a qualifier of the property
    """

    def __init__(self, property, title=None, value=None, qualifier=None):
        self.property = property
        self.title = title
        self.value = value
        self.qualifier = qualifier

    def __repr__(self):
        return '{}(property={!r}, title={!r}, value={!r}, qualifier={!r})'.format(
            type(self).__name__, self.property, self.title, self.value, self.qualifier
        )

    def __eq__(self, other):
        # Returning NotImplemented lets Python fall back to its default
        # comparison instead of raising AttributeError on foreign objects.
        if not isinstance(other, PropertyConfig):
            return NotImplemented
        return (
            self.property == other.property
            and self.title == other.title
            and self.value == other.value
            and self.qualifier == other.qualifier
        )

    def get_key(self):
        """
        Build the string under which the data of this column is stored.

        :return: the property alone when there is no qualifier, otherwise the
            property followed by the value (if any) and the qualifier
        """
        if not self.qualifier:
            # Without a qualifier the value is not part of the key.
            return self.property
        if self.value:
            return self.property + self.value + self.qualifier
        return self.property + self.qualifier
""" if self.higher_grouping: query = f(""" SELECT ?grouping (SAMPLE(?_higher_grouping) as ?higher_grouping) (COUNT(DISTINCT ?entity) as ?count) WHERE {{ ?entity {self.selector_sparql} . ?entity wdt:{self.grouping_property} ?grouping . OPTIONAL {{ ?grouping {self.higher_grouping} ?_higher_grouping }}. }} GROUP BY ?grouping ?higher_grouping HAVING (?count > {self.grouping_threshold}) ORDER BY DESC(?count) LIMIT 1000 """) else: query = f(""" SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {{ ?entity {self.selector_sparql} . ?entity wdt:{self.grouping_property} ?grouping . }} GROUP BY ?grouping HAVING (?count > {self.grouping_threshold}) ORDER BY DESC(?count) LIMIT 1000 """) grouping_counts = collections.OrderedDict() grouping_groupings = collections.OrderedDict() sq = pywikibot.data.sparql.SparqlQuery() queryresult = sq.select(query) if not queryresult: raise QueryException("No result when querying groupings.") for resultitem in queryresult: qid = resultitem.get('grouping').replace(u'http://www.wikidata.org/entity/', u'') grouping_counts[qid] = int(resultitem.get('count')) if self.higher_grouping: value = resultitem.get('higher_grouping') if value: value = value.replace(u'http://www.wikidata.org/entity/', u'') grouping_groupings[qid] = value return (grouping_counts, grouping_groupings) def get_property_info(self, property): """ Get the usage counts for a property for the groupings :param prop: Wikidata Pid of the property :return: (Ordered) dictionary with the counts per grouping """ query = f(""" SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {{ ?entity {self.selector_sparql} . ?entity wdt:{self.grouping_property} ?grouping . FILTER EXISTS {{ ?entity p:{property} [] }} . 
def get_qualifier_info(self, property, qualifier, value="[]"):
    """
    Get the usage counts for a qualifier for the groupings

    :param property: Wikidata Pid of the property
    :param qualifier: Wikidata Pid of the qualifier
    :param value: SPARQL term inserted as the statement value in the query;
        defaults to ``[]``
    :return: (Ordered) dictionary with the counts per grouping, or None when
        the query gives no result
    """
    query = f("""
SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {{
  ?entity {self.selector_sparql} .
  ?entity wdt:{self.grouping_property} ?grouping .
  FILTER EXISTS {{ ?entity p:{property} [ ps:{property} {value} ; pq:{qualifier} [] ] }} .
}}
GROUP BY ?grouping
HAVING (?count > {self.grouping_threshold})
ORDER BY DESC(?count)
LIMIT 1000
""")
    sq = pywikibot.data.sparql.SparqlQuery()
    queryresult = sq.select(query)
    if not queryresult:
        return None

    result = collections.OrderedDict()
    for resultitem in queryresult:
        # Groupings come back as full entity URIs; keep only the Qid.
        qid = resultitem.get('grouping').replace(u'http://www.wikidata.org/entity/', u'')
        result[qid] = int(resultitem.get('count'))
    return result
def get_totals_for_qualifier(self, property, qualifier, value="[]"):
    """
    Get the total number of selected entities that have a statement for the
    property carrying the given qualifier

    :param property: Wikidata Pid of the property.
    :param qualifier: Wikidata Pid of the qualifier.
    :param value: SPARQL term inserted as the statement value in the query;
        defaults to ``[]``.
    :return: the count as an integer, or None when the query gives no result
    """
    # NOTE(review): the placeholders appear to be resolved by ww.f from the
    # names visible here, so parameter names must match the template.
    query = f("""
SELECT (COUNT(?item) as ?count) WHERE {{
  ?item {self.selector_sparql}
  FILTER EXISTS {{ ?item p:{property} [ ps:{property} {value} ; pq:{qualifier} [] ] }} .
}}
""")
    return self._get_count_from_sparql(query)
}} }} """) return self._get_count_from_sparql(query) def get_totals(self): query = f(""" SELECT (COUNT(?item) as ?count) WHERE {{ ?item {self.selector_sparql} }} """) return self._get_count_from_sparql(query) @staticmethod def _get_count_from_sparql(query): sq = pywikibot.data.sparql.SparqlQuery() queryresult = sq.select(query) if not queryresult: return None return int(queryresult[0].get('count')) @staticmethod def _get_percentage(count, total): if not count: return 0 return round(1.0 * count / max(total, 1) * 100, 2) @staticmethod def make_column_header(prop_entry): if prop_entry.qualifier: property_link = prop_entry.qualifier else: property_link = prop_entry.property if prop_entry.title: label = f('[[Property:{property_link}|{prop_entry.title}]]') else: label = f('{{{{Property|{property_link}}}}}') return f('! data-sort-type="number"|{label}\n') def get_header(self): text = u'{| class="wikitable sortable"\n' colspan = 3 if self.higher_grouping else 2 text += f('! colspan="{colspan}" |Top groupings (Minimum {self.grouping_threshold} items)\n') text += f('! colspan="{len(self.properties)}"|Top Properties (used at least {self.property_threshold} times per grouping)\n') # noqa text += u'|-\n' if self.higher_grouping: text += u'! \n' text += u'! Name\n' text += u'! 
def format_higher_grouping_text(self, higher_grouping_value):
    """
    Format the table cell holding the higher grouping of a row.

    The value is rendered, in order of precedence, as:

    * a ``{{Q|...}}`` template when it is exactly a Q identifier;
    * a centred 100px ``[[File:...]]`` thumbnail when it is a Commons
      ``Special:FilePath`` URL (the file name then also becomes the sort value);
    * the template registered for ``self.higher_grouping_type``, if any;
    * the raw value otherwise.

    :param higher_grouping_value: higher grouping value as a string
    :return: wikitext of the cell, including its ``data-sort-value``
    """
    commons_prefix = "http://commons.wikimedia.org/wiki/Special:FilePath/"
    # Templates are only formatted once the matching branch is taken.
    type_templates = {
        "country": "{{Flag|%s}}",
    }

    sort_value = higher_grouping_value
    # fullmatch: a plain string that merely starts like a Qid (for instance
    # "Q4 results") must not be wrapped in the Q template.
    if re.fullmatch(r"Q\d+", higher_grouping_value):
        cell_text = "{{Q|%s}}" % higher_grouping_value
    elif higher_grouping_value.startswith(commons_prefix):
        image_name = higher_grouping_value[len(commons_prefix):]
        cell_text = "[[File:%s|center|100px]]" % image_name
        sort_value = image_name
    elif self.higher_grouping_type in type_templates:
        cell_text = type_templates[self.higher_grouping_type] % higher_grouping_value
    else:
        cell_text = higher_grouping_value
    return '| data-sort-value="%s"| %s\n' % (sort_value, cell_text)
""" text = u'|-\n' if self.higher_grouping: if higher_grouping: text += self.format_higher_grouping_text(higher_grouping) else: text += u'|\n' text += u'| {{Q|%s}}\n' % (grouping,) if self.grouping_link: group_item = pywikibot.ItemPage(self.repo, grouping) group_item.get() label = group_item.labels["en"] text += f('| [[{self.grouping_link}/{label}|{item_count}]] \n') else: text += f('| {item_count} \n') for prop_entry in self.properties: prop_entry_key = prop_entry.get_key() try: propcount = self.property_data.get(prop_entry_key).get(grouping) except AttributeError: propcount = 0 if not propcount: propcount = 0 percentage = self._get_percentage(propcount, item_count) text += f('| {{{{{self.cell_template}|{percentage}|{propcount}}}}}\n') return text def retrieve_and_process_data(self): """ Query the data, output wikitext """ logging.info("Retrieving grouping information...") try: (groupings_counts, groupings_groupings) = self.get_grouping_information() except QueryException as e: logging.error(f('No groupings found.')) raise e logging.info(f('Grouping retrieved: {len(groupings_counts)}')) for prop_entry in self.properties: property_name = prop_entry.property prop_entry_key = prop_entry.get_key() if prop_entry.qualifier: self.property_data[prop_entry_key] = self.get_qualifier_info(property_name, prop_entry.qualifier) else: self.property_data[prop_entry_key] = self.get_property_info(property_name) text = self.get_header() for (grouping, item_count) in groupings_counts.items(): higher_grouping = groupings_groupings.get(grouping) text += self.make_stats_for_one_grouping(grouping, item_count, higher_grouping) if self.stats_for_no_group: text += self.make_stats_for_no_group() # Get the totals total_items = self.get_totals() text += u'|- class="sortbottom"\n|' if self.higher_grouping: text += u"|\n|" text += f('\'\'\'Totals\'\'\' (all items):\n| {total_items}\n') for prop_entry in self.properties: property_name = prop_entry.property if prop_entry.qualifier: totalprop = 
def main(*args):
    """
    Run a sample report and print the resulting wikitext.

    The report covers properties P21 and P19 for the selector
    ``wdt:P31 wd:Q41960``, grouped by P551, with a grouping threshold of 5
    and the extra row for items without grouping enabled.
    """
    columns = [PropertyConfig(pid) for pid in ('P21', 'P19')]
    logging.info("Main function...")
    settings = {
        'properties': columns,
        'selector_sparql': u'wdt:P31 wd:Q41960',
        'grouping_property': u'P551',
        'stats_for_no_group': True,
        'grouping_threshold': 5,
    }
    stats = PropertyStatistics(**settings)
    wikitext = stats.retrieve_and_process_data()
    print(wikitext)
class FormatHigherGroupingTextTest(PropertyStatisticsTest):
    """Check the wikitext cell produced by format_higher_grouping_text."""

    def test_format_higher_grouping_text_default_qitem(self):
        # A Q identifier is wrapped in the Q template.
        self.assertEqual(
            self.stats.format_higher_grouping_text("Q1"),
            '| data-sort-value="Q1"| {{Q|Q1}}\n',
        )

    def test_format_higher_grouping_text_string(self):
        # A plain string is passed through without any grouping type set.
        self.assertEqual(
            self.stats.format_higher_grouping_text("foo"),
            '| data-sort-value="foo"| foo\n',
        )

    def test_format_higher_grouping_text_country(self):
        self.stats.higher_grouping_type = "country"
        self.assertEqual(
            self.stats.format_higher_grouping_text("AT"),
            '| data-sort-value="AT"| {{Flag|AT}}\n',
        )

    def test_format_higher_grouping_text_image(self):
        # A Commons file path is shown as a thumbnail and sorted by file name.
        image_url = "http://commons.wikimedia.org/wiki/Special:FilePath/US%20CDC%20logo.svg"
        self.assertEqual(
            self.stats.format_higher_grouping_text(image_url),
            '| data-sort-value="US%20CDC%20logo.svg"| [[File:US%20CDC%20logo.svg|center|100px]]\n',
        )
self.mock_get_property_info_no_grouping = patcher2.start() self.addCleanup(patcher1.stop) self.addCleanup(patcher2.stop) def test_make_stats_for_no_group(self): self.mock_get_totals_no_grouping.return_value = 20 self.mock_get_property_info_no_grouping.side_effect = [2, 10] result = self.stats.make_stats_for_no_group() expected = "|-\n| No grouping \n| 20 \n| {{Coloured cell|10.0|2}}\n| {{Coloured cell|50.0|10}}\n" self.assertEqual(result, expected) self.mock_get_totals_no_grouping.assert_called_once_with(self.stats) self.mock_get_property_info_no_grouping.assert_has_calls([ call(self.stats, "P21"), call(self.stats, "P19"), ]) def test_make_stats_for_no_group_with_higher_grouping(self): self.mock_get_totals_no_grouping.return_value = 20 self.mock_get_property_info_no_grouping.side_effect = [2, 10] self.stats.higher_grouping = 'wdt:P17/wdt:P298' result = self.stats.make_stats_for_no_group() expected = "|-\n|\n| No grouping \n| 20 \n| {{Coloured cell|10.0|2}}\n| {{Coloured cell|50.0|10}}\n" self.assertEqual(result, expected) self.mock_get_totals_no_grouping.assert_called_once_with(self.stats) self.mock_get_property_info_no_grouping.assert_has_calls([ call(self.stats, "P21"), call(self.stats, "P19"), ]) class MakeStatsForOneGroupingTest(PropertyStatisticsTest): def setUp(self): super().setUp() self.stats.property_data = { 'P21': OrderedDict([('Q3115846', 10), ('Q5087901', 6)]), 'P19': OrderedDict([('Q3115846', 8), ('Q2166574', 5)]), } def test_make_stats_for_one_grouping(self): result = self.stats.make_stats_for_one_grouping("Q3115846", 10, None) expected = ( '|-\n' '| {{Q|Q3115846}}\n' '| 10 \n' '| {{Coloured cell|100.0|10}}\n' '| {{Coloured cell|80.0|8}}\n' ) self.assertEqual(result, expected) def test_make_stats_for_one_grouping_with_higher_grouping(self): self.stats.higher_grouping = "wdt:P17/wdt:P298" result = self.stats.make_stats_for_one_grouping("Q3115846", 10, "Q1") expected = ( '|-\n' '| data-sort-value="Q1"| {{Q|Q1}}\n' '| {{Q|Q3115846}}\n' '| 10 \n' '| 
class GetCountFromSparqlTest(SparqlQueryTest):
    """Check how _get_count_from_sparql turns a query result into a count."""

    def test_return_count(self):
        self.mock_sparql_query.return_value.select.return_value = [{'count': '18'}]
        result = self.stats._get_count_from_sparql("SELECT X")
        self.assert_query_called("SELECT X")
        self.assertEqual(result, 18)

    def test_return_None(self):
        self.mock_sparql_query.return_value.select.return_value = None
        result = self.stats._get_count_from_sparql("SELECT X")
        self.assert_query_called("SELECT X")
        self.assertIsNone(result)

    def test_return_None_on_empty_result(self):
        # An empty result list is falsy too and must not be indexed.
        self.mock_sparql_query.return_value.select.return_value = []
        result = self.stats._get_count_from_sparql("SELECT X")
        self.assert_query_called("SELECT X")
        self.assertIsNone(result)
})\n" "}\n" "GROUP BY ?grouping\n" "ORDER BY DESC (?count)\n" "LIMIT 10\n" ) self.assert_query_called(query) self.assertEqual(result, 18) def test_get_qualifier_info_no_grouping(self): result = self.stats.get_qualifier_info_no_grouping('P1', 'P2') query = ( "\n" "SELECT (COUNT(?entity) AS ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " MINUS { ?entity wdt:P551 _:b28. }\n" " FILTER EXISTS { ?entity p:P1 [ ps:P1 [] ; pq:P2 [] ] } .\n" "}\n" "GROUP BY ?grouping\n" "ORDER BY DESC (?count)\n" "LIMIT 10\n" ) self.assert_query_called(query) self.assertEqual(result, 18) def test_get_totals_for_property(self): result = self.stats.get_totals_for_property('P1') query = ( "\n" "SELECT (COUNT(?item) as ?count) WHERE {\n" " ?item wdt:P31 wd:Q41960\n" " FILTER EXISTS { ?item p:P1[] } .\n" "}\n" ) self.assert_query_called(query) self.assertEqual(result, 18) def test_get_totals_for_qualifier(self): result = self.stats.get_totals_for_qualifier("P1", "P2") query = ( "\n" "SELECT (COUNT(?item) as ?count) WHERE {\n" " ?item wdt:P31 wd:Q41960\n" " FILTER EXISTS { ?item p:P1 [ ps:P1 [] ; pq:P2 [] ] } .\n" "}\n" ) self.assert_query_called(query) self.assertEqual(result, 18) def test_get_totals_no_grouping(self): result = self.stats.get_totals_no_grouping() query = ( "\n" "SELECT (COUNT(?item) as ?count) WHERE {\n" " ?item wdt:P31 wd:Q41960\n" " MINUS { ?item wdt:P551 _:b28. 
}\n" "}\n" ) self.assert_query_called(query) self.assertEqual(result, 18) def test_get_totals(self): result = self.stats.get_totals() query = ( "\n" "SELECT (COUNT(?item) as ?count) WHERE {\n" " ?item wdt:P31 wd:Q41960\n" "}\n" ) self.assert_query_called(query) self.assertEqual(result, 18) class GetGroupingInformationTest(SparqlQueryTest): def test_get_grouping_information(self): self.mock_sparql_query.return_value.select.return_value = [ {'grouping': 'http://www.wikidata.org/entity/Q3115846', 'count': '10'}, {'grouping': 'http://www.wikidata.org/entity/Q5087901', 'count': '6'}, {'grouping': 'http://www.wikidata.org/entity/Q623333', 'count': '6'} ] expected = ( OrderedDict([('Q3115846', 10), ('Q5087901', 6), ('Q623333', 6)]), OrderedDict() ) query = ( "\n" "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " ?entity wdt:P551 ?grouping .\n" "} GROUP BY ?grouping\n" "HAVING (?count > 20)\n" "ORDER BY DESC(?count)\n" "LIMIT 1000\n" ) result = self.stats.get_grouping_information() self.assert_query_called(query) self.assertEqual(result, expected) def test_get_grouping_information_with_grouping_threshold(self): self.mock_sparql_query.return_value.select.return_value = [ {'grouping': 'http://www.wikidata.org/entity/Q3115846', 'count': '10'}, {'grouping': 'http://www.wikidata.org/entity/Q5087901', 'count': '6'}, {'grouping': 'http://www.wikidata.org/entity/Q623333', 'count': '6'} ] expected = ( OrderedDict([('Q3115846', 10), ('Q5087901', 6), ('Q623333', 6)]), OrderedDict() ) self.stats.grouping_threshold = 5 query = ( "\n" "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " ?entity wdt:P551 ?grouping .\n" "} GROUP BY ?grouping\n" "HAVING (?count > 5)\n" "ORDER BY DESC(?count)\n" "LIMIT 1000\n" ) result = self.stats.get_grouping_information() self.assert_query_called(query) self.assertEqual(result, expected) def test_get_grouping_information_with_higher_grouping(self): 
self.mock_sparql_query.return_value.select.return_value = [ {'grouping': 'http://www.wikidata.org/entity/Q3115846', 'higher_grouping': 'NZL', 'count': '10'}, {'grouping': 'http://www.wikidata.org/entity/Q5087901', 'higher_grouping': 'USA', 'count': '6'}, {'grouping': 'http://www.wikidata.org/entity/Q623333', 'higher_grouping': 'USA', 'count': '6'} ] expected = ( OrderedDict([('Q3115846', 10), ('Q5087901', 6), ('Q623333', 6)]), OrderedDict([('Q3115846', 'NZL'), ('Q5087901', 'USA'), ('Q623333', 'USA')]) ) self.stats.higher_grouping = 'wdt:P17/wdt:P298' query = ( "\n" "SELECT ?grouping (SAMPLE(?_higher_grouping) as ?higher_grouping) " "(COUNT(DISTINCT ?entity) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " ?entity wdt:P551 ?grouping .\n" " OPTIONAL { ?grouping wdt:P17/wdt:P298 ?_higher_grouping }.\n" "} GROUP BY ?grouping ?higher_grouping\n" "HAVING (?count > 20)\n" "ORDER BY DESC(?count)\n" "LIMIT 1000\n" ) result = self.stats.get_grouping_information() self.assert_query_called(query) self.assertEqual(result, expected) def test_get_grouping_information_empty_result(self): self.mock_sparql_query.return_value.select.return_value = None query = ( "\n" "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " ?entity wdt:P551 ?grouping .\n" "} GROUP BY ?grouping\n" "HAVING (?count > 20)\n" "ORDER BY DESC(?count)\n" "LIMIT 1000\n" ) with self.assertRaises(QueryException): self.stats.get_grouping_information() self.assert_query_called(query) class GetPropertyInfoTest(SparqlQueryTest): def test_get_property_info(self): self.mock_sparql_query.return_value.select.return_value = [ {'grouping': 'http://www.wikidata.org/entity/Q3115846', 'count': '10'}, {'grouping': 'http://www.wikidata.org/entity/Q5087901', 'count': '6'}, {'grouping': 'http://www.wikidata.org/entity/Q623333', 'count': '6'} ] expected = OrderedDict([('Q3115846', 10), ('Q5087901', 6), ('Q623333', 6)]) result = self.stats.get_property_info('P1') query = ( "\n" 
"SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " ?entity wdt:P551 ?grouping .\n" " FILTER EXISTS { ?entity p:P1 [] } .\n" "}\n" "GROUP BY ?grouping\n" "HAVING (?count > 20)\n" "ORDER BY DESC(?count)\n" "LIMIT 1000\n" ) self.assert_query_called(query) self.assertEqual(result, expected) def test_get_property_info_empty_result(self): self.mock_sparql_query.return_value.select.return_value = None expected = None result = self.stats.get_property_info('P1') query = ( "\n" "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " ?entity wdt:P551 ?grouping .\n" " FILTER EXISTS { ?entity p:P1 [] } .\n" "}\n" "GROUP BY ?grouping\n" "HAVING (?count > 20)\n" "ORDER BY DESC(?count)\n" "LIMIT 1000\n" ) self.assert_query_called(query) self.assertEqual(result, expected) class GetQualifierInfoTest(SparqlQueryTest): def test_get_qualifier_info(self): self.mock_sparql_query.return_value.select.return_value = [ {'grouping': 'http://www.wikidata.org/entity/Q3115846', 'count': '10'}, {'grouping': 'http://www.wikidata.org/entity/Q5087901', 'count': '6'}, {'grouping': 'http://www.wikidata.org/entity/Q623333', 'count': '6'} ] expected = OrderedDict([('Q3115846', 10), ('Q5087901', 6), ('Q623333', 6)]) result = self.stats.get_qualifier_info('P1', qualifier="P2") query = ( "\n" "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n" " ?entity wdt:P31 wd:Q41960 .\n" " ?entity wdt:P551 ?grouping .\n" " FILTER EXISTS { ?entity p:P1 [ ps:P1 [] ; pq:P2 [] ] } .\n" "}\n" "GROUP BY ?grouping\n" "HAVING (?count > 20)\n" "ORDER BY DESC(?count)\n" "LIMIT 1000\n" ) self.assert_query_called(query) self.assertEqual(result, expected)