diff --git a/integraality/property_statistics.py b/integraality/property_statistics.py
index 6018a1a..cf88d76 100644
--- a/integraality/property_statistics.py
+++ b/integraality/property_statistics.py
@@ -1,504 +1,503 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Calculate and generate statistics
"""
import collections
import logging
import re
from enum import Enum
from ww import f
import pywikibot
import pywikibot.data.sparql
class PropertyConfig:
    """
    Configuration of one statistics column.

    A column is described by a property and, optionally, by a qualifier of
    that property (itself optionally narrowed to one statement value).

    :param property: Pid of the property
    :param title: optional title stored for the column
    :param value: optional statement value; it only contributes to the key
        when a qualifier is set as well
    :param qualifier: optional Pid of the qualifier
    """

    def __init__(self, property, title=None, value=None, qualifier=None):
        self.property = property
        self.title = title
        self.value = value
        self.qualifier = qualifier

    def __repr__(self):
        return '{}(property={!r}, title={!r}, value={!r}, qualifier={!r})'.format(
            type(self).__name__, self.property, self.title, self.value, self.qualifier
        )

    def __eq__(self, other):
        # Returning NotImplemented (rather than reading other.property) lets
        # Python fall back to its default comparison, so comparing with an
        # unrelated object gives False instead of raising AttributeError.
        if not isinstance(other, PropertyConfig):
            return NotImplemented
        return (
            self.property == other.property
            and self.title == other.title
            and self.value == other.value
            and self.qualifier == other.qualifier
        )

    def get_key(self):
        """
        Return the string identifying this column.

        Without a qualifier the key is the property alone; with a qualifier
        it is the property, the value (if any) and the qualifier concatenated.
        """
        if not self.qualifier:
            return self.property
        if self.value:
            return self.property + self.value + self.qualifier
        return self.property + self.qualifier
+ def make_column_header(self):
+ if self.qualifier:
+ property_link = self.qualifier
+ else:
+ property_link = self.property
+
+ if self.title:
+ label = f('[[Property:{property_link}|{self.title}]]')
+ else:
+ label = f('{{{{Property|{property_link}}}}}')
+ return f('! data-sort-type="number"|{label}\n')
+
class QueryException(Exception):
    """Error raised when a query does not return any result."""
class PropertyStatistics:
"""
Generate statistics
"""
GROUP_MAPPING = Enum('GROUP_MAPPING', {'NO_GROUPING': 'None', 'TOTALS': ''})
def __init__(self, selector_sparql, columns, grouping_property,
             higher_grouping=None, higher_grouping_type=None,
             stats_for_no_group=False, grouping_link=None,
             grouping_threshold=20, property_threshold=0):
    """
    Store the report configuration and obtain the data repository.

    :param selector_sparql: SPARQL fragment written after ``?entity`` to select the items
    :param columns: column configurations, one per reported property
    :param grouping_property: Pid by which the items are grouped
    :param higher_grouping: optional SPARQL path from a grouping to its higher grouping
    :param higher_grouping_type: optional formatting hint for higher groupings (``"country"`` is recognised)
    :param stats_for_no_group: whether a row for the items without grouping is added
    :param grouping_link: optional page prefix used to link the count of each grouping
    :param grouping_threshold: minimum number of items for a grouping to be listed
    :param property_threshold: minimum count for a property to be reported for a grouping
    """
    # The repository is needed later to look up the labels of groupings.
    self.repo = pywikibot.Site('en', 'wikipedia').data_repository()

    # Selection of the items and of the reported columns.
    self.selector_sparql = selector_sparql
    self.columns = columns

    # Grouping configuration.
    self.grouping_property = grouping_property
    self.grouping_link = grouping_link
    self.higher_grouping = higher_grouping
    self.higher_grouping_type = higher_grouping_type
    self.stats_for_no_group = stats_for_no_group

    # Thresholds injected into the HAVING clauses of the queries.
    self.grouping_threshold = grouping_threshold
    self.property_threshold = property_threshold

    # Output state: per-column counts and the template used for each cell.
    self.column_data = {}
    self.cell_template = 'Integraality cell'
def get_grouping_information(self):
"""
Get the item count, and optionally the higher grouping, of each grouping.

The query only keeps groupings with at least ``grouping_threshold`` items
and is limited to 1000 rows, ordered by decreasing count.

:return: Tuple of two (ordered) dictionaries: grouping id -> item count,
and grouping id -> higher grouping value (only populated when
``higher_grouping`` is set).
:raises QueryException: if the query yields no result.
"""
# With a higher grouping, one sample of it is fetched for every grouping.
if self.higher_grouping:
query = f("""
SELECT ?grouping (SAMPLE(?_higher_grouping) as ?higher_grouping) (COUNT(DISTINCT ?entity) as ?count) WHERE {{
?entity {self.selector_sparql} .
?entity wdt:{self.grouping_property} ?grouping .
OPTIONAL {{ ?grouping {self.higher_grouping} ?_higher_grouping }}.
}} GROUP BY ?grouping ?higher_grouping
HAVING (?count >= {self.grouping_threshold})
ORDER BY DESC(?count)
LIMIT 1000
""")
else:
query = f("""
SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {{
?entity {self.selector_sparql} .
?entity wdt:{self.grouping_property} ?grouping .
}} GROUP BY ?grouping
HAVING (?count >= {self.grouping_threshold})
ORDER BY DESC(?count)
LIMIT 1000
""")
grouping_counts = collections.OrderedDict()
grouping_groupings = collections.OrderedDict()
sq = pywikibot.data.sparql.SparqlQuery()
queryresult = sq.select(query)
# An empty or missing result is reported to the caller as an error.
if not queryresult:
raise QueryException("No result when querying groupings.")
for resultitem in queryresult:
# Keep only the bare id by dropping the entity URI prefix.
qid = resultitem.get('grouping').replace(u'http://www.wikidata.org/entity/', u'')
grouping_counts[qid] = int(resultitem.get('count'))
if self.higher_grouping:
value = resultitem.get('higher_grouping')
if value:
value = value.replace(u'http://www.wikidata.org/entity/', u'')
grouping_groupings[qid] = value
return (grouping_counts, grouping_groupings)
def get_query_for_items_for_property_positive(self, property, grouping):
"""
Build the SPARQL query listing the selected items that have a statement
for the property, together with the statement value when there is one.

