diff --git a/integraality/property_statistics.py b/integraality/property_statistics.py
index 4c81f2d..0c6ad20 100644
--- a/integraality/property_statistics.py
+++ b/integraality/property_statistics.py
@@ -1,425 +1,424 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Calculate and generate statistics
"""
import collections
import logging
from enum import Enum
import pywikibot
import pywikibot.data.sparql
from column import ColumnMaker, GroupingType
from line import (
NoGroupGrouping,
PropertyGrouping,
TotalsGrouping,
UnknownValueGrouping
)
from statsd.defaults.env import statsd
class QueryException(Exception):
    """
    Error raised when a SPARQL query fails or yields no result.

    :param message: Description forwarded to ``Exception``.
    :param query: The SPARQL query the error relates to.
    """

    def __init__(self, message, query):
        Exception.__init__(self, message)
        # Stored so that code catching the exception can read the query back.
        self.query = query
class PropertyStatistics:
"""
Generate statistics
"""
UNKNOWN_VALUE_PREFIX = "http://www.wikidata.org/.well-known/genid/"
GROUP_MAPPING = Enum('GROUP_MAPPING', {
'NO_GROUPING': 'None',
'TOTALS': '',
'UNKNOWN_VALUE': '{{int:wikibase-snakview-variations-somevalue-label}}'
})
def __init__(
    self,
    selector_sparql,
    columns,
    grouping_property,
    grouping_type=None,
    higher_grouping=None,
    higher_grouping_type=None,
    stats_for_no_group=False,
    grouping_link=None,
    grouping_threshold=20,
    property_threshold=0,
):
    """
    Store the configuration the statistics are computed from.

    :param selector_sparql: SPARQL fragment interpolated after ``?entity`` in every query
    :param columns: Iterable of column objects; indexed by their ``get_key()``
    :param grouping_property: Property id used as ``wdt:<id>`` to group entities
    :param grouping_type: Optional value converted with ``GroupingType``
    :param higher_grouping: Optional SPARQL path interpolated after ``?grouping``
    :param higher_grouping_type: Stored as given
    :param stats_for_no_group: Whether to add a row for entities without grouping
    :param grouping_link: Optional link prefix used for the count cell
    :param grouping_threshold: Minimum count for a grouping to be kept
    :param property_threshold: Threshold shown in the table header
    """
    # The data repository is obtained through the English Wikipedia site.
    self.repo = pywikibot.Site('en', 'wikipedia').data_repository()
    self.columns = dict((column.get_key(), column) for column in columns)
    self.grouping_property = grouping_property
    # A falsy grouping type is normalised to None.
    self.grouping_type = GroupingType(grouping_type) if grouping_type else None
    self.higher_grouping = higher_grouping
    self.higher_grouping_type = higher_grouping_type
    self.selector_sparql = selector_sparql
    self.stats_for_no_group = stats_for_no_group
    self.grouping_threshold = grouping_threshold
    self.property_threshold = property_threshold
    self.grouping_link = grouping_link
    # Name of the wiki template passed to format_cell for every cell.
    self.cell_template = 'Integraality cell'
@statsd.timer('property_statistics.sparql.groupings')
def get_grouping_information(self):
    """
    Get all groupings and their counts.

    One of three queries is run depending on the configuration: with a
    higher grouping, grouped by year, or plain grouping by value. Results
    whose grouping is missing or starts with UNKNOWN_VALUE_PREFIX are
    summed into a single UnknownValueGrouping.

    :return: OrderedDict mapping each grouping key to its grouping object
    :raises QueryException: if the query returns nothing or times out
    """
    if self.higher_grouping:
        query = f"""
SELECT ?grouping (SAMPLE(?_higher_grouping) as ?higher_grouping) (COUNT(DISTINCT ?entity) as ?count) WHERE {{
?entity {self.selector_sparql} .
?entity wdt:{self.grouping_property} ?grouping .
OPTIONAL {{ ?grouping {self.higher_grouping} ?_higher_grouping }}.
}} GROUP BY ?grouping ?higher_grouping
HAVING (?count >= {self.grouping_threshold})
ORDER BY DESC(?count)
LIMIT 1000
"""
    elif self.grouping_type == GroupingType.YEAR:
        query = f"""
SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {{
?entity {self.selector_sparql} .
?entity wdt:{self.grouping_property} ?date .
BIND(YEAR(?date) as ?grouping) .
}} GROUP BY ?grouping
HAVING (?count >= {self.grouping_threshold})
ORDER BY DESC(?count)
LIMIT 1000
"""
    else:
        query = f"""
SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {{
?entity {self.selector_sparql} .
?entity wdt:{self.grouping_property} ?grouping .
}} GROUP BY ?grouping
HAVING (?count >= {self.grouping_threshold})
ORDER BY DESC(?count)
LIMIT 1000
"""

    try:
        sq = pywikibot.data.sparql.SparqlQuery()
        queryresult = sq.select(query)
    except pywikibot.exceptions.TimeoutError as error:
        # Sentences are separated by a space so the concatenated message
        # stays readable; the timeout is kept as the exception's cause.
        raise QueryException(
            "The Wikidata Query Service timed out when fetching groupings. "
            "You might be trying to do something too expensive. "
            "Please investigate the 'all groupings' debug query in the dashboard header.",
            query=query
        ) from error

    if not queryresult:
        raise QueryException(
            "No result when querying groupings. "
            "Please investigate the 'all groupings' debug query in the dashboard header.",
            query=query
        )

    entity_prefix = u'http://www.wikidata.org/entity/'
    groupings = collections.OrderedDict()
    unknown_value_count = 0

    for resultitem in queryresult:
        grouping_value = resultitem.get('grouping')
        count = int(resultitem.get('count'))

        # Missing or "unknown value" groupings are pooled together.
        if not grouping_value or grouping_value.startswith(self.UNKNOWN_VALUE_PREFIX):
            unknown_value_count += count
            continue

        qid = grouping_value.replace(entity_prefix, u'')
        higher_grouping = None
        if self.higher_grouping:
            # A falsy value (no higher grouping found) is passed on as-is.
            higher_grouping = resultitem.get('higher_grouping')
            if higher_grouping:
                higher_grouping = higher_grouping.replace(entity_prefix, u'')

        property_grouping = PropertyGrouping(title=qid, count=count, higher_grouping=higher_grouping)
        groupings[property_grouping.get_key()] = property_grouping

    if unknown_value_count:
        unknown_value_grouping = UnknownValueGrouping(unknown_value_count)
        groupings[unknown_value_grouping.get_key()] = unknown_value_grouping

    return groupings
def get_query_for_items_for_property_positive(self, column, grouping):
column_key = column.get_key()
query = f"""
SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE {{
?entity {self.selector_sparql} ."""
if grouping == self.GROUP_MAPPING.TOTALS:
pass
elif grouping == self.GROUP_MAPPING.NO_GROUPING:
query += f"""
MINUS {{
?entity wdt:{self.grouping_property} [] .
}}"""
elif grouping == self.GROUP_MAPPING.UNKNOWN_VALUE:
query += f"""
?entity wdt:{self.grouping_property} ?grouping.
FILTER wikibase:isSomeValue(?grouping)."""
elif self.grouping_type == GroupingType.YEAR:
query += f"""
?entity wdt:{self.grouping_property} ?date.
BIND(YEAR(?date) as ?year).
FILTER(?year = {grouping})."""
else:
query += f"""
?entity wdt:{self.grouping_property} wd:{grouping} ."""
query += column.get_filter_for_positive_query()
query += """}
"""
return query
def get_query_for_items_for_property_negative(self, column, grouping):
column_key = column.get_key()
query = f"""
SELECT DISTINCT ?entity ?entityLabel WHERE {{
?entity {self.selector_sparql} ."""
if grouping == self.GROUP_MAPPING.TOTALS:
query += """
MINUS {"""
elif grouping == self.GROUP_MAPPING.NO_GROUPING:
query += f"""
MINUS {{
{{?entity wdt:{self.grouping_property} [] .}} UNION"""
elif grouping == self.GROUP_MAPPING.UNKNOWN_VALUE:
query += f"""
?entity wdt:{self.grouping_property} ?grouping.
FILTER wikibase:isSomeValue(?grouping).
MINUS {{"""
elif self.grouping_type == GroupingType.YEAR:
query += f"""
?entity wdt:{self.grouping_property} ?date.
BIND(YEAR(?date) as ?year).
FILTER(?year = {grouping}).
MINUS {{"""
else:
query += f"""
?entity wdt:{self.grouping_property} wd:{grouping} .
MINUS {{"""
query += column.get_filter_for_negative_query()
query += """}
"""
return query
def get_totals_no_grouping(self):
query = f"""
SELECT (COUNT(*) as ?count) WHERE {{
?entity {self.selector_sparql}
MINUS {{ ?entity wdt:{self.grouping_property} _:b28. }}
}}
"""
return self._get_count_from_sparql(query)
def get_totals(self):
query = f"""
SELECT (COUNT(*) as ?count) WHERE {{
?entity {self.selector_sparql}
}}
"""
return self._get_count_from_sparql(query)
@staticmethod
@statsd.timer('property_statistics.sparql.count')
def _get_count_from_sparql(query):
    """
    Run a counting query and return its ``count`` as an integer.

    :param query: SPARQL query whose first result row has a ``count`` field
    :return: The count converted with ``int``
    :raises QueryException: if the query yields no result
    """
    rows = pywikibot.data.sparql.SparqlQuery().select(query)
    if rows:
        return int(rows[0].get('count'))
    raise QueryException("No result when running a SPARQL query.", query=query)
@statsd.timer('property_statistics.sparql.grouping_counts')
def _get_grouping_counts_from_sparql(self, query):
    """
    Run a per-grouping counting query and collect the counts.

    Rows without a grouping, or whose grouping starts with
    UNKNOWN_VALUE_PREFIX, are accumulated under the ``UNKNOWN_VALUE`` key.

    :param query: SPARQL query returning ``grouping`` and ``count`` fields
    :return: OrderedDict of grouping id to count, or None if there is no result
    """
    rows = pywikibot.data.sparql.SparqlQuery().select(query)
    if not rows:
        return None

    counts = collections.OrderedDict()
    unknown_key = self.GROUP_MAPPING.UNKNOWN_VALUE.name
    for row in rows:
        grouping = row.get('grouping')
        if grouping and not grouping.startswith(self.UNKNOWN_VALUE_PREFIX):
            qid = grouping.replace(u'http://www.wikidata.org/entity/', u'')
            counts[qid] = int(row.get('count'))
        else:
            counts[unknown_key] = counts.get(unknown_key, 0) + int(row.get('count'))
    return counts
@staticmethod
def _get_percentage(count, total):
if not count:
return 0
return round(1.0 * count / max(total, 1) * 100, 2)
def get_header(self):
text = u'{| class="wikitable sortable"\n'
colspan = 3 if self.higher_grouping else 2
text += f'! colspan="{colspan}" |Top groupings (Minimum {self.grouping_threshold} items)\n'
text += f'! colspan="{len(self.columns)}"|Top Properties (used at least {self.property_threshold} times per grouping)\n' # noqa
text += u'|-\n'
if self.higher_grouping:
text += u'! \n'
text += u'! Name\n'
text += u'! Count\n'
for column_entry in self.columns.values():
text += column_entry.make_column_header()
return text
def make_stats_for_no_group(self):
    """
    Query the data for entities without grouping and format the table row.

    :return: The wikitext row for the "no grouping" line
    """
    count = self.get_totals_no_grouping()
    grouping_object = NoGroupGrouping(count=count, higher_grouping=self.higher_grouping)

    # One counting query per column fills the cells of the row.
    for key, column in self.columns.items():
        grouping_object.cells[key] = self._get_count_from_sparql(column.get_info_no_grouping_query(self))

    pieces = [
        u'|-\n',
        grouping_object.format_header_cell(self.grouping_type),
        f'| {count} \n',
    ]
    pieces.extend(
        grouping_object.format_cell(column, self.cell_template)
        for column in self.columns.values()
    )
    return ''.join(pieces)
def format_stats_for_one_grouping(self, grouping_object):
"""
Query the data for one group, return the wikitext.
"""
text = u'|-\n'
grouping = grouping_object.title
item_count = grouping_object.count
text += grouping_object.format_header_cell(self.grouping_type)
if self.grouping_link:
try:
group_item = pywikibot.ItemPage(self.repo, grouping)
group_item.get()
label = group_item.labels["en"]
except (pywikibot.exceptions.InvalidTitleError, KeyError):
logging.info(f"Could not retrieve label for {grouping}")
label = grouping
text += f'| [[{self.grouping_link}/{label}|{item_count}]] \n'
else:
text += f'| {item_count} \n'
for column_entry in self.columns.values():
text += grouping_object.format_cell(column_entry, self.cell_template)
return text
def make_totals(self):
    """
    Query the totals over all selected entities and format the table row.

    The row is marked ``class="sortbottom"``; the table is not closed here.

    :return: The wikitext row for the totals line
    """
    count = self.get_totals()
    grouping_object = TotalsGrouping(count=count, title='', higher_grouping=self.higher_grouping)

    # One counting query per column fills the cells of the row.
    for key, column in self.columns.items():
        grouping_object.cells[key] = self._get_count_from_sparql(column.get_totals_query(self))

    pieces = [
        u'|- class="sortbottom"\n',
        grouping_object.format_header_cell(self.grouping_type),
        f'| {count} \n',
    ]
    pieces.extend(
        grouping_object.format_cell(column, self.cell_template)
        for column in self.columns.values()
    )
    return ''.join(pieces)
@statsd.timer('property_statistics.processing')
def retrieve_and_process_data(self):
    """
    Retrieve the groupings and turn them into wikitext.

    :return: The wikitext produced by ``process_data``
    """
    return self.process_data(self.retrieve_data())
def retrieve_data(self):
logging.info("Retrieving grouping information...")
try:
groupings = self.get_grouping_information()
except QueryException as e:
logging.error('No groupings found.')
raise e
logging.info(f'Grouping retrieved: {len(groupings)}')
for (column_entry_key, column_entry) in self.columns.items():
data = self._get_grouping_counts_from_sparql(column_entry.get_info_query(self))
for (grouping_item, value) in data.items():
grouping = groupings.get(grouping_item)
if grouping:
grouping.cells[column_entry_key] = value
else:
logging.debug(f'Discarding data on {grouping_item}, not in the groupings')
return groupings
def process_data(self, groupings):
text = self.get_header()
for grouping in sorted(groupings.values(), key=lambda t: t.count, reverse=True):
text += self.format_stats_for_one_grouping(grouping)
if self.stats_for_no_group:
text += self.make_stats_for_no_group()
- text += self.make_footer()
+ text += self.make_totals()
+ text += u'|}\n'
return text
def main(*args):
    """
    Main function.

    Builds a sample PropertyStatistics configuration and prints the
    resulting wikitext.
    """
    column_keys = ('P21', 'P19', 'Lde', 'Dde')
    columns = [ColumnMaker.make(key, None) for key in column_keys]
    logging.info("Main function...")
    settings = dict(
        columns=columns,
        selector_sparql=u'wdt:P10241 wd:Q41960',
        grouping_property=u'P551',
        stats_for_no_group=True,
        grouping_threshold=5,
        property_threshold=1,
    )
    stats = PropertyStatistics(**settings)
    print(stats.retrieve_and_process_data())


if __name__ == "__main__":
    main()
diff --git a/integraality/tests/test_property_statistics.py b/integraality/tests/test_property_statistics.py
index 8eade82..8b2e548 100644
--- a/integraality/tests/test_property_statistics.py
+++ b/integraality/tests/test_property_statistics.py
@@ -1,1216 +1,1213 @@
# -*- coding: utf-8 -*-
"""Unit tests for property_statistics.py."""
import unittest
from collections import OrderedDict
from unittest.mock import patch
import pywikibot
from column import DescriptionColumn, LabelColumn, PropertyColumn
from line import PropertyGrouping, UnknownValueGrouping, YearGrouping
from property_statistics import PropertyStatistics, QueryException
class PropertyStatisticsTest(unittest.TestCase):
    """Base test case providing ``self.columns`` and ``self.stats`` fixtures."""

    def setUp(self):
        property_columns = [
            PropertyColumn(property='P21'),
            PropertyColumn(property='P19'),
            PropertyColumn(property='P1', qualifier='P2'),
            PropertyColumn(property='P3', value='Q4', qualifier='P5'),
        ]
        text_columns = [
            LabelColumn(language='br'),
            DescriptionColumn(language='xy'),
        ]
        self.columns = property_columns + text_columns
        self.stats = PropertyStatistics(
            selector_sparql=u'wdt:P31 wd:Q41960',
            grouping_property=u'P551',
            columns=self.columns,
            property_threshold=10,
        )
class SparqlQueryTest(unittest.TestCase):
    """Test case that replaces ``pywikibot.data.sparql.SparqlQuery`` with a mock."""

    def setUp(self):
        super().setUp()
        sparql_patcher = patch('pywikibot.data.sparql.SparqlQuery', autospec=True)
        self.mock_sparql_query = sparql_patcher.start()
        # Undo the patch once the test is over.
        self.addCleanup(sparql_patcher.stop)

    def assert_query_called(self, query):
        """Assert that ``select`` was called exactly once, with ``query``."""
        select_mock = self.mock_sparql_query.return_value.select
        select_mock.assert_called_once_with(query)
class TestLabelColumn(PropertyStatisticsTest):
    """Tests for ``LabelColumn`` headers, keys and generated queries."""

    def setUp(self):
        super().setUp()
        self.column = LabelColumn('br')

    def test_simple(self):
        header = self.column.make_column_header()
        self.assertEqual(header, u'! data-sort-type="number"|{{#language:br}}\n')

    def test_get_key(self):
        self.assertEqual(self.column.get_key(), 'Lbr')

    def test_get_totals_query(self):
        actual = self.column.get_totals_query(self.stats)
        # The empty first and last items give the leading and trailing newline.
        wanted = "\n".join([
            "",
            "SELECT (COUNT(*) as ?count) WHERE {",
            " ?entity wdt:P31 wd:Q41960",
            " FILTER(EXISTS {",
            " ?entity rdfs:label ?lang_label.",
            " FILTER((LANG(?lang_label)) = 'br').",
            " })",
            "}",
            "",
        ])
        self.assertEqual(actual, wanted)

    def test_get_info_query(self):
        actual = self.column.get_info_query(self.stats)
        wanted = "\n".join([
            "",
            "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {",
            " ?entity wdt:P31 wd:Q41960 .",
            " ?entity wdt:P551 ?grouping .",
            " FILTER(EXISTS {",
            " ?entity rdfs:label ?lang_label.",
            " FILTER((LANG(?lang_label)) = 'br').",
            " })",
            "}",
            "GROUP BY ?grouping",
            "HAVING (?count >= 10)",
            "ORDER BY DESC(?count)",
            "LIMIT 1000",
            "",
        ])
        self.assertEqual(actual, wanted)

    def test_get_info_no_grouping_query(self):
        actual = self.column.get_info_no_grouping_query(self.stats)
        wanted = "\n".join([
            "",
            "SELECT (COUNT(*) AS ?count) WHERE {",
            " ?entity wdt:P31 wd:Q41960 .",
            " MINUS { ?entity wdt:P551 _:b28. }",
            " FILTER(EXISTS {",
            " ?entity rdfs:label ?lang_label.",
            " FILTER((LANG(?lang_label)) = 'br').",
            " })",
            "}",
            "",
        ])
        self.assertEqual(actual, wanted)
class TestDescriptionColumn(PropertyStatisticsTest):
    """Tests for ``DescriptionColumn`` headers, keys and generated queries."""

    def setUp(self):
        super().setUp()
        self.column = DescriptionColumn('br')

    def test_simple(self):
        header = self.column.make_column_header()
        self.assertEqual(header, u'! data-sort-type="number"|{{#language:br}}\n')

    def test_get_key(self):
        self.assertEqual(self.column.get_key(), 'Dbr')

    def test_get_totals_query(self):
        actual = self.column.get_totals_query(self.stats)
        # The empty first and last items give the leading and trailing newline.
        wanted = "\n".join([
            "",
            "SELECT (COUNT(*) as ?count) WHERE {",
            " ?entity wdt:P31 wd:Q41960",
            " FILTER(EXISTS {",
            " ?entity schema:description ?lang_label.",
            " FILTER((LANG(?lang_label)) = 'br').",
            " })",
            "}",
            "",
        ])
        self.assertEqual(actual, wanted)

    def test_get_info_query(self):
        actual = self.column.get_info_query(self.stats)
        wanted = "\n".join([
            "",
            "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {",
            " ?entity wdt:P31 wd:Q41960 .",
            " ?entity wdt:P551 ?grouping .",
            " FILTER(EXISTS {",
            " ?entity schema:description ?lang_label.",
            " FILTER((LANG(?lang_label)) = 'br').",
            " })",
            "}",
            "GROUP BY ?grouping",
            "HAVING (?count >= 10)",
            "ORDER BY DESC(?count)",
            "LIMIT 1000",
            "",
        ])
        self.assertEqual(actual, wanted)

    def test_get_info_no_grouping_query(self):
        actual = self.column.get_info_no_grouping_query(self.stats)
        wanted = "\n".join([
            "",
            "SELECT (COUNT(*) AS ?count) WHERE {",
            " ?entity wdt:P31 wd:Q41960 .",
            " MINUS { ?entity wdt:P551 _:b28. }",
            " FILTER(EXISTS {",
            " ?entity schema:description ?lang_label.",
            " FILTER((LANG(?lang_label)) = 'br').",
            " })",
            "}",
            "",
        ])
        self.assertEqual(actual, wanted)
class MakeStatsForNoGroupTest(SparqlQueryTest, PropertyStatisticsTest):
    """Tests for ``PropertyStatistics.make_stats_for_no_group``."""

    # Cell rows expected in every variant of the "no grouping" line.
    CELL_ROWS = (
        "| {{Integraality cell|10.0|2|column=P21|grouping=None}}\n"
        "| {{Integraality cell|50.0|10|column=P19|grouping=None}}\n"
        "| {{Integraality cell|75.0|15|column=P1/P2|grouping=None}}\n"
        "| {{Integraality cell|25.0|5|column=P3/Q4/P5|grouping=None}}\n"
        "| {{Integraality cell|20.0|4|column=Lbr|grouping=None}}\n"
        "| {{Integraality cell|40.0|8|column=Dxy|grouping=None}}\n"
    )

    def setUp(self):
        super().setUp()
        totals_patcher = patch('property_statistics.PropertyStatistics.get_totals_no_grouping', autospec=True)
        self.mock_get_totals_no_grouping = totals_patcher.start()
        self.addCleanup(totals_patcher.stop)
        self.mock_get_totals_no_grouping.return_value = 20
        # One single-row result per column, in column order.
        self.mock_sparql_query.return_value.select.side_effect = [
            [{'count': value}] for value in ('2', '10', '15', '5', '4', '8')
        ]

    def _assert_queries_made(self):
        # The totals are fetched once and one query is made per column.
        self.mock_get_totals_no_grouping.assert_called_once_with(self.stats)
        self.assertEqual(self.mock_sparql_query.call_count, 6)

    def test_make_stats_for_no_group(self):
        actual = self.stats.make_stats_for_no_group()
        wanted = "|-\n" + "| No grouping\n" + "| 20 \n" + self.CELL_ROWS
        self.assertEqual(actual, wanted)
        self._assert_queries_made()

    def test_make_stats_for_no_group_with_higher_grouping(self):
        self.stats.higher_grouping = 'wdt:P17/wdt:P298'
        actual = self.stats.make_stats_for_no_group()
        wanted = "|-\n" + "|\n" + "| No grouping\n" + "| 20 \n" + self.CELL_ROWS
        self.assertEqual(actual, wanted)
        self._assert_queries_made()

    def test_make_stats_for_no_group_with_grouping_link(self):
        self.stats.grouping_link = "Foo"
        actual = self.stats.make_stats_for_no_group()
        wanted = "|-\n" + "| No grouping\n" + "| 20 \n" + self.CELL_ROWS
        self.assertEqual(actual, wanted)
        self._assert_queries_made()
class MakeStatsForOneGroupingTest(PropertyStatisticsTest):
    """Tests for ``PropertyStatistics.format_stats_for_one_grouping``."""

    # Cell rows expected when every column has a value for Q3115846.
    FULL_ROWS = (
        '| {{Integraality cell|100.0|10|column=P21|grouping=Q3115846}}\n'
        '| {{Integraality cell|80.0|8|column=P19|grouping=Q3115846}}\n'
        '| {{Integraality cell|20.0|2|column=P1/P2|grouping=Q3115846}}\n'
        '| {{Integraality cell|70.0|7|column=P3/Q4/P5|grouping=Q3115846}}\n'
        '| {{Integraality cell|10.0|1|column=Lbr|grouping=Q3115846}}\n'
        '| {{Integraality cell|20.0|2|column=Dxy|grouping=Q3115846}}\n'
    )

    # Cell rows expected when only P21 has a value; '@' stands for the grouping.
    SPARSE_ROWS = (
        '| {{Integraality cell|40.0|4|column=P21|grouping=@}}\n'
        '| {{Integraality cell|0|0|column=P19|grouping=@}}\n'
        '| {{Integraality cell|0|0|column=P1/P2|grouping=@}}\n'
        '| {{Integraality cell|0|0|column=P3/Q4/P5|grouping=@}}\n'
        '| {{Integraality cell|0|0|column=Lbr|grouping=@}}\n'
        '| {{Integraality cell|0|0|column=Dxy|grouping=@}}\n'
    )

    def setUp(self):
        super().setUp()

    @staticmethod
    def _full_cells():
        return OrderedDict([
            ('P21', 10),
            ('P19', 8),
            ('P1P2', 2),
            ('P3Q4P5', 7),
            ('Lbr', 1),
            ('Dxy', 2),
        ])

    @staticmethod
    def _sparse_cells():
        return OrderedDict([
            ('P21', 4),
        ])

    def test_format_stats_for_one_grouping(self):
        grouping = PropertyGrouping(title='Q3115846', count=10)
        grouping.cells = self._full_cells()
        actual = self.stats.format_stats_for_one_grouping(grouping)
        wanted = '|-\n' + '| {{Q|Q3115846}}\n' + '| 10 \n' + self.FULL_ROWS
        self.assertEqual(actual, wanted)

    def test_make_stats_for_unknown_grouping(self):
        grouping = UnknownValueGrouping(title='UNKNOWN_VALUE', count=10)
        grouping.cells = self._sparse_cells()
        actual = self.stats.format_stats_for_one_grouping(grouping)
        wanted = (
            '|-\n'
            '| {{int:wikibase-snakview-variations-somevalue-label}}\n'
            '| 10 \n'
        ) + self.SPARSE_ROWS.replace('@', 'UNKNOWN_VALUE')
        self.assertEqual(actual, wanted)

    def test_make_stats_for_unknown_grouping_with_grouping_link(self):
        self.stats.grouping_link = "Foo"
        grouping = UnknownValueGrouping(title='UNKNOWN_VALUE', count=10)
        grouping.cells = self._sparse_cells()
        actual = self.stats.format_stats_for_one_grouping(grouping)
        wanted = (
            '|-\n'
            '| {{int:wikibase-snakview-variations-somevalue-label}}\n'
            '| [[Foo/UNKNOWN_VALUE|10]] \n'
        ) + self.SPARSE_ROWS.replace('@', 'UNKNOWN_VALUE')
        self.assertEqual(actual, wanted)

    def test_format_stats_for_one_grouping_with_higher_grouping(self):
        self.stats.higher_grouping = "wdt:P17/wdt:P298"
        grouping = PropertyGrouping(title='Q3115846', count=10, higher_grouping="Q1")
        grouping.cells = self._full_cells()
        actual = self.stats.format_stats_for_one_grouping(grouping)
        wanted = (
            '|-\n'
            '| data-sort-value="Q1"| {{Q|Q1}}\n'
            '| {{Q|Q3115846}}\n'
            '| 10 \n'
        ) + self.FULL_ROWS
        self.assertEqual(actual, wanted)

    @patch('pywikibot.ItemPage', autospec=True)
    def test_format_stats_for_one_grouping_with_grouping_link(self, mock_item_page):
        mock_item_page.return_value.labels = {'en': 'Bar'}
        self.stats.grouping_link = "Foo"
        grouping = PropertyGrouping(title='Q3115846', count=10)
        grouping.cells = self._full_cells()
        actual = self.stats.format_stats_for_one_grouping(grouping)
        wanted = '|-\n' + '| {{Q|Q3115846}}\n' + '| [[Foo/Bar|10]] \n' + self.FULL_ROWS
        self.assertEqual(actual, wanted)

    @patch('pywikibot.ItemPage', autospec=True)
    def test_format_stats_for_one_grouping_with_grouping_link_failure(self, mock_item_page):
        mock_item_page.side_effect = pywikibot.exceptions.InvalidTitleError('Error')
        self.stats.grouping_link = "Foo"
        grouping = PropertyGrouping(title='Q3115846', count=10)
        grouping.cells = self._full_cells()
        with self.assertLogs(level='INFO') as cm:
            actual = self.stats.format_stats_for_one_grouping(grouping)
        # Without a label, the link falls back to the grouping title.
        wanted = '|-\n' + '| {{Q|Q3115846}}\n' + '| [[Foo/Q3115846|10]] \n' + self.FULL_ROWS
        self.assertEqual(actual, wanted)
        self.assertEqual(cm.output, ['INFO:root:Could not retrieve label for Q3115846'])

    def test_make_stats_for_year_grouping(self):
        grouping = YearGrouping(title='2001', count=10)
        grouping.cells = self._sparse_cells()
        actual = self.stats.format_stats_for_one_grouping(grouping)
        wanted = '|-\n' + '| 2001\n' + '| 10 \n' + self.SPARSE_ROWS.replace('@', '2001')
        self.assertEqual(actual, wanted)

    def test_make_stats_for_year_grouping_with_grouping_link(self):
        self.stats.grouping_link = "Foo"
        grouping = YearGrouping(title='2001', count=10)
        grouping.cells = self._sparse_cells()
        actual = self.stats.format_stats_for_one_grouping(grouping)
        wanted = '|-\n' + '| 2001\n' + '| [[Foo/2001|10]] \n' + self.SPARSE_ROWS.replace('@', '2001')
        self.assertEqual(actual, wanted)
class GetQueryForItemsForPropertyPositive(PropertyStatisticsTest):
    """
    Tests for ``PropertyStatistics.get_query_for_items_for_property_positive``.

    Each test compares the generated SPARQL text, character for character,
    with the query expected for one kind of grouping.
    """

    def test_get_query_for_items_for_property_positive(self):
        # Plain grouping by an item id.
        result = self.stats.get_query_for_items_for_property_positive(self.stats.columns.get('P21'), 'Q3115846')
        expected = """
SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE {
?entity wdt:P31 wd:Q41960 .
?entity wdt:P551 wd:Q3115846 .
?entity p:P21 ?prop . OPTIONAL { ?prop ps:P21 ?value }
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
        self.assertEqual(result, expected)

    def test_get_query_for_items_for_property_positive_no_grouping(self):
        # Entities without any value for the grouping property.
        result = self.stats.get_query_for_items_for_property_positive(self.stats.columns.get('P21'), self.stats.GROUP_MAPPING.NO_GROUPING)
        expected = """
SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE {
?entity wdt:P31 wd:Q41960 .
MINUS {
?entity wdt:P551 [] .
}
?entity p:P21 ?prop . OPTIONAL { ?prop ps:P21 ?value }
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
        self.assertEqual(result, expected)

    def test_get_query_for_items_for_property_positive_totals(self):
        # Totals: no restriction on the grouping property.
        result = self.stats.get_query_for_items_for_property_positive(self.stats.columns.get('P21'), self.stats.GROUP_MAPPING.TOTALS)
        expected = """
SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE {
?entity wdt:P31 wd:Q41960 .
?entity p:P21 ?prop . OPTIONAL { ?prop ps:P21 ?value }
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
        self.assertEqual(result, expected)

    def test_get_query_for_items_for_property_positive_label(self):
        # Label column instead of a property column.
        result = self.stats.get_query_for_items_for_property_positive(self.stats.columns.get('Lbr'), 'Q3115846')
        expected = """
SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE {
?entity wdt:P31 wd:Q41960 .
?entity wdt:P551 wd:Q3115846 .
FILTER(EXISTS {
?entity rdfs:label ?lang_label.
FILTER((LANG(?lang_label)) = "br").
})
SERVICE wikibase:label { bd:serviceParam wikibase:language "br". }
}
"""
        self.assertEqual(result, expected)

    def test_get_query_for_items_for_property_positive_unknown_value_grouping(self):
        # Grouping on "unknown value" statements.
        result = self.stats.get_query_for_items_for_property_positive(self.stats.columns.get('P21'), self.stats.GROUP_MAPPING.UNKNOWN_VALUE)
        expected = """
SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE {
?entity wdt:P31 wd:Q41960 .
?entity wdt:P551 ?grouping.
FILTER wikibase:isSomeValue(?grouping).
?entity p:P21 ?prop . OPTIONAL { ?prop ps:P21 ?value }
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
        self.assertEqual(result, expected)

    def test_get_query_for_items_for_property_positive_year_grouping(self):
        # A dedicated instance is needed because grouping_type is set at construction.
        stats = PropertyStatistics(
            columns=self.columns,
            selector_sparql=u'wdt:P31 wd:Q41960',
            grouping_property=u'P577',
            grouping_type='year',
            property_threshold=10
        )
        result = stats.get_query_for_items_for_property_positive(self.stats.columns.get('P21'), 2006)
        expected = """
SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE {
?entity wdt:P31 wd:Q41960 .
?entity wdt:P577 ?date.
BIND(YEAR(?date) as ?year).
FILTER(?year = 2006).
?entity p:P21 ?prop . OPTIONAL { ?prop ps:P21 ?value }
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
        self.assertEqual(result, expected)
class GetQueryForItemsForPropertyNegative(PropertyStatisticsTest):
def test_get_query_for_items_for_property_negative(self):
result = self.stats.get_query_for_items_for_property_negative(self.stats.columns.get('P21'), 'Q3115846')
expected = """
SELECT DISTINCT ?entity ?entityLabel WHERE {
?entity wdt:P31 wd:Q41960 .
?entity wdt:P551 wd:Q3115846 .
MINUS {
{?entity a wdno:P21 .} UNION
{?entity wdt:P21 ?prop .}
}
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
self.assertEqual(result, expected)
def test_get_query_for_items_for_property_negative_no_grouping(self):
result = self.stats.get_query_for_items_for_property_negative(self.stats.columns.get('P21'), self.stats.GROUP_MAPPING.NO_GROUPING)
expected = """
SELECT DISTINCT ?entity ?entityLabel WHERE {
?entity wdt:P31 wd:Q41960 .
MINUS {
{?entity wdt:P551 [] .} UNION
{?entity a wdno:P21 .} UNION
{?entity wdt:P21 ?prop .}
}
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
self.assertEqual(result, expected)
def test_get_query_for_items_for_property_negative_totals(self):
result = self.stats.get_query_for_items_for_property_negative(self.stats.columns.get('P21'), self.stats.GROUP_MAPPING.TOTALS)
expected = """
SELECT DISTINCT ?entity ?entityLabel WHERE {
?entity wdt:P31 wd:Q41960 .
MINUS {
{?entity a wdno:P21 .} UNION
{?entity wdt:P21 ?prop .}
}
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
self.assertEqual(result, expected)
def test_get_query_for_items_for_property_negative_label(self):
result = self.stats.get_query_for_items_for_property_negative(self.stats.columns.get('Lbr'), 'Q3115846')
expected = """
SELECT DISTINCT ?entity ?entityLabel WHERE {
?entity wdt:P31 wd:Q41960 .
?entity wdt:P551 wd:Q3115846 .
MINUS {
{ ?entity rdfs:label ?lang_label.
FILTER((LANG(?lang_label)) = "br") }
}
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
self.assertEqual(result, expected)
def test_get_query_for_items_for_property_negative_unknown_value_grouping(self):
result = self.stats.get_query_for_items_for_property_negative(self.stats.columns.get('P21'), self.stats.GROUP_MAPPING.UNKNOWN_VALUE)
expected = """
SELECT DISTINCT ?entity ?entityLabel WHERE {
?entity wdt:P31 wd:Q41960 .
?entity wdt:P551 ?grouping.
FILTER wikibase:isSomeValue(?grouping).
MINUS {
{?entity a wdno:P21 .} UNION
{?entity wdt:P21 ?prop .}
}
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
self.assertEqual(result, expected)
def test_get_query_for_items_for_property_negative_year_grouping(self):
    # Negative query for column P21 when grouping by the year of P577:
    # the expected query binds YEAR(?date) and filters it to 2006.
    year_stats = PropertyStatistics(
        selector_sparql=u'wdt:P31 wd:Q41960',
        columns=self.columns,
        grouping_property=u'P577',
        grouping_type='year',
        property_threshold=10,
    )
    column = self.stats.columns.get('P21')
    expected_query = """
SELECT DISTINCT ?entity ?entityLabel WHERE {
?entity wdt:P31 wd:Q41960 .
?entity wdt:P577 ?date.
BIND(YEAR(?date) as ?year).
FILTER(?year = 2006).
MINUS {
{?entity a wdno:P21 .} UNION
{?entity wdt:P21 ?prop .}
}
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
    actual_query = year_stats.get_query_for_items_for_property_negative(column, 2006)
    self.assertEqual(actual_query, expected_query)
class GetCountFromSparqlTest(SparqlQueryTest, PropertyStatisticsTest):
    """Checks on ``_get_count_from_sparql`` with a stubbed SPARQL ``select``."""

    def test_return_count(self):
        # A single row carrying a string count comes back as the integer 18.
        sparql = "SELECT X"
        select_stub = self.mock_sparql_query.return_value.select
        select_stub.return_value = [{'count': '18'}]

        count = self.stats._get_count_from_sparql(sparql)

        self.assert_query_called(sparql)
        self.assertEqual(count, 18)

    def test_return_None(self):
        # A ``None`` answer from select must surface as a QueryException.
        sparql = "SELECT X"
        select_stub = self.mock_sparql_query.return_value.select
        select_stub.return_value = None

        with self.assertRaises(QueryException):
            self.stats._get_count_from_sparql(sparql)
        # Checked after the ``with`` block so that it actually runs.
        self.assert_query_called(sparql)
class GetGroupingCountsFromSparqlTest(SparqlQueryTest, PropertyStatisticsTest):
    """Checks on ``_get_grouping_counts_from_sparql`` with a stubbed ``select``."""

    def test_return_count(self):
        # Entity URIs are expected to be reduced to their Q-ids, in row order.
        sparql = "SELECT X"
        rows = [
            {'grouping': 'http://www.wikidata.org/entity/Q1', 'count': 10},
            {'grouping': 'http://www.wikidata.org/entity/Q2', 'count': 5},
        ]
        self.mock_sparql_query.return_value.select.return_value = rows

        counts = self.stats._get_grouping_counts_from_sparql(sparql)

        self.assert_query_called(sparql)
        expected_counts = OrderedDict([('Q1', 10), ('Q2', 5)])
        self.assertEqual(counts, expected_counts)

    def test_return_None(self):
        # A ``None`` answer from select is passed through as ``None``.
        sparql = "SELECT X"
        self.mock_sparql_query.return_value.select.return_value = None

        counts = self.stats._get_grouping_counts_from_sparql(sparql)

        self.assert_query_called(sparql)
        self.assertEqual(counts, None)

    def test_return_count_with_unknown(self):
        # The two ``.well-known/genid`` rows (2 + 1) are expected to be summed
        # under the single 'UNKNOWN_VALUE' key.
        sparql = "SELECT X"
        rows = [
            {'grouping': 'http://www.wikidata.org/entity/Q1', 'count': 10},
            {'grouping': 'http://www.wikidata.org/entity/Q2', 'count': 5},
            {'grouping': 'http://www.wikidata.org/.well-known/genid/6ab4c2d7cb4ac72721335af5b8ba09c7', 'count': 2},
            {'grouping': 'http://www.wikidata.org/.well-known/genid/1469448a291c6fbe5df8306cb52ef18b', 'count': 1}
        ]
        self.mock_sparql_query.return_value.select.return_value = rows

        counts = self.stats._get_grouping_counts_from_sparql(sparql)

        self.assert_query_called(sparql)
        expected_counts = OrderedDict([('Q1', 10), ('Q2', 5), ('UNKNOWN_VALUE', 3)])
        self.assertEqual(counts, expected_counts)
class SparqlCountTest(SparqlQueryTest, PropertyStatisticsTest):
    """Checks on the totals queries; ``select`` is stubbed to return a count of 18."""

    def setUp(self):
        super().setUp()
        select_stub = self.mock_sparql_query.return_value.select
        select_stub.return_value = [{'count': '18'}]

    def test_get_totals_no_grouping(self):
        # The expected query counts entities that lack the grouping property P551.
        expected_query = (
            "\n"
            "SELECT (COUNT(*) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960\n"
            " MINUS { ?entity wdt:P551 _:b28. }\n"
            "}\n"
        )

        total = self.stats.get_totals_no_grouping()

        self.assert_query_called(expected_query)
        self.assertEqual(total, 18)

    def test_get_totals(self):
        # The expected query counts every entity matched by the selector.
        expected_query = (
            "\n"
            "SELECT (COUNT(*) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960\n"
            "}\n"
        )

        total = self.stats.get_totals()

        self.assert_query_called(expected_query)
        self.assertEqual(total, 18)
class GetGroupingInformationTest(SparqlQueryTest, PropertyStatisticsTest):
    """Checks on ``get_grouping_information``: the query sent and the groupings built."""

    def test_get_grouping_information(self):
        # Default configuration: threshold of 20 and no higher grouping.
        expected_query = (
            "\n"
            "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " ?entity wdt:P551 ?grouping .\n"
            "} GROUP BY ?grouping\n"
            "HAVING (?count >= 20)\n"
            "ORDER BY DESC(?count)\n"
            "LIMIT 1000\n"
        )
        expected_groupings = {
            'Q3115846': PropertyGrouping(title='Q3115846', count=10),
            'Q5087901': PropertyGrouping(title='Q5087901', count=6),
            'Q623333': PropertyGrouping(title='Q623333', count=6)
        }
        self.mock_sparql_query.return_value.select.return_value = [
            {'grouping': 'http://www.wikidata.org/entity/Q3115846', 'count': '10'},
            {'grouping': 'http://www.wikidata.org/entity/Q5087901', 'count': '6'},
            {'grouping': 'http://www.wikidata.org/entity/Q623333', 'count': '6'}
        ]

        groupings = self.stats.get_grouping_information()

        self.assert_query_called(expected_query)
        self.assertEqual(groupings, expected_groupings)

    def test_get_grouping_information_with_grouping_threshold(self):
        # Lowering grouping_threshold to 5 must show up in the HAVING clause.
        expected_query = (
            "\n"
            "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " ?entity wdt:P551 ?grouping .\n"
            "} GROUP BY ?grouping\n"
            "HAVING (?count >= 5)\n"
            "ORDER BY DESC(?count)\n"
            "LIMIT 1000\n"
        )
        expected_groupings = {
            'Q3115846': PropertyGrouping(title='Q3115846', count=10),
            'Q5087901': PropertyGrouping(title='Q5087901', count=6),
            'Q623333': PropertyGrouping(title='Q623333', count=6)
        }
        self.mock_sparql_query.return_value.select.return_value = [
            {'grouping': 'http://www.wikidata.org/entity/Q3115846', 'count': '10'},
            {'grouping': 'http://www.wikidata.org/entity/Q5087901', 'count': '6'},
            {'grouping': 'http://www.wikidata.org/entity/Q623333', 'count': '6'}
        ]
        self.stats.grouping_threshold = 5

        groupings = self.stats.get_grouping_information()

        self.assert_query_called(expected_query)
        self.assertEqual(groupings, expected_groupings)

    def test_get_grouping_information_with_higher_grouping(self):
        # With a higher grouping set, the query samples it through an OPTIONAL
        # clause and each resulting grouping carries its higher_grouping value.
        expected_query = (
            "\n"
            "SELECT ?grouping (SAMPLE(?_higher_grouping) as ?higher_grouping) "
            "(COUNT(DISTINCT ?entity) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " ?entity wdt:P551 ?grouping .\n"
            " OPTIONAL { ?grouping wdt:P17/wdt:P298 ?_higher_grouping }.\n"
            "} GROUP BY ?grouping ?higher_grouping\n"
            "HAVING (?count >= 20)\n"
            "ORDER BY DESC(?count)\n"
            "LIMIT 1000\n"
        )
        expected_groupings = {
            'Q3115846': PropertyGrouping(title='Q3115846', count=10, higher_grouping='NZL'),
            'Q5087901': PropertyGrouping(title='Q5087901', count=6, higher_grouping='USA'),
            'Q623333': PropertyGrouping(title='Q623333', count=6, higher_grouping='USA')
        }
        self.mock_sparql_query.return_value.select.return_value = [
            {'grouping': 'http://www.wikidata.org/entity/Q3115846', 'higher_grouping': 'NZL', 'count': '10'},
            {'grouping': 'http://www.wikidata.org/entity/Q5087901', 'higher_grouping': 'USA', 'count': '6'},
            {'grouping': 'http://www.wikidata.org/entity/Q623333', 'higher_grouping': 'USA', 'count': '6'}
        ]
        self.stats.higher_grouping = 'wdt:P17/wdt:P298'

        groupings = self.stats.get_grouping_information()

        self.assert_query_called(expected_query)
        self.assertEqual(groupings, expected_groupings)

    def test_get_grouping_information_empty_result(self):
        # A ``None`` answer from select must surface as a QueryException.
        expected_query = (
            "\n"
            "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " ?entity wdt:P551 ?grouping .\n"
            "} GROUP BY ?grouping\n"
            "HAVING (?count >= 20)\n"
            "ORDER BY DESC(?count)\n"
            "LIMIT 1000\n"
        )
        self.mock_sparql_query.return_value.select.return_value = None

        with self.assertRaises(QueryException):
            self.stats.get_grouping_information()
        # Checked after the ``with`` block so that it actually runs.
        self.assert_query_called(expected_query)

    def test_get_grouping_information_timeout(self):
        # A pywikibot TimeoutError raised by select must surface as a QueryException.
        expected_query = (
            "\n"
            "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " ?entity wdt:P551 ?grouping .\n"
            "} GROUP BY ?grouping\n"
            "HAVING (?count >= 20)\n"
            "ORDER BY DESC(?count)\n"
            "LIMIT 1000\n"
        )
        self.mock_sparql_query.return_value.select.side_effect = pywikibot.exceptions.TimeoutError("Error")

        with self.assertRaises(QueryException):
            self.stats.get_grouping_information()
        # Checked after the ``with`` block so that it actually runs.
        self.assert_query_called(expected_query)

    def test_get_grouping_information_unknown_value(self):
        # The two ``.well-known/genid`` rows (2 + 1) are expected to collapse
        # into one UnknownValueGrouping with a count of 3.
        expected_query = (
            "\n"
            "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " ?entity wdt:P551 ?grouping .\n"
            "} GROUP BY ?grouping\n"
            "HAVING (?count >= 20)\n"
            "ORDER BY DESC(?count)\n"
            "LIMIT 1000\n"
        )
        expected_groupings = {
            'Q3115846': PropertyGrouping(title='Q3115846', count=10),
            'Q5087901': PropertyGrouping(title='Q5087901', count=6),
            'UNKNOWN_VALUE': UnknownValueGrouping(count=3)
        }
        self.mock_sparql_query.return_value.select.return_value = [
            {'grouping': 'http://www.wikidata.org/entity/Q3115846', 'count': '10'},
            {'grouping': 'http://www.wikidata.org/entity/Q5087901', 'count': '6'},
            {'grouping': 'http://www.wikidata.org/.well-known/genid/6ab4c2d7cb4ac72721335af5b8ba09c7', 'count': '2'},
            {'grouping': 'http://www.wikidata.org/.well-known/genid/1469448a291c6fbe5df8306cb52ef18b', 'count': '1'}
        ]

        groupings = self.stats.get_grouping_information()

        self.assert_query_called(expected_query)
        self.assertEqual(groupings, expected_groupings)

    def test_get_grouping_information_year(self):
        # Year grouping: the query binds YEAR(?date) as the grouping and the
        # results are expected as YearGrouping objects.
        year_stats = PropertyStatistics(
            selector_sparql=u'wdt:P31 wd:Q41960',
            columns=self.columns,
            grouping_property=u'P577',
            grouping_type='year',
            property_threshold=10,
        )
        self.mock_sparql_query.return_value.select.return_value = [
            {'grouping': '2001', 'count': '10'},
            {'grouping': '2002', 'count': '6'},
        ]
        expected_query = (
            "\n"
            "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " ?entity wdt:P577 ?date .\n"
            " BIND(YEAR(?date) as ?grouping) .\n"
            "} GROUP BY ?grouping\n"
            "HAVING (?count >= 20)\n"
            "ORDER BY DESC(?count)\n"
            "LIMIT 1000\n"
        )
        expected_groupings = {
            '2001': YearGrouping(title='2001', count=10),
            '2002': YearGrouping(title='2002', count=6)
        }

        groupings = year_stats.get_grouping_information()

        self.assert_query_called(expected_query)
        self.assertEqual(groupings, expected_groupings)

    def test_get_grouping_information_year_unknown_value(self):
        # Year grouping where one row has an empty grouping string: that row
        # is expected to become an UnknownValueGrouping.
        year_stats = PropertyStatistics(
            selector_sparql=u'wdt:P31 wd:Q41960',
            columns=self.columns,
            grouping_property=u'P577',
            grouping_type='year',
            property_threshold=10,
        )
        self.mock_sparql_query.return_value.select.return_value = [
            {'grouping': '2001', 'count': '10'},
            {'grouping': '2002', 'count': '6'},
            {'grouping': '', 'count': '4'},
        ]
        expected_query = (
            "\n"
            "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " ?entity wdt:P577 ?date .\n"
            " BIND(YEAR(?date) as ?grouping) .\n"
            "} GROUP BY ?grouping\n"
            "HAVING (?count >= 20)\n"
            "ORDER BY DESC(?count)\n"
            "LIMIT 1000\n"
        )
        expected_groupings = {
            '2001': YearGrouping(title='2001', count=10),
            '2002': YearGrouping(title='2002', count=6),
            'UNKNOWN_VALUE': UnknownValueGrouping(count=4)
        }

        groupings = year_stats.get_grouping_information()

        self.assert_query_called(expected_query)
        self.assertEqual(groupings, expected_groupings)
class TestGetHeader(PropertyStatisticsTest):
    """Checks on the wikitable header produced by ``get_header``."""

    def setUp(self):
        super().setUp()
        # Distinct thresholds so each one can be told apart in the header text.
        self.stats.property_threshold = 4
        self.stats.grouping_threshold = 7

    def test_get_header(self):
        # Without a higher grouping the first header cell spans two columns.
        expected_header = (
            '{| class="wikitable sortable"\n'
            '! colspan="2" |Top groupings (Minimum 7 items)\n'
            '! colspan="6"|Top Properties (used at least 4 times per grouping)\n'
            '|-\n'
            '! Name\n'
            '! Count\n'
            '! data-sort-type="number"|{{Property|P21}}\n'
            '! data-sort-type="number"|{{Property|P19}}\n'
            '! data-sort-type="number"|{{Property|P2}}\n'
            '! data-sort-type="number"|{{Property|P5}}\n'
            '! data-sort-type="number"|{{#language:br}}\n'
            '! data-sort-type="number"|{{#language:xy}}\n'
        )

        header = self.stats.get_header()

        self.assertEqual(header, expected_header)

    def test_get_header_with_higher_grouping(self):
        # A higher grouping widens the first cell to three columns and adds
        # an empty header cell before "Name".
        expected_header = (
            '{| class="wikitable sortable"\n'
            '! colspan="3" |Top groupings (Minimum 7 items)\n'
            '! colspan="6"|Top Properties (used at least 4 times per grouping)\n'
            '|-\n'
            '! \n'
            '! Name\n'
            '! Count\n'
            '! data-sort-type="number"|{{Property|P21}}\n'
            '! data-sort-type="number"|{{Property|P19}}\n'
            '! data-sort-type="number"|{{Property|P2}}\n'
            '! data-sort-type="number"|{{Property|P5}}\n'
            '! data-sort-type="number"|{{#language:br}}\n'
            '! data-sort-type="number"|{{#language:xy}}\n'
        )
        self.stats.higher_grouping = 'wdt:P17/wdt:P298'

        header = self.stats.get_header()

        self.assertEqual(header, expected_header)
class MakeTotalsTest(SparqlQueryTest, PropertyStatisticsTest):
    """Checks on the totals row produced by ``make_totals``.

    The block previously carried unresolved unified-diff markers (lines
    prefixed with ``-``/``+``); it is resolved here to the added side:
    the tests call ``make_totals`` and the expected output no longer ends
    with the table-closing ``|}`` line.
    """

    def setUp(self):
        super().setUp()
        # Seven successive select answers: the first (120) is the count shown
        # in the totals row, the remaining six are the per-column counts that
        # appear in the expected cells (30, 80, 10, 12, 24, 36).
        self.mock_sparql_query.return_value.select.side_effect = [
            [{'count': '120'}],
            [{'count': '30'}],
            [{'count': '80'}],
            [{'count': '10'}],
            [{'count': '12'}],
            [{'count': '24'}],
            [{'count': '36'}],
        ]

    def test_make_totals(self):
        # Default configuration: totals row with one cell per column.
        result = self.stats.make_totals()
        expected = (
            '|- class="sortbottom"\n'
            "| \'\'\'Totals\'\'\' (all items)\n"
            "| 120 \n"
            "| {{Integraality cell|25.0|30|column=P21|grouping=}}\n"
            "| {{Integraality cell|66.67|80|column=P19|grouping=}}\n"
            "| {{Integraality cell|8.33|10|column=P1/P2|grouping=}}\n"
            "| {{Integraality cell|10.0|12|column=P3/Q4/P5|grouping=}}\n"
            "| {{Integraality cell|20.0|24|column=Lbr|grouping=}}\n"
            "| {{Integraality cell|30.0|36|column=Dxy|grouping=}}\n"
        )
        self.assertEqual(result, expected)

    def test_make_totals_with_higher_grouping(self):
        # A higher grouping adds an empty leading cell ('||') to the row.
        self.stats.higher_grouping = 'wdt:P17/wdt:P298'
        result = self.stats.make_totals()
        expected = (
            '|- class="sortbottom"\n'
            '||\n'
            "| \'\'\'Totals\'\'\' (all items)\n"
            "| 120 \n"
            "| {{Integraality cell|25.0|30|column=P21|grouping=}}\n"
            "| {{Integraality cell|66.67|80|column=P19|grouping=}}\n"
            "| {{Integraality cell|8.33|10|column=P1/P2|grouping=}}\n"
            "| {{Integraality cell|10.0|12|column=P3/Q4/P5|grouping=}}\n"
            "| {{Integraality cell|20.0|24|column=Lbr|grouping=}}\n"
            "| {{Integraality cell|30.0|36|column=Dxy|grouping=}}\n"
        )
        self.assertEqual(result, expected)

    def test_make_totals_with_grouping_link(self):
        # Setting grouping_link leaves the totals row unchanged.
        self.stats.grouping_link = "Foo"
        result = self.stats.make_totals()
        expected = (
            '|- class="sortbottom"\n'
            "| \'\'\'Totals\'\'\' (all items)\n"
            "| 120 \n"
            "| {{Integraality cell|25.0|30|column=P21|grouping=}}\n"
            "| {{Integraality cell|66.67|80|column=P19|grouping=}}\n"
            "| {{Integraality cell|8.33|10|column=P1/P2|grouping=}}\n"
            "| {{Integraality cell|10.0|12|column=P3/Q4/P5|grouping=}}\n"
            "| {{Integraality cell|20.0|24|column=Lbr|grouping=}}\n"
            "| {{Integraality cell|30.0|36|column=Dxy|grouping=}}\n"
        )
        self.assertEqual(result, expected)
class RetrieveDataTest(SparqlQueryTest, PropertyStatisticsTest):
    """Checks on ``retrieve_data`` with a stubbed SPARQL ``select``."""

    def test_retrieve_data_empty(self):
        # No select answer is configured here; an empty mapping is expected.
        data = self.stats.retrieve_data()
        self.assertEqual(data, {})

    def test_retrieve_data(self):
        # Each returned row is expected to become a PropertyGrouping keyed by Q-id.
        rows = [
            {'grouping': 'http://www.wikidata.org/entity/Q3115846', 'count': '10'},
            {'grouping': 'http://www.wikidata.org/entity/Q5087901', 'count': '6'},
            {'grouping': 'http://www.wikidata.org/entity/Q623333', 'count': '6'}
        ]
        self.mock_sparql_query.return_value.select.return_value = rows

        data = self.stats.retrieve_data()

        expected_data = {
            'Q3115846': PropertyGrouping(title='Q3115846', count=10),
            'Q5087901': PropertyGrouping(title='Q5087901', count=6),
            'Q623333': PropertyGrouping(title='Q623333', count=6)
        }
        self.assertEqual(data, expected_data)
class ProcessDataTest(SparqlQueryTest, PropertyStatisticsTest):
    """Checks on the full wikitable text rendered by ``process_data``."""

    def test_process_data_empty(self):
        # With no groupings the table holds only the header, the totals row
        # and the closing '|}' line.
        expected_table = (
            '{| class="wikitable sortable"\n'
            '! colspan="2" |Top groupings (Minimum 20 items)\n'
            '! colspan="6"|Top Properties (used at least 10 times per grouping)\n'
            '|-\n'
            '! Name\n'
            '! Count\n'
            '! data-sort-type="number"|{{Property|P21}}\n'
            '! data-sort-type="number"|{{Property|P19}}\n'
            '! data-sort-type="number"|{{Property|P2}}\n'
            '! data-sort-type="number"|{{Property|P5}}\n'
            '! data-sort-type="number"|{{#language:br}}\n'
            '! data-sort-type="number"|{{#language:xy}}\n'
            '|- class="sortbottom"\n'
            "| '''Totals''' (all items)\n"
            '| 1 \n'
            '| {{Integraality cell|100.0|1|column=P21|grouping=}}\n'
            '| {{Integraality cell|100.0|1|column=P19|grouping=}}\n'
            '| {{Integraality cell|100.0|1|column=P1/P2|grouping=}}\n'
            '| {{Integraality cell|100.0|1|column=P3/Q4/P5|grouping=}}\n'
            '| {{Integraality cell|100.0|1|column=Lbr|grouping=}}\n'
            '| {{Integraality cell|100.0|1|column=Dxy|grouping=}}\n'
            '|}\n'
        )

        table = self.stats.process_data({})

        self.assertEqual(table, expected_table)

    def test_process_data(self):
        # Two groupings with pre-filled cells: each one is expected as its own
        # row between the header and the totals row.
        first_cells = OrderedDict([
            ('P21', 10),
            ('P19', 8),
            ('P1P2', 2),
            ('P3Q4P5', 7),
            ('Lbr', 1),
            ('Dxy', 2)
        ])
        second_cells = OrderedDict([
            ('P21', 6),
            ('P19', 0),
            ('P1P2', 0),
            ('P3Q4P5', 0),
            ('Lbr', 0),
            ('Dxy', 0)
        ])
        grouping_data = {
            'Q3115846': PropertyGrouping(title='Q3115846', count=10, cells=first_cells),
            'Q5087901': PropertyGrouping(title='Q5087901', count=6, cells=second_cells),
        }
        expected_table = (
            '{| class="wikitable sortable"\n'
            '! colspan="2" |Top groupings (Minimum 20 items)\n'
            '! colspan="6"|Top Properties (used at least 10 times per grouping)\n'
            '|-\n'
            '! Name\n'
            '! Count\n'
            '! data-sort-type="number"|{{Property|P21}}\n'
            '! data-sort-type="number"|{{Property|P19}}\n'
            '! data-sort-type="number"|{{Property|P2}}\n'
            '! data-sort-type="number"|{{Property|P5}}\n'
            '! data-sort-type="number"|{{#language:br}}\n'
            '! data-sort-type="number"|{{#language:xy}}\n'
            '|-\n'
            '| {{Q|Q3115846}}\n'
            '| 10 \n'
            '| {{Integraality cell|100.0|10|column=P21|grouping=Q3115846}}\n'
            '| {{Integraality cell|80.0|8|column=P19|grouping=Q3115846}}\n'
            '| {{Integraality cell|20.0|2|column=P1/P2|grouping=Q3115846}}\n'
            '| {{Integraality cell|70.0|7|column=P3/Q4/P5|grouping=Q3115846}}\n'
            '| {{Integraality cell|10.0|1|column=Lbr|grouping=Q3115846}}\n'
            '| {{Integraality cell|20.0|2|column=Dxy|grouping=Q3115846}}\n'
            '|-\n'
            '| {{Q|Q5087901}}\n'
            '| 6 \n'
            '| {{Integraality cell|100.0|6|column=P21|grouping=Q5087901}}\n'
            '| {{Integraality cell|0|0|column=P19|grouping=Q5087901}}\n'
            '| {{Integraality cell|0|0|column=P1/P2|grouping=Q5087901}}\n'
            '| {{Integraality cell|0|0|column=P3/Q4/P5|grouping=Q5087901}}\n'
            '| {{Integraality cell|0|0|column=Lbr|grouping=Q5087901}}\n'
            '| {{Integraality cell|0|0|column=Dxy|grouping=Q5087901}}\n'
            '|- class="sortbottom"\n'
            "| '''Totals''' (all items)\n"
            '| 1 \n'
            '| {{Integraality cell|100.0|1|column=P21|grouping=}}\n'
            '| {{Integraality cell|100.0|1|column=P19|grouping=}}\n'
            '| {{Integraality cell|100.0|1|column=P1/P2|grouping=}}\n'
            '| {{Integraality cell|100.0|1|column=P3/Q4/P5|grouping=}}\n'
            '| {{Integraality cell|100.0|1|column=Lbr|grouping=}}\n'
            '| {{Integraality cell|100.0|1|column=Dxy|grouping=}}\n'
            '|}\n'
        )

        table = self.stats.process_data(grouping_data)

        self.assertEqual(table, expected_table)
class RetrieveAndProcessDataTest(SparqlQueryTest, PropertyStatisticsTest):
    """End-to-end check on ``retrieve_and_process_data`` with a stubbed ``select``."""

    def test_retrieve_and_process_data(self):
        # The same three rows answer every select call, so each grouping row
        # and the totals row are expected to show 100.0 in every cell.
        rows = [
            {'grouping': 'http://www.wikidata.org/entity/Q3115846', 'count': '10'},
            {'grouping': 'http://www.wikidata.org/entity/Q5087901', 'count': '6'},
            {'grouping': 'http://www.wikidata.org/entity/Q623333', 'count': '6'}
        ]
        self.mock_sparql_query.return_value.select.return_value = rows
        expected_table = (
            '{| class="wikitable sortable"\n'
            '! colspan="2" |Top groupings (Minimum 20 items)\n'
            '! colspan="6"|Top Properties (used at least 10 times per grouping)\n'
            '|-\n'
            '! Name\n'
            '! Count\n'
            '! data-sort-type="number"|{{Property|P21}}\n'
            '! data-sort-type="number"|{{Property|P19}}\n'
            '! data-sort-type="number"|{{Property|P2}}\n'
            '! data-sort-type="number"|{{Property|P5}}\n'
            '! data-sort-type="number"|{{#language:br}}\n'
            '! data-sort-type="number"|{{#language:xy}}\n'
            '|-\n'
            '| {{Q|Q3115846}}\n'
            '| 10 \n'
            '| {{Integraality cell|100.0|10|column=P21|grouping=Q3115846}}\n'
            '| {{Integraality cell|100.0|10|column=P19|grouping=Q3115846}}\n'
            '| {{Integraality cell|100.0|10|column=P1/P2|grouping=Q3115846}}\n'
            '| {{Integraality cell|100.0|10|column=P3/Q4/P5|grouping=Q3115846}}\n'
            '| {{Integraality cell|100.0|10|column=Lbr|grouping=Q3115846}}\n'
            '| {{Integraality cell|100.0|10|column=Dxy|grouping=Q3115846}}\n'
            '|-\n'
            '| {{Q|Q5087901}}\n'
            '| 6 \n'
            '| {{Integraality cell|100.0|6|column=P21|grouping=Q5087901}}\n'
            '| {{Integraality cell|100.0|6|column=P19|grouping=Q5087901}}\n'
            '| {{Integraality cell|100.0|6|column=P1/P2|grouping=Q5087901}}\n'
            '| {{Integraality cell|100.0|6|column=P3/Q4/P5|grouping=Q5087901}}\n'
            '| {{Integraality cell|100.0|6|column=Lbr|grouping=Q5087901}}\n'
            '| {{Integraality cell|100.0|6|column=Dxy|grouping=Q5087901}}\n'
            '|-\n'
            '| {{Q|Q623333}}\n'
            '| 6 \n'
            '| {{Integraality cell|100.0|6|column=P21|grouping=Q623333}}\n'
            '| {{Integraality cell|100.0|6|column=P19|grouping=Q623333}}\n'
            '| {{Integraality cell|100.0|6|column=P1/P2|grouping=Q623333}}\n'
            '| {{Integraality cell|100.0|6|column=P3/Q4/P5|grouping=Q623333}}\n'
            '| {{Integraality cell|100.0|6|column=Lbr|grouping=Q623333}}\n'
            '| {{Integraality cell|100.0|6|column=Dxy|grouping=Q623333}}\n'
            '|- class="sortbottom"\n'
            "| '''Totals''' (all items)\n"
            '| 10 \n'
            '| {{Integraality cell|100.0|10|column=P21|grouping=}}\n'
            '| {{Integraality cell|100.0|10|column=P19|grouping=}}\n'
            '| {{Integraality cell|100.0|10|column=P1/P2|grouping=}}\n'
            '| {{Integraality cell|100.0|10|column=P3/Q4/P5|grouping=}}\n'
            '| {{Integraality cell|100.0|10|column=Lbr|grouping=}}\n'
            '| {{Integraality cell|100.0|10|column=Dxy|grouping=}}\n'
            '|}\n'
        )

        table = self.stats.retrieve_and_process_data()

        self.assertEqual(table, expected_table)