:param property: Pid of the property
:param grouping: id of a grouping, or ``GROUP_MAPPING.TOTALS`` (no
restriction on the grouping) or ``GROUP_MAPPING.NO_GROUPING`` (only items
without any grouping)
:return: the query text
"""
query = f("""
SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE {{
?entity {self.selector_sparql} .""")
# Totals: every selected item is considered, so nothing is added.
if grouping == self.GROUP_MAPPING.TOTALS:
pass
# No grouping: drop the items that have any value for the grouping property.
elif grouping == self.GROUP_MAPPING.NO_GROUPING:
query += f("""
MINUS {{
?entity wdt:{self.grouping_property} [] .
}}""")
# Otherwise restrict the items to the requested grouping.
else:
query += f("""
?entity wdt:{self.grouping_property} wd:{grouping} .""")
query += f("""
?entity p:{property} ?prop . OPTIONAL {{ ?prop ps:{property} ?value }}
SERVICE wikibase:label {{ bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }}
}}
""")
return query
def get_query_for_items_for_property_negative(self, property, grouping):
"""
Build the SPARQL query listing the selected items that lack the property.

Items are removed (MINUS) when they have a ``wdt:`` statement for the
property or are typed ``wdno:`` for it; for ``GROUP_MAPPING.NO_GROUPING``
the items having any grouping statement are removed as well.

:param property: Pid of the property
:param grouping: id of a grouping, or ``GROUP_MAPPING.TOTALS`` or
``GROUP_MAPPING.NO_GROUPING``
:return: the query text
"""
query = f("""
SELECT DISTINCT ?entity ?entityLabel WHERE {{
?entity {self.selector_sparql} .""")
# Every branch opens the MINUS block that the final fragment closes.
if grouping == self.GROUP_MAPPING.TOTALS:
query += f("""
MINUS {{""")
elif grouping == self.GROUP_MAPPING.NO_GROUPING:
query += f("""
MINUS {{
{{?entity wdt:{self.grouping_property} [] .}} UNION""")
else:
query += f("""
?entity wdt:{self.grouping_property} wd:{grouping} .
MINUS {{""")
query += f("""
{{?entity a wdno:{property} .}} UNION
{{?entity wdt:{property} ?prop .}}
}}
SERVICE wikibase:label {{ bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }}
}}
""")
return query
def get_property_info(self, property):
"""
Get the usage counts for a property for the groupings

Only groupings where the count reaches ``property_threshold`` are returned
(at most 1000, ordered by decreasing count).

:param property: Wikidata Pid of the property
:return: (Ordered) dictionary with the counts per grouping, or None when
the query yields no result
"""
query = f("""
SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {{
?entity {self.selector_sparql} .
?entity wdt:{self.grouping_property} ?grouping .
FILTER EXISTS {{ ?entity p:{property} [] }} .
}}
GROUP BY ?grouping
HAVING (?count >= {self.property_threshold})
ORDER BY DESC(?count)
LIMIT 1000
""")
return self._get_grouping_counts_from_sparql(query)
def get_qualifier_info(self, property, qualifier, value="[]"):
"""
Get the usage counts for a qualifier for the groupings

Only groupings where the count reaches ``property_threshold`` are returned
(at most 1000, ordered by decreasing count).

:param property: Wikidata Pid of the property
:param qualifier: Wikidata Pid of the qualifier
:param value: SPARQL term inserted as the statement value; the default
``[]`` matches any value
:return: (Ordered) dictionary with the counts per grouping, or None when
the query yields no result
"""
query = f("""
SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {{
?entity {self.selector_sparql} .
?entity wdt:{self.grouping_property} ?grouping .
FILTER EXISTS {{ ?entity p:{property} [ ps:{property} {value} ; pq:{qualifier} [] ] }} .
}}
GROUP BY ?grouping
HAVING (?count >= {self.property_threshold})
ORDER BY DESC(?count)
LIMIT 1000
""")
return self._get_grouping_counts_from_sparql(query)
def get_property_info_no_grouping(self, property):
"""
Get the usage count for a property among the items without a grouping

:param property: Wikidata Pid of the property
:return: the count as an integer, or None when the query yields no result
"""
query = f("""
SELECT (COUNT(?entity) AS ?count) WHERE {{
?entity {self.selector_sparql} .
MINUS {{ ?entity wdt:{self.grouping_property} _:b28. }}
FILTER(EXISTS {{ ?entity p:{property} _:b29. }})
}}
GROUP BY ?grouping
ORDER BY DESC (?count)
LIMIT 10
""")
# Only the count of the first result row is used.
return self._get_count_from_sparql(query)
def get_qualifier_info_no_grouping(self, property, qualifier, value='[]'):
"""
Get the usage count for a qualifier among the items without a grouping

:param property: Wikidata Pid of the property
:param qualifier: Wikidata Pid of the qualifier
:param value: SPARQL term inserted as the statement value; the default
``[]`` matches any value
:return: the count as an integer, or None when the query yields no result
"""
query = f("""
SELECT (COUNT(?entity) AS ?count) WHERE {{
?entity {self.selector_sparql} .
MINUS {{ ?entity wdt:{self.grouping_property} _:b28. }}
FILTER EXISTS {{ ?entity p:{property} [ ps:{property} {value} ; pq:{qualifier} [] ] }} .
}}
GROUP BY ?grouping
ORDER BY DESC (?count)
LIMIT 10
""")
# Only the count of the first result row is used.
return self._get_count_from_sparql(query)
def get_totals_for_property(self, property):
"""
Get the total number of selected items that have the property

:param property: Wikidata Pid of the property.
:return: number of items found, or None when the query yields no result
"""
query = f("""
SELECT (COUNT(?item) as ?count) WHERE {{
?item {self.selector_sparql}
FILTER EXISTS {{ ?item p:{property}[] }} .
}}
""")
return self._get_count_from_sparql(query)
def get_totals_for_qualifier(self, property, qualifier, value="[]"):
"""
Get the total number of selected items that have the qualifier on a
statement of the property

:param property: Wikidata Pid of the property.
:param qualifier: Wikidata Pid of the qualifier.
:param value: SPARQL term inserted as the statement value; the default
``[]`` matches any value
:return: number of items found, or None when the query yields no result
"""
query = f("""
SELECT (COUNT(?item) as ?count) WHERE {{
?item {self.selector_sparql}
FILTER EXISTS {{ ?item p:{property} [ ps:{property} {value} ; pq:{qualifier} [] ] }} .
}}
""")
return self._get_count_from_sparql(query)
def get_totals_no_grouping(self):
"""
Get the total number of selected items that have no grouping

:return: number of items found, or None when the query yields no result
"""
query = f("""
SELECT (COUNT(?item) as ?count) WHERE {{
?item {self.selector_sparql}
MINUS {{ ?item wdt:{self.grouping_property} _:b28. }}
}}
""")
return self._get_count_from_sparql(query)
def get_totals(self):
"""
Get the total number of selected items

:return: number of items found, or None when the query yields no result
"""
query = f("""
SELECT (COUNT(?item) as ?count) WHERE {{
?item {self.selector_sparql}
}}
""")
return self._get_count_from_sparql(query)
@staticmethod
def _get_count_from_sparql(query):
    """
    Run a counting query and return its count.

    :param query: SPARQL query whose first result row carries a ``count``
    :return: the count as an integer, or None when there is no result
    """
    rows = pywikibot.data.sparql.SparqlQuery().select(query)
    if rows:
        return int(rows[0].get('count'))
    return None
@staticmethod
def _get_grouping_counts_from_sparql(query):
    """
    Run a per-grouping counting query and collect its rows.

    :param query: SPARQL query returning ``grouping`` and ``count`` per row
    :return: (Ordered) dictionary grouping id -> count in result order, or
        None when there is no result
    """
    rows = pywikibot.data.sparql.SparqlQuery().select(query)
    if not rows:
        return None
    # The grouping comes back as an entity URI; only the bare id is kept.
    return collections.OrderedDict(
        (
            row.get('grouping').replace(u'http://www.wikidata.org/entity/', u''),
            int(row.get('count')),
        )
        for row in rows
    )
@staticmethod
def _get_percentage(count, total):
if not count:
return 0
return round(1.0 * count / max(total, 1) * 100, 2)
- @staticmethod
- def make_column_header(column_entry):
- if column_entry.qualifier:
- property_link = column_entry.qualifier
- else:
- property_link = column_entry.property
-
- if column_entry.title:
- label = f('[[Property:{property_link}|{column_entry.title}]]')
- else:
- label = f('{{{{Property|{property_link}}}}}')
- return f('! data-sort-type="number"|{label}\n')
-
def get_header(self):
"""
Build the opening of the wikitable: the two spanning header cells, the
fixed grouping columns and one header cell per configured column.

:return: the header as wikitext
"""
text = u'{| class="wikitable sortable"\n'
# The grouping part spans one more column when a higher grouping is shown.
colspan = 3 if self.higher_grouping else 2
text += f('! colspan="{colspan}" |Top groupings (Minimum {self.grouping_threshold} items)\n')
text += f('! colspan="{len(self.columns)}"|Top Properties (used at least {self.property_threshold} times per grouping)\n') # noqa
text += u'|-\n'
# Untitled header cell above the higher grouping column.
if self.higher_grouping:
text += u'! \n'
text += u'! Name\n'
text += u'! Count\n'
for column_entry in self.columns:
- text += self.make_column_header(column_entry)
+ text += column_entry.make_column_header()
return text
def format_higher_grouping_text(self, higher_grouping_value):
    """
    Build the wikitext cell showing a higher grouping value.

    Values starting with an item id are rendered with the Q template,
    Commons file paths as a thumbnail (sorted by file name), and other
    values through the format registered for ``higher_grouping_type``,
    falling back to the raw value.

    :param higher_grouping_value: the value to display
    :return: the table cell as wikitext
    """
    # Display formats available per higher grouping type.
    type_formats = {
        "country": "{{Flag|%s}}",
    }
    image_match = re.match(r"http://commons.wikimedia.org/wiki/Special:FilePath/(.*?)$", higher_grouping_value)
    if re.match(r"Q\d+", higher_grouping_value):
        higher_grouping_text = f('{{{{Q|{higher_grouping_value}}}}}')
    elif image_match:
        image_name = image_match.group(1)
        higher_grouping_text = f('[[File:{image_name}|center|100px]]')
        # Images are sorted by file name rather than by full URL.
        higher_grouping_value = image_name
    elif self.higher_grouping_type in type_formats:
        higher_grouping_text = type_formats[self.higher_grouping_type] % higher_grouping_value
    else:
        higher_grouping_text = higher_grouping_value
    return f('| data-sort-value="{higher_grouping_value}"| {higher_grouping_text}\n')
def make_stats_for_no_group(self):
    """
    Query the counts of the items without grouping and build their row.

    :return: the table row as wikitext
    """
    text = u'|-\n'
    # Empty cell below the higher grouping column, when there is one.
    text += u'|\n' if self.higher_grouping else u''
    total_no_count = self.get_totals_no_grouping()
    text += u'| No grouping \n'
    text += f('| {total_no_count} \n')
    for column_entry in self.columns:
        if not column_entry.qualifier:
            column_count = self.get_property_info_no_grouping(column_entry.property)
        else:
            column_count = self.get_qualifier_info_no_grouping(
                column_entry.property, column_entry.qualifier, column_entry.value or '[]'
            )
        percentage = self._get_percentage(column_count, total_no_count)
        text += f('| {{{{{self.cell_template}|{percentage}|{column_count}|property={column_entry.property}|grouping={self.GROUP_MAPPING.NO_GROUPING.value}}}}}\n')  # noqa
    return text
def make_stats_for_one_grouping(self, grouping, item_count, higher_grouping):
    """
    Build the table row of one grouping from the collected column data.

    :param grouping: id of the grouping
    :param item_count: number of items in the grouping
    :param higher_grouping: higher grouping value of the grouping, if any
    :return: the table row as wikitext
    """
    text = u'|-\n'
    if self.higher_grouping:
        text += self.format_higher_grouping_text(higher_grouping) if higher_grouping else u'|\n'
    text += u'| {{Q|%s}}\n' % (grouping,)

    if not self.grouping_link:
        text += f('| {item_count} \n')
    else:
        # The count links to a subpage named after the grouping's English label.
        group_item = pywikibot.ItemPage(self.repo, grouping)
        group_item.get()
        label = group_item.labels["en"]
        text += f('| [[{self.grouping_link}/{label}|{item_count}]] \n')

    for column_entry in self.columns:
        column_entry_key = column_entry.get_key()
        try:
            column_count = self.column_data.get(column_entry_key).get(grouping) or 0
        except AttributeError:
            # No data at all was collected for this column.
            column_count = 0
        percentage = self._get_percentage(column_count, item_count)
        text += f('| {{{{{self.cell_template}|{percentage}|{column_count}|property={column_entry.property}|grouping={grouping}}}}}\n')  # noqa
    return text
def make_footer(self):
    """
    Query the overall totals and build the closing rows of the table.

    :return: the totals row and the end of the table, as wikitext
    """
    total_items = self.get_totals()
    text = u'|- class="sortbottom"\n|'
    if self.higher_grouping:
        text += u"|\n|"
    text += f('\'\'\'Totals\'\'\' (all items):\n| {total_items}\n')
    for column_entry in self.columns:
        property_name = column_entry.property
        if column_entry.qualifier:
            qualifier_arguments = {'property': property_name, 'qualifier': column_entry.qualifier}
            # Honour the configured statement value, as the per-grouping and
            # "no grouping" counts do; otherwise the totals of such a column
            # would count statements with any value.
            if column_entry.value:
                qualifier_arguments['value'] = column_entry.value
            totalprop = self.get_totals_for_qualifier(**qualifier_arguments)
        else:
            totalprop = self.get_totals_for_property(property=property_name)
        percentage = self._get_percentage(totalprop, total_items)
        text += f('| {{{{{self.cell_template}|{percentage}|{totalprop}|property={column_entry.property}}}}}\n')
    text += u'|}\n'
    return text
def retrieve_and_process_data(self):
    """
    Run all the queries and assemble the complete statistics table.

    :return: the table as wikitext
    :raises QueryException: re-raised when no grouping could be retrieved
    """
    logging.info("Retrieving grouping information...")
    try:
        groupings_counts, groupings_groupings = self.get_grouping_information()
    except QueryException as e:
        logging.error(f('No groupings found.'))
        raise e
    logging.info(f('Grouping retrieved: {len(groupings_counts)}'))

    # Collect the per-grouping counts of every column before rendering.
    for column_entry in self.columns:
        column_entry_key = column_entry.get_key()
        if not column_entry.qualifier:
            counts = self.get_property_info(column_entry.property)
        else:
            counts = self.get_qualifier_info(
                column_entry.property, column_entry.qualifier, column_entry.value or '[]'
            )
        self.column_data[column_entry_key] = counts

    text = self.get_header()
    for grouping, item_count in groupings_counts.items():
        text += self.make_stats_for_one_grouping(grouping, item_count, groupings_groupings.get(grouping))
    if self.stats_for_no_group:
        text += self.make_stats_for_no_group()
    text += self.make_footer()
    return text
def main(*args):
    """
    Print the statistics table of a sample configuration.
    """
    logging.info("Main function...")
    stats = PropertyStatistics(
        selector_sparql=u'wdt:P31 wd:Q41960',
        columns=[PropertyConfig('P21'), PropertyConfig('P19')],
        grouping_property=u'P551',
        grouping_threshold=5,
        property_threshold=1,
        stats_for_no_group=True,
    )
    print(stats.retrieve_and_process_data())


if __name__ == "__main__":
    main()
diff --git a/integraality/tests/test_property_statistics.py b/integraality/tests/test_property_statistics.py
index e429f83..1152bcd 100644
--- a/integraality/tests/test_property_statistics.py
+++ b/integraality/tests/test_property_statistics.py
@@ -1,661 +1,661 @@
# -*- coding: utf-8 -*-
"""Unit tests for functions.py."""
import unittest
from collections import OrderedDict
from unittest.mock import call, patch
from property_statistics import (
PropertyConfig,
PropertyStatistics,
QueryException
)
-class PropertyStatisticsTest(unittest.TestCase):
-
- def setUp(self):
- columns = [
- PropertyConfig(property='P21'),
- PropertyConfig(property='P19'),
- PropertyConfig(property='P1', qualifier='P2'),
- PropertyConfig(property='P3', value='Q4', qualifier='P5'),
- ]
- self.stats = PropertyStatistics(
- columns=columns,
- selector_sparql=u'wdt:P31 wd:Q41960',
- grouping_property=u'P551',
- property_threshold=10
- )
-
-
-class TestMakeColumnHeader(PropertyStatisticsTest):
+class TestPropertyConfig(unittest.TestCase):
def test_simple(self):
prop_entry = PropertyConfig('P19')
- result = self.stats.make_column_header(prop_entry)
+ result = prop_entry.make_column_header()
expected = u'! data-sort-type="number"|{{Property|P19}}\n'
self.assertEqual(result, expected)
def test_with_label(self):
prop_entry = PropertyConfig('P19', title="birth")
- result = self.stats.make_column_header(prop_entry)
+ result = prop_entry.make_column_header()
expected = u'! data-sort-type="number"|[[Property:P19|birth]]\n'
self.assertEqual(result, expected)
def test_with_qualifier(self):
prop_entry = PropertyConfig('P669', qualifier='P670')
- result = self.stats.make_column_header(prop_entry)
+ result = prop_entry.make_column_header()
expected = u'! data-sort-type="number"|{{Property|P670}}\n'
self.assertEqual(result, expected)
def test_with_qualifier_and_label(self):
prop_entry = PropertyConfig('P669', title="street", qualifier='P670')
- result = self.stats.make_column_header(prop_entry)
+ result = prop_entry.make_column_header()
expected = u'! data-sort-type="number"|[[Property:P670|street]]\n'
self.assertEqual(result, expected)
+class PropertyStatisticsTest(unittest.TestCase):
+
+ def setUp(self):
+ columns = [
+ PropertyConfig(property='P21'),
+ PropertyConfig(property='P19'),
+ PropertyConfig(property='P1', qualifier='P2'),
+ PropertyConfig(property='P3', value='Q4', qualifier='P5'),
+ ]
+ self.stats = PropertyStatistics(
+ columns=columns,
+ selector_sparql=u'wdt:P31 wd:Q41960',
+ grouping_property=u'P551',
+ property_threshold=10
+ )
+
+
class FormatHigherGroupingTextTest(PropertyStatisticsTest):
    """Check the cell produced for each kind of higher grouping value."""

    def test_format_higher_grouping_text_default_qitem(self):
        self.assertEqual(
            self.stats.format_higher_grouping_text("Q1"),
            '| data-sort-value="Q1"| {{Q|Q1}}\n'
        )

    def test_format_higher_grouping_text_string(self):
        self.assertEqual(
            self.stats.format_higher_grouping_text("foo"),
            '| data-sort-value="foo"| foo\n'
        )

    def test_format_higher_grouping_text_country(self):
        self.stats.higher_grouping_type = "country"
        self.assertEqual(
            self.stats.format_higher_grouping_text("AT"),
            '| data-sort-value="AT"| {{Flag|AT}}\n'
        )

    def test_format_higher_grouping_text_image(self):
        file_url = "http://commons.wikimedia.org/wiki/Special:FilePath/US%20CDC%20logo.svg"
        self.assertEqual(
            self.stats.format_higher_grouping_text(file_url),
            '| data-sort-value="US%20CDC%20logo.svg"| [[File:US%20CDC%20logo.svg|center|100px]]\n'
        )
class MakeStatsForNoGroupTest(PropertyStatisticsTest):
    """Check make_stats_for_no_group with its three count queries mocked."""

    # Cells expected for the four configured columns, out of 20 items.
    EXPECTED_CELLS = (
        "| {{Integraality cell|10.0|2|property=P21|grouping=None}}\n"
        "| {{Integraality cell|50.0|10|property=P19|grouping=None}}\n"
        "| {{Integraality cell|75.0|15|property=P1|grouping=None}}\n"
        "| {{Integraality cell|25.0|5|property=P3|grouping=None}}\n"
    )

    def setUp(self):
        super().setUp()
        for method_name in (
            'get_totals_no_grouping',
            'get_property_info_no_grouping',
            'get_qualifier_info_no_grouping',
        ):
            patcher = patch('property_statistics.PropertyStatistics.' + method_name, autospec=True)
            setattr(self, 'mock_' + method_name, patcher.start())
            self.addCleanup(patcher.stop)

    def _set_query_results(self):
        # One total, then one count per property column and per qualifier column.
        self.mock_get_totals_no_grouping.return_value = 20
        self.mock_get_property_info_no_grouping.side_effect = [2, 10]
        self.mock_get_qualifier_info_no_grouping.side_effect = [15, 5]

    def _assert_queries_made(self):
        self.mock_get_totals_no_grouping.assert_called_once_with(self.stats)
        self.mock_get_property_info_no_grouping.assert_has_calls([
            call(self.stats, "P21"),
            call(self.stats, "P19"),
        ])
        self.mock_get_qualifier_info_no_grouping.assert_has_calls([
            call(self.stats, 'P1', 'P2', '[]'),
            call(self.stats, 'P3', 'P5', 'Q4'),
        ])

    def test_make_stats_for_no_group(self):
        self._set_query_results()
        result = self.stats.make_stats_for_no_group()
        expected = (
            "|-\n"
            "| No grouping \n"
            "| 20 \n"
        ) + self.EXPECTED_CELLS
        self.assertEqual(result, expected)
        self._assert_queries_made()

    def test_make_stats_for_no_group_with_higher_grouping(self):
        self._set_query_results()
        self.stats.higher_grouping = 'wdt:P17/wdt:P298'
        result = self.stats.make_stats_for_no_group()
        expected = (
            "|-\n"
            "|\n"
            "| No grouping \n"
            "| 20 \n"
        ) + self.EXPECTED_CELLS
        self.assertEqual(result, expected)
        self._assert_queries_made()
class MakeStatsForOneGroupingTest(PropertyStatisticsTest):
    """Check make_stats_for_one_grouping against preset column data."""

    # Cells expected for grouping Q3115846 with 10 items.
    EXPECTED_CELLS = (
        '| {{Integraality cell|100.0|10|property=P21|grouping=Q3115846}}\n'
        '| {{Integraality cell|80.0|8|property=P19|grouping=Q3115846}}\n'
        '| {{Integraality cell|20.0|2|property=P1|grouping=Q3115846}}\n'
        '| {{Integraality cell|70.0|7|property=P3|grouping=Q3115846}}\n'
    )

    def setUp(self):
        super().setUp()
        column_counts = [
            ('P21', [('Q3115846', 10), ('Q5087901', 6)]),
            ('P19', [('Q3115846', 8), ('Q2166574', 5)]),
            ('P1P2', [('Q3115846', 2), ('Q2166574', 9)]),
            ('P3Q4P5', [('Q3115846', 7), ('Q2166574', 1)]),
        ]
        self.stats.column_data = {key: OrderedDict(counts) for key, counts in column_counts}

    def test_make_stats_for_one_grouping(self):
        result = self.stats.make_stats_for_one_grouping("Q3115846", 10, None)
        expected = (
            '|-\n'
            '| {{Q|Q3115846}}\n'
            '| 10 \n'
        ) + self.EXPECTED_CELLS
        self.assertEqual(result, expected)

    def test_make_stats_for_one_grouping_with_higher_grouping(self):
        self.stats.higher_grouping = "wdt:P17/wdt:P298"
        result = self.stats.make_stats_for_one_grouping("Q3115846", 10, "Q1")
        expected = (
            '|-\n'
            '| data-sort-value="Q1"| {{Q|Q1}}\n'
            '| {{Q|Q3115846}}\n'
            '| 10 \n'
        ) + self.EXPECTED_CELLS
        self.assertEqual(result, expected)

    def test_make_stats_for_one_grouping_with_grouping_link(self):
        with patch('pywikibot.ItemPage', autospec=True) as mock_item_page:
            mock_item_page.return_value.labels = {'en': 'Bar'}
            self.stats.grouping_link = "Foo"
            result = self.stats.make_stats_for_one_grouping("Q3115846", 10, None)
        expected = (
            '|-\n'
            '| {{Q|Q3115846}}\n'
            '| [[Foo/Bar|10]] \n'
        ) + self.EXPECTED_CELLS
        self.assertEqual(result, expected)
class GetQueryForItemsForPropertyPositive(PropertyStatisticsTest):
"""Check the exact query text built for the items that have a property."""
# Restricted to one grouping.
def test_get_query_for_items_for_property_positive(self):
result = self.stats.get_query_for_items_for_property_positive('P21', 'Q3115846')
expected = """
SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE {
?entity wdt:P31 wd:Q41960 .
?entity wdt:P551 wd:Q3115846 .
?entity p:P21 ?prop . OPTIONAL { ?prop ps:P21 ?value }
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
self.assertEqual(result, expected)
# Restricted to the items without any grouping.
def test_get_query_for_items_for_property_positive_no_grouping(self):
result = self.stats.get_query_for_items_for_property_positive('P21', self.stats.GROUP_MAPPING.NO_GROUPING)
expected = """
SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE {
?entity wdt:P31 wd:Q41960 .
MINUS {
?entity wdt:P551 [] .
}
?entity p:P21 ?prop . OPTIONAL { ?prop ps:P21 ?value }
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
self.assertEqual(result, expected)
# No restriction on the grouping.
def test_get_query_for_items_for_property_positive_totals(self):
result = self.stats.get_query_for_items_for_property_positive('P21', self.stats.GROUP_MAPPING.TOTALS)
expected = """
SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE {
?entity wdt:P31 wd:Q41960 .
?entity p:P21 ?prop . OPTIONAL { ?prop ps:P21 ?value }
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
self.assertEqual(result, expected)
class GetQueryForItemsForPropertyNegative(PropertyStatisticsTest):
"""Check the exact query text built for the items that lack a property."""
# Restricted to one grouping.
def test_get_query_for_items_for_property_negative(self):
result = self.stats.get_query_for_items_for_property_negative('P21', 'Q3115846')
expected = """
SELECT DISTINCT ?entity ?entityLabel WHERE {
?entity wdt:P31 wd:Q41960 .
?entity wdt:P551 wd:Q3115846 .
MINUS {
{?entity a wdno:P21 .} UNION
{?entity wdt:P21 ?prop .}
}
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
self.assertEqual(result, expected)
# Restricted to the items without any grouping.
def test_get_query_for_items_for_property_negative_no_grouping(self):
result = self.stats.get_query_for_items_for_property_negative('P21', self.stats.GROUP_MAPPING.NO_GROUPING)
expected = """
SELECT DISTINCT ?entity ?entityLabel WHERE {
?entity wdt:P31 wd:Q41960 .
MINUS {
{?entity wdt:P551 [] .} UNION
{?entity a wdno:P21 .} UNION
{?entity wdt:P21 ?prop .}
}
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
self.assertEqual(result, expected)
# No restriction on the grouping.
def test_get_query_for_items_for_property_negative_totals(self):
result = self.stats.get_query_for_items_for_property_negative('P21', self.stats.GROUP_MAPPING.TOTALS)
expected = """
SELECT DISTINCT ?entity ?entityLabel WHERE {
?entity wdt:P31 wd:Q41960 .
MINUS {
{?entity a wdno:P21 .} UNION
{?entity wdt:P21 ?prop .}
}
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
self.assertEqual(result, expected)
class SparqlQueryTest(PropertyStatisticsTest):
    """Base class for the tests that replace the SPARQL client by a mock."""

    def setUp(self):
        super().setUp()
        sparql_patcher = patch('pywikibot.data.sparql.SparqlQuery', autospec=True)
        self.mock_sparql_query = sparql_patcher.start()
        self.addCleanup(sparql_patcher.stop)

    def assert_query_called(self, query):
        """Assert that exactly one select was made, with the given query."""
        mock_select = self.mock_sparql_query.return_value.select
        mock_select.assert_called_once_with(query)
class GetCountFromSparqlTest(SparqlQueryTest):
    """Check how _get_count_from_sparql converts the query result."""

    def _count_for(self, select_result):
        # Run the helper on a fixed query against the given mocked result.
        self.mock_sparql_query.return_value.select.return_value = select_result
        count = self.stats._get_count_from_sparql("SELECT X")
        self.assert_query_called("SELECT X")
        return count

    def test_return_count(self):
        self.assertEqual(self._count_for([{'count': '18'}]), 18)

    def test_return_None(self):
        self.assertEqual(self._count_for(None), None)
class GetGroupingCountsFromSparqlTest(SparqlQueryTest):
    """Check how _get_grouping_counts_from_sparql converts the query result."""

    def _counts_for(self, select_result):
        # Run the helper on a fixed query against the given mocked result.
        self.mock_sparql_query.return_value.select.return_value = select_result
        counts = self.stats._get_grouping_counts_from_sparql("SELECT X")
        self.assert_query_called("SELECT X")
        return counts

    def test_return_count(self):
        rows = [
            {'grouping': 'http://www.wikidata.org/entity/Q1', 'count': 10},
            {'grouping': 'http://www.wikidata.org/entity/Q2', 'count': 5},
        ]
        self.assertEqual(self._counts_for(rows), OrderedDict([('Q1', 10), ('Q2', 5)]))

    def test_return_None(self):
        self.assertEqual(self._counts_for(None), None)
class SparqlCountTest(SparqlQueryTest):
"""
Check the exact query sent by each counting method, and that the mocked
count of 18 is returned as an integer.
"""
def setUp(self):
super().setUp()
# Every query of this class answers with a single count row.
self.mock_sparql_query.return_value.select.return_value = [{'count': '18'}]
def test_get_property_info_no_grouping(self):
result = self.stats.get_property_info_no_grouping('P1')
query = (
"\n"
"SELECT (COUNT(?entity) AS ?count) WHERE {\n"
" ?entity wdt:P31 wd:Q41960 .\n"
" MINUS { ?entity wdt:P551 _:b28. }\n"
" FILTER(EXISTS { ?entity p:P1 _:b29. })\n"
"}\n"
"GROUP BY ?grouping\n"
"ORDER BY DESC (?count)\n"
"LIMIT 10\n"
)
self.assert_query_called(query)
self.assertEqual(result, 18)
# The value is not given, so the query uses the default "[]" term.
def test_get_qualifier_info_no_grouping(self):
result = self.stats.get_qualifier_info_no_grouping('P1', 'P2')
query = (
"\n"
"SELECT (COUNT(?entity) AS ?count) WHERE {\n"
" ?entity wdt:P31 wd:Q41960 .\n"
" MINUS { ?entity wdt:P551 _:b28. }\n"
" FILTER EXISTS { ?entity p:P1 [ ps:P1 [] ; pq:P2 [] ] } .\n"
"}\n"
"GROUP BY ?grouping\n"
"ORDER BY DESC (?count)\n"
"LIMIT 10\n"
)
self.assert_query_called(query)
self.assertEqual(result, 18)
def test_get_totals_for_property(self):
result = self.stats.get_totals_for_property('P1')
query = (
"\n"
"SELECT (COUNT(?item) as ?count) WHERE {\n"
" ?item wdt:P31 wd:Q41960\n"
" FILTER EXISTS { ?item p:P1[] } .\n"
"}\n"
)
self.assert_query_called(query)
self.assertEqual(result, 18)
# The value is not given, so the query uses the default "[]" term.
def test_get_totals_for_qualifier(self):
result = self.stats.get_totals_for_qualifier("P1", "P2")
query = (
"\n"
"SELECT (COUNT(?item) as ?count) WHERE {\n"
" ?item wdt:P31 wd:Q41960\n"
" FILTER EXISTS { ?item p:P1 [ ps:P1 [] ; pq:P2 [] ] } .\n"
"}\n"
)
self.assert_query_called(query)
self.assertEqual(result, 18)
def test_get_totals_no_grouping(self):
result = self.stats.get_totals_no_grouping()
query = (
"\n"
"SELECT (COUNT(?item) as ?count) WHERE {\n"
" ?item wdt:P31 wd:Q41960\n"
" MINUS { ?item wdt:P551 _:b28. }\n"
"}\n"
)
self.assert_query_called(query)
self.assertEqual(result, 18)
def test_get_totals(self):
result = self.stats.get_totals()
query = (
"\n"
"SELECT (COUNT(?item) as ?count) WHERE {\n"
" ?item wdt:P31 wd:Q41960\n"
"}\n"
)
self.assert_query_called(query)
self.assertEqual(result, 18)
class GetGroupingInformationTest(SparqlQueryTest):
def test_get_grouping_information(self):
self.mock_sparql_query.return_value.select.return_value = [
{'grouping': 'http://www.wikidata.org/entity/Q3115846', 'count': '10'},
{'grouping': 'http://www.wikidata.org/entity/Q5087901', 'count': '6'},
{'grouping': 'http://www.wikidata.org/entity/Q623333', 'count': '6'}
]
expected = (
OrderedDict([('Q3115846', 10), ('Q5087901', 6), ('Q623333', 6)]),
OrderedDict()
)
query = (
"\n"
"SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n"
" ?entity wdt:P31 wd:Q41960 .\n"
" ?entity wdt:P551 ?grouping .\n"
"} GROUP BY ?grouping\n"
"HAVING (?count >= 20)\n"
"ORDER BY DESC(?count)\n"
"LIMIT 1000\n"
)
result = self.stats.get_grouping_information()
self.assert_query_called(query)
self.assertEqual(result, expected)
def test_get_grouping_information_with_grouping_threshold(self):
self.mock_sparql_query.return_value.select.return_value = [
{'grouping': 'http://www.wikidata.org/entity/Q3115846', 'count': '10'},
{'grouping': 'http://www.wikidata.org/entity/Q5087901', 'count': '6'},
{'grouping': 'http://www.wikidata.org/entity/Q623333', 'count': '6'}
]
expected = (
OrderedDict([('Q3115846', 10), ('Q5087901', 6), ('Q623333', 6)]),
OrderedDict()
)
self.stats.grouping_threshold = 5
query = (
"\n"
"SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n"
" ?entity wdt:P31 wd:Q41960 .\n"
" ?entity wdt:P551 ?grouping .\n"
"} GROUP BY ?grouping\n"
"HAVING (?count >= 5)\n"
"ORDER BY DESC(?count)\n"
"LIMIT 1000\n"
)
result = self.stats.get_grouping_information()
self.assert_query_called(query)
self.assertEqual(result, expected)
def test_get_grouping_information_with_higher_grouping(self):
self.mock_sparql_query.return_value.select.return_value = [
{'grouping': 'http://www.wikidata.org/entity/Q3115846', 'higher_grouping': 'NZL', 'count': '10'},
{'grouping': 'http://www.wikidata.org/entity/Q5087901', 'higher_grouping': 'USA', 'count': '6'},
{'grouping': 'http://www.wikidata.org/entity/Q623333', 'higher_grouping': 'USA', 'count': '6'}
]
expected = (
OrderedDict([('Q3115846', 10), ('Q5087901', 6), ('Q623333', 6)]),
OrderedDict([('Q3115846', 'NZL'), ('Q5087901', 'USA'), ('Q623333', 'USA')])
)
self.stats.higher_grouping = 'wdt:P17/wdt:P298'
query = (
"\n"
"SELECT ?grouping (SAMPLE(?_higher_grouping) as ?higher_grouping) "
"(COUNT(DISTINCT ?entity) as ?count) WHERE {\n"
" ?entity wdt:P31 wd:Q41960 .\n"
" ?entity wdt:P551 ?grouping .\n"
" OPTIONAL { ?grouping wdt:P17/wdt:P298 ?_higher_grouping }.\n"
"} GROUP BY ?grouping ?higher_grouping\n"
"HAVING (?count >= 20)\n"
"ORDER BY DESC(?count)\n"
"LIMIT 1000\n"
)
result = self.stats.get_grouping_information()
self.assert_query_called(query)
self.assertEqual(result, expected)
def test_get_grouping_information_empty_result(self):
    """A None result from the SPARQL select must raise QueryException."""
    expected_query = (
        "\n"
        "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n"
        " ?entity wdt:P31 wd:Q41960 .\n"
        " ?entity wdt:P551 ?grouping .\n"
        "} GROUP BY ?grouping\n"
        "HAVING (?count >= 20)\n"
        "ORDER BY DESC(?count)\n"
        "LIMIT 1000\n"
    )
    self.mock_sparql_query.return_value.select.return_value = None

    self.assertRaises(QueryException, self.stats.get_grouping_information)

    # The query is still expected to have been issued before the failure.
    self.assert_query_called(expected_query)
class GetInfoTest(SparqlQueryTest):
    """Shared fixture: a canned SPARQL result and the counts expected from it."""

    def setUp(self):
        """Prepare the canned rows and the matching expected mapping."""
        super().setUp()
        # Expected grouping -> count mapping, with counts as integers.
        self.expected = OrderedDict([('Q3115846', 10), ('Q5087901', 6), ('Q623333', 6)])
        # Rows in the shape the mocked SPARQL select hands back:
        # entity URIs for groupings and counts as strings.
        self.sparql_return_value = [
            {'grouping': 'http://www.wikidata.org/entity/Q3115846', 'count': '10'},
            {'grouping': 'http://www.wikidata.org/entity/Q5087901', 'count': '6'},
            {'grouping': 'http://www.wikidata.org/entity/Q623333', 'count': '6'},
        ]
class GetPropertyInfoTest(GetInfoTest):
    """Tests for get_property_info, called with property 'P1'."""

    # Query both tests expect to be issued for property P1.
    _EXPECTED_QUERY = (
        "\n"
        "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n"
        " ?entity wdt:P31 wd:Q41960 .\n"
        " ?entity wdt:P551 ?grouping .\n"
        " FILTER EXISTS { ?entity p:P1 [] } .\n"
        "}\n"
        "GROUP BY ?grouping\n"
        "HAVING (?count >= 10)\n"
        "ORDER BY DESC(?count)\n"
        "LIMIT 1000\n"
    )

    def test_get_property_info(self):
        """The canned rows are turned into the expected grouping counts."""
        self.mock_sparql_query.return_value.select.return_value = self.sparql_return_value

        counts = self.stats.get_property_info('P1')

        self.assert_query_called(self._EXPECTED_QUERY)
        self.assertEqual(counts, self.expected)

    def test_get_property_info_empty_result(self):
        """A None result from the SPARQL select is passed through as None."""
        self.mock_sparql_query.return_value.select.return_value = None

        outcome = self.stats.get_property_info('P1')

        self.assert_query_called(self._EXPECTED_QUERY)
        self.assertEqual(outcome, None)
class GetQualifierInfoTest(GetInfoTest):
    """Tests for get_qualifier_info."""

    def test_get_qualifier_info(self):
        """Property P1 with qualifier P2 yields the expected query and counts."""
        # The qualifier is expected as a pq: constraint on the P1 statement.
        expected_query = (
            "\n"
            "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " ?entity wdt:P551 ?grouping .\n"
            " FILTER EXISTS { ?entity p:P1 [ ps:P1 [] ; pq:P2 [] ] } .\n"
            "}\n"
            "GROUP BY ?grouping\n"
            "HAVING (?count >= 10)\n"
            "ORDER BY DESC(?count)\n"
            "LIMIT 1000\n"
        )
        self.mock_sparql_query.return_value.select.return_value = self.sparql_return_value

        counts = self.stats.get_qualifier_info('P1', qualifier="P2")

        self.assert_query_called(expected_query)
        self.assertEqual(counts, self.expected)
class TestGetHeader(PropertyStatisticsTest):
    """Tests for the wikitable header produced by get_header."""

    # Property column headers shared by both expected tables.
    _PROPERTY_COLUMNS = (
        '! data-sort-type="number"|{{Property|P21}}\n'
        '! data-sort-type="number"|{{Property|P19}}\n'
        '! data-sort-type="number"|{{Property|P2}}\n'
        '! data-sort-type="number"|{{Property|P5}}\n'
    )

    def setUp(self):
        """Use thresholds that are easy to spot in the header text."""
        super().setUp()
        self.stats.property_threshold = 4
        self.stats.grouping_threshold = 7

    def test_get_header(self):
        """Without a higher grouping, the grouping section spans two columns."""
        grouping_columns = (
            '{| class="wikitable sortable"\n'
            '! colspan="2" |Top groupings (Minimum 7 items)\n'
            '! colspan="4"|Top Properties (used at least 4 times per grouping)\n'
            '|-\n'
            '! Name\n'
            '! Count\n'
        )

        header = self.stats.get_header()

        self.assertEqual(header, grouping_columns + self._PROPERTY_COLUMNS)

    def test_get_header_with_higher_grouping(self):
        """With a higher grouping, an extra unnamed leading column is added."""
        self.stats.higher_grouping = 'wdt:P17/wdt:P298'
        grouping_columns = (
            '{| class="wikitable sortable"\n'
            '! colspan="3" |Top groupings (Minimum 7 items)\n'
            '! colspan="4"|Top Properties (used at least 4 times per grouping)\n'
            '|-\n'
            '! \n'
            '! Name\n'
            '! Count\n'
        )

        header = self.stats.get_header()

        self.assertEqual(header, grouping_columns + self._PROPERTY_COLUMNS)
class MakeFooterTest(SparqlQueryTest):
    """Tests for the totals footer produced by make_footer."""

    def setUp(self):
        """Queue one single-row count result per expected SPARQL select call."""
        super().setUp()
        # Consumed in order by successive select() calls; the expected footer
        # shows 120 as the overall total and the rest as per-property counts.
        queued_counts = ('120', '30', '80', '10', '12')
        self.mock_sparql_query.return_value.select.side_effect = [
            [{'count': value}] for value in queued_counts
        ]

    def test_make_footer(self):
        """The footer lists the total and one percentage cell per property."""
        expected_footer = (
            '|- class="sortbottom"\n'
            "|\'\'\'Totals\'\'\' (all items):\n"
            "| 120\n"
            "| {{Integraality cell|25.0|30|property=P21}}\n"
            "| {{Integraality cell|66.67|80|property=P19}}\n"
            "| {{Integraality cell|8.33|10|property=P1}}\n"
            "| {{Integraality cell|10.0|12|property=P3}}\n"
            "|}\n"
        )

        footer = self.stats.make_footer()

        self.assertEqual(footer, expected_footer)