diff --git a/integraality/pages_processor.py b/integraality/pages_processor.py
index 74134ec..a26b4bd 100644
--- a/integraality/pages_processor.py
+++ b/integraality/pages_processor.py
@@ -1,207 +1,211 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Bot to generate statistics
"""
import os
import re
from redis import StrictRedis
from ww import f
import pywikibot
from pywikibot import pagegenerators
from cache import RedisCache
from property_statistics import (
ColumnConfigMaker,
+ ColumnSyntaxException,
PropertyStatistics,
QueryException
)
# Configuration fields that must be present; parse_config() raises
# ConfigException as soon as one of them is missing.
REQUIRED_CONFIG_FIELDS = ['selector_sparql', 'grouping_property', 'properties']
class ProcessingException(Exception):
    """Base class of the errors raised while processing a dashboard page."""


class ConfigException(ProcessingException):
    """The dashboard configuration is incomplete or cannot be used."""


class NoEndTemplateException(ProcessingException):
    """The end template of the dashboard was not found on the page."""


class NoStartTemplateException(ProcessingException):
    """The start template of the dashboard was not found on the page."""
class PagesProcessor:
def __init__(self, url="https://www.wikidata.org/wiki/", cache_client=None):
    """Set up the wiki site, the template names and the configuration cache.

    :param url: URL of the wiki to work on.
    :param cache_client: Redis-like client backing the cache. When none is
        given, a StrictRedis client is created for the host named by the
        REDIS_HOST environment variable (with a built-in default host).
    """
    self.site = pywikibot.Site(url=url)
    self.summary = u'Update property usage stats'
    self.template_name = 'Property dashboard'
    self.end_template_name = 'Property dashboard end'
    self.outputs = []
    # The fallback client is only built when no usable client was supplied.
    redis_client = cache_client or StrictRedis(
        host=os.getenv("REDIS_HOST", 'tools-redis.svc.eqiad.wmflabs'),
        decode_responses=False,
    )
    self.cache = RedisCache(cache_client=redis_client)
def make_cache_key(self, page_title):
    """Return the cache key of a page: site code and title joined by a colon, spaces replaced by underscores."""
    raw_key = ":".join((self.site.code, page_title))
    return raw_key.replace(" ", "_")
def get_all_pages(self):
    """Return a generator over the pages that transclude the dashboard start template."""
    # ns=10: the template page is looked up in namespace 10 of the site.
    dashboard_template = pywikibot.Page(self.site, self.template_name, ns=10)
    return pagegenerators.ReferringPageGenerator(
        dashboard_template,
        onlyTemplateInclusion=True,
    )
@staticmethod
def extract_elements_from_template_param(template_param):
    """Split a 'field=value' template parameter into a (field, value) tuple.

    The field name is stripped of surrounding whitespace; in the value, the
    escaped pipe '{{!}}' is turned back into '|'. Without any '=' the value
    is the empty string.
    """
    name, _separator, raw_value = template_param.partition(u'=')
    unescaped_value = raw_value.replace('{{!}}', '|')
    return (name.strip(), unescaped_value)
def parse_config_from_params(self, params):
    """Build a {field: value} dictionary from raw template parameters.

    Parameters whose field name is empty are ignored; when a field appears
    several times, the last occurrence wins.
    """
    parsed_pairs = [self.extract_elements_from_template_param(param) for param in params]
    config = {}
    for (field, value) in parsed_pairs:
        if not field:
            continue
        config[field] = value
    return config
def make_stats_object_arguments_for_page(self, page):
    """Read the dashboard configuration from a page, cache it and return it.

    :param page: page object providing templatesWithParams() and title().
    :return: the configuration dictionary produced by parse_config().
    :raises NoStartTemplateException: the start template is not on the page.
    :raises NoEndTemplateException: the end template is not on the page.
    :raises ConfigException: raised by parse_config() on invalid configuration.
    """
    all_templates_with_params = page.templatesWithParams()
    # Resolve every template title once instead of once per check.
    template_titles = [
        template.title(with_ns=False) for (template, _) in all_templates_with_params
    ]

    if self.template_name not in template_titles:
        # Each fragment ends with a space so the sentences do not run together.
        msg = (
            "No start template '%s' found. "
            "The likely explanation is that inteGraality was invoked from a page that transcludes the page with the template. "
            "Please invoke inteGraality directly from the page with the template." % self.template_name
        )
        raise NoStartTemplateException(msg)

    if self.end_template_name not in template_titles:
        raise NoEndTemplateException("No end template '%s' provided" % self.end_template_name)

    start_templates_with_params = [
        template_with_params
        for (template_with_params, title) in zip(all_templates_with_params, template_titles)
        if title == self.template_name
    ]
    if len(start_templates_with_params) > 1:
        pywikibot.warn("More than one template on the page %s" % page.title())

    # Only the first start template is taken into account.
    (template, params) = start_templates_with_params[0]
    parsed_config = self.parse_config_from_params(params)
    config = self.parse_config(parsed_config)
    key = self.make_cache_key(page.title())
    self.cache.set_cache_value(key, config)
    return config
def make_stats_object_for_page(self, page):
    """Create the PropertyStatistics object configured by the given page.

    :raises ConfigException: when PropertyStatistics rejects the arguments
        with a TypeError.
    """
    stats_arguments = self.make_stats_object_arguments_for_page(page)
    try:
        stats = PropertyStatistics(**stats_arguments)
    except TypeError:
        raise ConfigException("The template parameters are incorrect.")
    return stats
def process_page(self, page):
    """Regenerate the statistics of one page and save the page.

    The cached configuration of the page is invalidated first.

    :raises ConfigException: when the statistics query raises QueryException.
    """
    cache_key = self.make_cache_key(page.title())
    self.cache.invalidate(cache_key)
    stats = self.make_stats_object_for_page(page)
    try:
        output = stats.retrieve_and_process_data()
    except QueryException as e:
        raise ConfigException(e)
    current_text = page.get()
    updated_text = self.replace_in_page(output, current_text)
    page.put(updated_text, self.summary)
def parse_config(self, config):
    """Validate a raw configuration and turn it into PropertyStatistics arguments.

    The dictionary is modified in place and returned: 'properties' is
    replaced by 'columns' (parsed column objects) and 'stats_for_no_group'
    is normalised to a boolean.

    :param config: dictionary of raw configuration values.
    :raises ConfigException: when a field of REQUIRED_CONFIG_FIELDS is missing.
    """
    for field in REQUIRED_CONFIG_FIELDS:
        if field not in config:
            pywikibot.output("Missing required field %s" % field)
            raise ConfigException("A required field is missing: %s" % field)
    config['columns'] = self.parse_config_properties(config['properties'])
    del config['properties']
    # bool() of any non-empty string is True, so a textual "0" used to switch
    # the feature on. Textual negatives are now recognised explicitly.
    no_group_flag = config.get('stats_for_no_group', False)
    if isinstance(no_group_flag, str):
        no_group_flag = no_group_flag.strip().lower() not in ('', '0', 'false', 'no')
    config['stats_for_no_group'] = bool(no_group_flag)
    return config
@staticmethod
def parse_config_properties(properties_string):
    """Parse the comma-separated 'properties' value into column objects.

    Each entry is either 'key' or 'key:title'. Only the first colon
    separates key and title, so a title may itself contain colons.
    Empty entries (e.g. after a trailing comma) are skipped.

    :return: list of the objects built by ColumnConfigMaker.make().
    :raises ConfigException: when a key has an unsupported syntax.
    """
    properties_data = []
    for prop in (x.strip() for x in properties_string.split(',')):
        (key, separator, title) = prop.partition(':')
        if not separator:
            # No colon at all: the column has no custom title.
            title = None
        if not key:
            continue
        try:
            properties_data.append(ColumnConfigMaker.make(key, title))
        except ColumnSyntaxException as e:
            # Keep the original error attached as the cause.
            raise ConfigException(e) from e
    return properties_data
def replace_in_page(self, output, page_text):
regex_text = f('({{{{{self.template_name}.*?(?= {property_statistics.property_threshold})
ORDER BY DESC(?count)
LIMIT 1000
""")
return query
def get_totals_query(self, property_statistics):
    """
    Get the totals of entities with the column set.

    The query counts the entities matched by
    ``property_statistics.selector_sparql`` for which the pattern returned
    by ``self.get_filter_for_info()`` exists.

    :param property_statistics: object providing ``selector_sparql``
    :return: (str) SPARQL query
    """
    # NOTE(review): ``f`` presumably resolves the placeholders against the
    # names visible here, so ``self`` and ``property_statistics`` should keep
    # their names -- confirm against the ww documentation.
    query = f("""
SELECT (COUNT(*) as ?count) WHERE {{
?entity {property_statistics.selector_sparql}
FILTER(EXISTS {{{self.get_filter_for_info()}
}})
}}
""")
    return query
def get_info_no_grouping_query(self, property_statistics):
    """
    Get the usage counts for a column without a grouping

    The query counts the entities matched by the selector that have no
    statement for ``property_statistics.grouping_property`` and for which
    the pattern returned by ``self.get_filter_for_info()`` exists.

    :param property_statistics: object providing ``selector_sparql`` and
        ``grouping_property``
    :return: (str) SPARQL query
    """
    # NOTE(review): the query groups by ?grouping although that variable is
    # not bound anywhere in it -- confirm this is intended.
    query = f("""
SELECT (COUNT(*) AS ?count) WHERE {{
?entity {property_statistics.selector_sparql} .
MINUS {{ ?entity wdt:{property_statistics.grouping_property} _:b28. }}
FILTER(EXISTS {{{self.get_filter_for_info()}
}})
}}
GROUP BY ?grouping
ORDER BY DESC (?count)
LIMIT 10
""")
    return query
class PropertyConfig(ColumnConfig):
    """Column describing a property, optionally with a value and a qualifier."""

    def __init__(self, property, title=None, value=None, qualifier=None):
        """Store the property, the optional custom title, value and qualifier."""
        self.property = property
        self.title = title
        self.value = value
        self.qualifier = qualifier

    def __eq__(self, other):
        """Compare all four attributes; defer to the other operand for foreign types."""
        if not isinstance(other, PropertyConfig):
            # Avoid an AttributeError when compared with an unrelated object.
            return NotImplemented
        return (
            self.property == other.property
            and self.title == other.title
            and self.value == other.value
            and self.qualifier == other.qualifier
        )

    def get_title(self):
        """Return the set parts among property, value and qualifier, joined by '/'."""
        return "/".join([x for x in [self.property, self.value, self.qualifier] if x])

    def get_key(self):
        """Return the set parts among property, value and qualifier, concatenated."""
        return "".join([x for x in [self.property, self.value, self.qualifier] if x])

    def make_column_header(self):
        """Return the wikitext header cell; it links the qualifier when there is one."""
        if self.qualifier:
            property_link = self.qualifier
        else:
            property_link = self.property

        if self.title:
            label = f('[[Property:{property_link}|{self.title}]]')
        else:
            label = f('{{{{Property|{property_link}}}}}')
        return f('! data-sort-type="number"|{label}\n')

    def get_filter_for_info(self):
        """Return the SPARQL pattern selecting entities that have this column set."""
        if self.qualifier:
            # NOTE(review): the value is inserted without any prefix -- confirm
            # that this is the intended SPARQL term.
            return f("""
?entity p:{self.property} [ ps:{self.property} {self.value or '[]'} ; pq:{self.qualifier} [] ]""")
        else:
            return f("""
?entity p:{self.property}[]""")
class TextConfig(ColumnConfig):
    """Column describing a text in one language.

    get_key() and get_selector() are not defined here; the methods below
    rely on subclasses to provide them.
    """

    def __init__(self, language, title=None):
        """Store the language code and the optional custom title."""
        self.language = language
        self.title = title

    def __eq__(self, other):
        """Compare language and title; defer to the other operand for foreign types."""
        if not isinstance(other, TextConfig):
            # Avoid an AttributeError when compared with an unrelated object.
            return NotImplemented
        # NOTE(review): two different TextConfig subclasses with the same
        # language and title compare equal -- confirm this is intended.
        return (
            self.language == other.language
            and self.title == other.title
        )

    def get_title(self):
        """Return the column key, which doubles as its title."""
        return self.get_key()

    def make_column_header(self):
        """Return the wikitext header cell: the custom title or the language name."""
        if self.title:
            text = f('{self.title}')
        else:
            text = f('{{{{#language:{self.language}}}}}')
        return f('! data-sort-type="number"|{text}\n')

    def get_filter_for_info(self):
        """Return the SPARQL pattern selecting entities with a text in the language."""
        return f("""
?entity {self.get_selector()} ?lang_label.
FILTER((LANG(?lang_label)) = '{self.language}').""")
class LabelConfig(TextConfig):
    """Text column for labels."""

    def get_key(self):
        """Return 'L' followed by the language code."""
        language_code = self.language
        return 'L%s' % language_code

    def get_selector(self):
        """Return the SPARQL predicate used for labels."""
        selector = 'rdfs:label'
        return selector
class DescriptionConfig(TextConfig):
    """Text column for descriptions."""

    def get_key(self):
        """Return 'D' followed by the language code."""
        language_code = self.language
        return 'D%s' % language_code

    def get_selector(self):
        """Return the SPARQL predicate used for descriptions."""
        selector = 'schema:description'
        return selector
class QueryException(Exception):
    """Raised when a query yields no usable result."""
class PropertyStatistics:
"""
Generate statistics
"""
GROUP_MAPPING = Enum('GROUP_MAPPING', {'NO_GROUPING': 'None', 'TOTALS': ''})
TEXT_SELECTOR_MAPPING = {'L': 'rdfs:label', 'D': 'schema:description'}
def __init__(
    self,
    selector_sparql,
    columns,
    grouping_property,
    higher_grouping=None,
    higher_grouping_type=None,
    stats_for_no_group=False,
    grouping_link=None,
    grouping_threshold=20,
    property_threshold=0,
):
    """
    Store the settings of the statistics to generate.

    :param selector_sparql: SPARQL fragment selecting the entities.
    :param columns: column objects, one per table column.
    :param grouping_property: property used to group the entities.
    :param higher_grouping: optional SPARQL predicate for a higher-level grouping.
    :param higher_grouping_type: optional formatting hint for higher grouping values.
    :param stats_for_no_group: whether to add a row for entities without grouping.
    :param grouping_link: optional page prefix used to link the item counts.
    :param grouping_threshold: minimum number of items for a grouping to be listed.
    :param property_threshold: minimum usage count, shown in the table header.
    """
    # Selection and grouping settings.
    self.selector_sparql = selector_sparql
    self.columns = columns
    self.grouping_property = grouping_property
    self.higher_grouping = higher_grouping
    self.higher_grouping_type = higher_grouping_type
    self.stats_for_no_group = stats_for_no_group
    self.grouping_link = grouping_link
    # Thresholds.
    self.grouping_threshold = grouping_threshold
    self.property_threshold = property_threshold
    # Output state and formatting.
    self.column_data = {}
    self.cell_template = 'Integraality cell'
    # Data repository used to look up item labels.
    self.repo = pywikibot.Site('en', 'wikipedia').data_repository()
@statsd.timer('property_statistics.sparql.groupings')
def get_grouping_information(self):
    """
    Query the groupings and the number of entities in each of them.

    When ``self.higher_grouping`` is set, a sampled higher-grouping value
    is requested for every grouping as well.

    :return: Tuple of two (ordered) dictionaries: grouping -> item count,
        and grouping -> higher grouping value (left empty when
        ``self.higher_grouping`` is not set).
    :raises QueryException: when the query returns no result.
    """
    if self.higher_grouping:
        query = f("""
SELECT ?grouping (SAMPLE(?_higher_grouping) as ?higher_grouping) (COUNT(DISTINCT *) as ?count) WHERE {{
?entity {self.selector_sparql} .
?entity wdt:{self.grouping_property} ?grouping .
OPTIONAL {{ ?grouping {self.higher_grouping} ?_higher_grouping }}.
}} GROUP BY ?grouping ?higher_grouping
HAVING (?count >= {self.grouping_threshold})
ORDER BY DESC(?count)
LIMIT 1000
""")
    else:
        query = f("""
SELECT ?grouping (COUNT(DISTINCT *) as ?count) WHERE {{
?entity {self.selector_sparql} .
?entity wdt:{self.grouping_property} ?grouping .
}} GROUP BY ?grouping
HAVING (?count >= {self.grouping_threshold})
ORDER BY DESC(?count)
LIMIT 1000
""")
    grouping_counts = collections.OrderedDict()
    grouping_groupings = collections.OrderedDict()

    sq = pywikibot.data.sparql.SparqlQuery()
    queryresult = sq.select(query)

    if not queryresult:
        raise QueryException("No result when querying groupings.")

    for resultitem in queryresult:
        # Keep only the identifier: drop the entity URI prefix.
        qid = resultitem.get('grouping').replace(u'http://www.wikidata.org/entity/', u'')
        grouping_counts[qid] = int(resultitem.get('count'))

        if self.higher_grouping:
            value = resultitem.get('higher_grouping')
            if value:
                value = value.replace(u'http://www.wikidata.org/entity/', u'')
            # NOTE(review): nesting reconstructed -- this assignment may
            # belong inside the ``if value`` branch; confirm.
            grouping_groupings[qid] = value

    return (grouping_counts, grouping_groupings)
def get_query_for_items_for_property_positive(self, column, grouping):
    """
    Build the query listing the entities of a grouping that have the column set.

    :param column: column key; the first character selects the kind of
        column ('P' for a property, 'L' or 'D' for a text, the rest being
        the language).
    :param grouping: a grouping identifier, or one of the GROUP_MAPPING members.
    :return: (str) SPARQL query
    """
    query = f("""
SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE {{
?entity {self.selector_sparql} .""")

    if grouping == self.GROUP_MAPPING.TOTALS:
        # Totals: no restriction on the grouping property.
        pass

    elif grouping == self.GROUP_MAPPING.NO_GROUPING:
        # Only the entities without any value for the grouping property.
        query += f("""
MINUS {{
?entity wdt:{self.grouping_property} [] .
}}""")

    else:
        query += f("""
?entity wdt:{self.grouping_property} wd:{grouping} .""")

    # NOTE(review): for any other first character nothing is appended and
    # the returned query is left unterminated -- confirm callers never do that.
    if column.startswith('P'):
        query += f("""
?entity p:{column} ?prop . OPTIONAL {{ ?prop ps:{column} ?value }}
SERVICE wikibase:label {{ bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }}
}}
""")
    elif column.startswith('L') or column.startswith('D'):
        query += f("""
FILTER(EXISTS {{
?entity {self.TEXT_SELECTOR_MAPPING[column[:1]]} ?lang_label.
FILTER((LANG(?lang_label)) = "{column[1:]}").
}})
SERVICE wikibase:label {{ bd:serviceParam wikibase:language "{column[1:]}". }}
}}
""")
    return query
def get_query_for_items_for_property_negative(self, column, grouping):
    """
    Build the query listing the entities of a grouping that lack the column.

    :param column: column key; the first character selects the kind of
        column ('P' for a property, 'L' or 'D' for a text, the rest being
        the language).
    :param grouping: a grouping identifier, or one of the GROUP_MAPPING members.
    :return: (str) SPARQL query
    """
    query = f("""
SELECT DISTINCT ?entity ?entityLabel WHERE {{
?entity {self.selector_sparql} .""")

    # Each branch opens the MINUS block that the column part below closes.
    if grouping == self.GROUP_MAPPING.TOTALS:
        query += f("""
MINUS {{""")

    elif grouping == self.GROUP_MAPPING.NO_GROUPING:
        # Entities having a grouping are subtracted as well.
        query += f("""
MINUS {{
{{?entity wdt:{self.grouping_property} [] .}} UNION""")

    else:
        query += f("""
?entity wdt:{self.grouping_property} wd:{grouping} .
MINUS {{""")

    # NOTE(review): for any other first character nothing is appended and
    # the returned query is left unterminated -- confirm callers never do that.
    if column.startswith('P'):
        query += f("""
{{?entity a wdno:{column} .}} UNION
{{?entity wdt:{column} ?prop .}}
}}
SERVICE wikibase:label {{ bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }}
}}
""")
    elif column.startswith('L') or column.startswith('D'):
        query += f("""
{{ ?entity {self.TEXT_SELECTOR_MAPPING[column[:1]]} ?lang_label.
FILTER((LANG(?lang_label)) = "{column[1:]}") }}
}}
SERVICE wikibase:label {{ bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }}
}}
""")
    return query
def get_totals_no_grouping(self):
    """Return the number of selected entities without a grouping value (None when the query has no result)."""
    return self._get_count_from_sparql(f("""
SELECT (COUNT(*) as ?count) WHERE {{
?entity {self.selector_sparql}
MINUS {{ ?entity wdt:{self.grouping_property} _:b28. }}
}}
"""))
def get_totals(self):
    """Return the number of entities matched by the selector (None when the query has no result)."""
    return self._get_count_from_sparql(f("""
SELECT (COUNT(*) as ?count) WHERE {{
?entity {self.selector_sparql}
}}
"""))
@staticmethod
@statsd.timer('property_statistics.sparql.count')
def _get_count_from_sparql(query):
    """Run a counting query and return the 'count' of its first row as an int.

    :return: the count, or None when the query returned nothing.
    """
    rows = pywikibot.data.sparql.SparqlQuery().select(query)
    if rows:
        return int(rows[0].get('count'))
    return None
@staticmethod
@statsd.timer('property_statistics.sparql.grouping_counts')
def _get_grouping_counts_from_sparql(query):
    """Run a per-grouping counting query.

    :return: ordered dictionary mapping each grouping identifier (entity
        URI prefix removed) to its count, or None when the query returned
        nothing.
    """
    rows = pywikibot.data.sparql.SparqlQuery().select(query)
    if not rows:
        return None
    counts = collections.OrderedDict()
    for row in rows:
        grouping_id = row.get('grouping').replace(u'http://www.wikidata.org/entity/', u'')
        counts[grouping_id] = int(row.get('count'))
    return counts
@staticmethod
def _get_percentage(count, total):
    """Return count as a percentage of total, rounded to two decimals.

    A missing or zero count gives 0. A total that is None (no query result)
    is handled like a total of 0: the divisor is clamped to 1 instead of
    raising a TypeError.
    """
    if not count:
        return 0
    divisor = max(total or 0, 1)
    return round(1.0 * count / divisor * 100, 2)
def get_header(self):
    """Return the wikitext opening the table: the two title rows of the header."""
    colspan = 3 if self.higher_grouping else 2
    # f() is only called directly in this method body, never in a nested scope.
    parts = [
        u'{| class="wikitable sortable"\n',
        f('! colspan="{colspan}" |Top groupings (Minimum {self.grouping_threshold} items)\n'),
        f('! colspan="{len(self.columns)}"|Top Properties (used at least {self.property_threshold} times per grouping)\n'),  # noqa
        u'|-\n',
    ]
    if self.higher_grouping:
        # Empty header cell above the higher grouping column.
        parts.append(u'! \n')
    parts.append(u'! Name\n')
    parts.append(u'! Count\n')
    parts.extend(column_entry.make_column_header() for column_entry in self.columns)
    return u''.join(parts)
def format_higher_grouping_text(self, higher_grouping_value):
    """Return the wikitext cell for a higher grouping value.

    Values starting with an item identifier are rendered with the Q
    template, Commons file paths as images (sorted by file name), values of
    a known ``higher_grouping_type`` with the matching template, and
    anything else as plain text.
    """
    type_mapping = {
        "country": "{{Flag|%s}}" % higher_grouping_value,
    }
    file_path_match = re.match(
        r"http://commons.wikimedia.org/wiki/Special:FilePath/(.*?)$",
        higher_grouping_value,
    )
    if re.match(r"Q\d+", higher_grouping_value):
        higher_grouping_text = f('{{{{Q|{higher_grouping_value}}}}}')
    elif file_path_match:
        image_name = file_path_match.group(1)
        higher_grouping_text = f('[[File:{image_name}|center|100px]]')
        # Sort images by file name rather than by URL.
        higher_grouping_value = image_name
    elif self.higher_grouping_type in type_mapping:
        higher_grouping_text = type_mapping.get(self.higher_grouping_type)
    else:
        higher_grouping_text = higher_grouping_value
    return f('| data-sort-value="{higher_grouping_value}"| {higher_grouping_text}\n')
def make_stats_for_no_group(self):
    """
    Query the data for the entities without grouping, return the wikitext row.
    """
    parts = [u'|-\n']
    if self.higher_grouping:
        # Empty cell for the higher grouping column.
        parts.append(u'|\n')

    total_no_count = self.get_totals_no_grouping()
    parts.append(u'| No grouping \n')
    parts.append(f('| {total_no_count} \n'))

    for column_entry in self.columns:
        column_count = self._get_count_from_sparql(column_entry.get_info_no_grouping_query(self))
        percentage = self._get_percentage(column_count, total_no_count)
        parts.append(f('| {{{{{self.cell_template}|{percentage}|{column_count}|column={column_entry.get_title()}|grouping={self.GROUP_MAPPING.NO_GROUPING.value}}}}}\n'))  # noqa
    return u''.join(parts)
def make_stats_for_one_grouping(self, grouping, item_count, higher_grouping):
    """
    Build the wikitext row of one grouping from the data already retrieved.

    :param grouping: identifier of the grouping.
    :param item_count: number of items in the grouping.
    :param higher_grouping: higher grouping value of the grouping, if any.
    """
    parts = [u'|-\n']

    if self.higher_grouping:
        if higher_grouping:
            parts.append(self.format_higher_grouping_text(higher_grouping))
        else:
            parts.append(u'|\n')

    parts.append(u'| {{Q|%s}}\n' % (grouping,))

    if not self.grouping_link:
        parts.append(f('| {item_count} \n'))
    else:
        try:
            group_item = pywikibot.ItemPage(self.repo, grouping)
            group_item.get()
            label = group_item.labels["en"]
        except (pywikibot.exceptions.InvalidTitle, KeyError):
            # Fall back to the identifier when no English label is available.
            logging.info(f("Could not retrieve label for {grouping}"))
            label = grouping
        parts.append(f('| [[{self.grouping_link}/{label}|{item_count}]] \n'))

    for column_entry in self.columns:
        column_entry_key = column_entry.get_key()
        try:
            column_count = self.column_data.get(column_entry_key).get(grouping)
        except AttributeError:
            # No data at all for this column.
            column_count = 0
        column_count = column_count or 0
        percentage = self._get_percentage(column_count, item_count)
        parts.append(f('| {{{{{self.cell_template}|{percentage}|{column_count}|column={column_entry.get_title()}|grouping={grouping}}}}}\n'))  # noqa
    return u''.join(parts)
def make_footer(self):
    """Query the totals and return the wikitext of the totals row, closing the table."""
    total_items = self.get_totals()

    opening = u'|- class="sortbottom"\n|'
    if self.higher_grouping:
        opening += u"|\n|"
    parts = [opening, f('\'\'\'Totals\'\'\' (all items):\n| {total_items}\n')]

    for column_entry in self.columns:
        totalprop = self._get_count_from_sparql(column_entry.get_totals_query(self))
        percentage = self._get_percentage(totalprop, total_items)
        parts.append(f('| {{{{{self.cell_template}|{percentage}|{totalprop}|column={column_entry.get_title()}}}}}\n'))
    parts.append(u'|}\n')
    return u''.join(parts)
@statsd.timer('property_statistics.processing')
def retrieve_and_process_data(self):
    """
    Query all the data and return the wikitext of the whole table.

    :raises QueryException: when no grouping could be retrieved.
    """
    logging.info("Retrieving grouping information...")

    try:
        (groupings_counts, groupings_groupings) = self.get_grouping_information()
    except QueryException as e:
        logging.error(f('No groupings found.'))
        raise e

    logging.info(f('Grouping retrieved: {len(groupings_counts)}'))

    # Fetch the per-grouping counts of every column before building rows.
    for column_entry in self.columns:
        self.column_data[column_entry.get_key()] = self._get_grouping_counts_from_sparql(
            column_entry.get_info_query(self)
        )

    sections = [self.get_header()]
    for (grouping, item_count) in groupings_counts.items():
        sections.append(
            self.make_stats_for_one_grouping(grouping, item_count, groupings_groupings.get(grouping))
        )

    if self.stats_for_no_group:
        sections.append(self.make_stats_for_no_group())

    sections.append(self.make_footer())
    return ''.join(sections)
def main(*args):
    """
    Print the statistics table of a fixed sample configuration.
    """
    sample_columns = [
        PropertyConfig('P21'),
        PropertyConfig('P19'),
        LabelConfig('de'),
        DescriptionConfig('de'),
    ]
    logging.info("Main function...")
    sample_settings = dict(
        columns=sample_columns,
        selector_sparql=u'wdt:P31 wd:Q41960',
        grouping_property=u'P551',
        stats_for_no_group=True,
        grouping_threshold=5,
        property_threshold=1,
    )
    statistics = PropertyStatistics(**sample_settings)
    print(statistics.retrieve_and_process_data())
# Run the sample statistics only when the module is executed as a script.
if __name__ == "__main__":
    main()
diff --git a/integraality/tests/test_pages_processor.py b/integraality/tests/test_pages_processor.py
index e48ba9c..771100c 100644
--- a/integraality/tests/test_pages_processor.py
+++ b/integraality/tests/test_pages_processor.py
@@ -1,285 +1,290 @@
# -*- coding: utf-8 -*-
"""Unit tests for pages_processor.py."""
import argparse
import unittest
from unittest.mock import patch
import fakeredis
from integraality.pages_processor import ConfigException, PagesProcessor, main
from integraality.property_statistics import (
DescriptionConfig,
LabelConfig,
PropertyConfig
)
class ProcessortTest(unittest.TestCase):
    """Base test case providing a PagesProcessor with a fake Redis client."""

    def setUp(self):
        self.processor = PagesProcessor(cache_client=fakeredis.FakeStrictRedis())
class TestReplaceInPage(ProcessortTest):
    """Tests for PagesProcessor.replace_in_page."""

    def setUp(self):
        # Reuse the base fixture so the processor is built on the fake Redis
        # client instead of a real one.
        super().setUp()
        self.text = """
Head
{{Property dashboard start
|properties=P136:genre,P404
|grouping_property=P400
|stats_for_no_group=1
|selector_sparql=wdt:P31/wdt:P279* wd:Q7889
|target_page_title=Wikidata:WikiProject Video games/Statistics/Platform
|grouping_link=Wikidata::WikiProject Video games/Reports/Platform
}}
foo
{{Property dashboard end}}
Bottom
"""
        self.final_text = """
Head
{{Property dashboard start
|properties=P136:genre,P404
|grouping_property=P400
|stats_for_no_group=1
|selector_sparql=wdt:P31/wdt:P279* wd:Q7889
|target_page_title=Wikidata:WikiProject Video games/Statistics/Platform
|grouping_link=Wikidata::WikiProject Video games/Reports/Platform
}}
bar
{{Property dashboard end}}
Bottom
"""

    def test_replace_in_page(self):
        result = self.processor.replace_in_page("bar", self.text)
        self.assertEqual(result, self.final_text)

    def test_replace_in_page_escaped_pipe(self):
        # The escaped pipe must survive the replacement untouched.
        text = self.text.replace('wd:Q7889', '{{!}}')
        final_text = self.final_text.replace('wd:Q7889', '{{!}}')
        result = self.processor.replace_in_page("bar", text)
        self.assertEqual(result, final_text)
class TestParseConfig(ProcessortTest):
    """Tests for PagesProcessor.parse_config."""

    def setUp(self):
        # Reuse the base fixture so the processor is built on the fake Redis
        # client instead of a real one.
        super().setUp()

    def test_normal_config(self):
        input_config = {
            'grouping_link': 'Wikidata:WikiProject Video games/Reports/Platform',
            'grouping_property': 'P400',
            'stats_for_no_group': '1',
            'properties': 'P136:genre,P404',
            'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889',
        }
        result = self.processor.parse_config(input_config)
        expected = {
            'grouping_link': 'Wikidata:WikiProject Video games/Reports/Platform',
            'grouping_property': 'P400',
            'stats_for_no_group': True,
            'columns': [
                PropertyConfig(property='P136', title='genre'),
                PropertyConfig(property='P404'),
            ],
            'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889'
        }
        self.assertEqual(result, expected)

    def test_minimal_config(self):
        input_config = {
            'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889',
            'grouping_property': 'P400',
            'properties': 'P136:genre,P404',
        }
        result = self.processor.parse_config(input_config)
        expected = {
            'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889',
            'grouping_property': 'P400',
            'columns': [
                PropertyConfig(property='P136', title='genre'),
                PropertyConfig(property='P404'),
            ],
            'stats_for_no_group': False,
        }
        self.assertEqual(result, expected)

    def test_full_config(self):
        input_config = {
            'grouping_link': 'Wikidata:WikiProject Video games/Reports/Platform',
            'grouping_property': 'P400',
            'stats_for_no_group': '1',
            'properties': 'P136:genre,P404',
            'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889',
            'grouping_threshold': '1',
            'property_threshold': '2',
        }
        result = self.processor.parse_config(input_config)
        expected = {
            'grouping_link': 'Wikidata:WikiProject Video games/Reports/Platform',
            'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889',
            'grouping_property': 'P400',
            'columns': [
                PropertyConfig(property='P136', title='genre'),
                PropertyConfig(property='P404'),
            ],
            'stats_for_no_group': True,
            'grouping_threshold': '1',
            'property_threshold': '2',
        }
        self.assertEqual(result, expected)

    def test_empty_config(self):
        input_config = {}
        with self.assertRaises(ConfigException):
            self.processor.parse_config(input_config)

    def test_insufficient_config(self):
        input_config = {
            'selector_sparql': 'wdt:P31/wdt:P279* wd:Q7889',
        }
        with self.assertRaises(ConfigException):
            self.processor.parse_config(input_config)
class TestParseParams(ProcessortTest):
    """Tests for PagesProcessor.parse_config_from_params."""

    def test_parse_config_from_params_minimal(self):
        raw_params = ['grouping_property=P195', 'properties=P170:creator,P276', 'selector_sparql=wdt:P31 wd:Q3305213']
        actual = self.processor.parse_config_from_params(raw_params)
        self.assertEqual(actual, {
            'grouping_property': 'P195',
            'properties': 'P170:creator,P276',
            'selector_sparql': 'wdt:P31 wd:Q3305213'
        })

    def test_parse_config_from_params_with_empty_param(self):
        raw_params = ['', 'grouping_property=P195', 'properties=P170:creator,P276', 'selector_sparql=wdt:P31 wd:Q3305213']
        actual = self.processor.parse_config_from_params(raw_params)
        self.assertEqual(actual, {
            'grouping_property': 'P195',
            'properties': 'P170:creator,P276',
            'selector_sparql': 'wdt:P31 wd:Q3305213'
        })

    def test_parse_config_from_params_with_escaped_pipe(self):
        raw_params = ['grouping_property=P195', 'properties=P170:creator,P276',
                      'selector_sparql=REGEX(?id, "^(a{{!}}b)")']
        actual = self.processor.parse_config_from_params(raw_params)
        self.assertEqual(actual, {
            'grouping_property': 'P195',
            'properties': 'P170:creator,P276',
            'selector_sparql': 'REGEX(?id, "^(a|b)")'
        })
class TestParseConfigProperties(ProcessortTest):
    """Tests for PagesProcessor.parse_config_properties."""

    def test(self):
        properties = 'P136:genre,P404'
        result = self.processor.parse_config_properties(properties)
        expected = [
            PropertyConfig(property='P136', title='genre'),
            PropertyConfig(property='P404'),
        ]
        self.assertEqual(result, expected)

    def test_with_trail_comma(self):
        properties = 'P136:genre,P404,'
        result = self.processor.parse_config_properties(properties)
        expected = [
            PropertyConfig(property='P136', title='genre'),
            PropertyConfig(property='P404'),
        ]
        self.assertEqual(result, expected)

    def test_more_properties(self):
        properties = 'P136,P178,P123,P495,P577,P404,P437'
        result = self.processor.parse_config_properties(properties)
        expected = [
            PropertyConfig(property='P136'),
            PropertyConfig(property='P178'),
            PropertyConfig(property='P123'),
            PropertyConfig(property='P495'),
            PropertyConfig(property='P577'),
            PropertyConfig(property='P404'),
            PropertyConfig(property='P437'),
        ]
        self.assertEqual(result, expected)

    def test_with_qualifier(self):
        properties = 'P136:genre,P404,P669/P670'
        result = self.processor.parse_config_properties(properties)
        expected = [
            PropertyConfig(property='P136', title='genre'),
            PropertyConfig(property='P404'),
            PropertyConfig(property='P669', qualifier='P670'),
        ]
        self.assertEqual(result, expected)

    def test_with_qualifier_and_value(self):
        properties = 'P136:genre,P404,P553/Q17459/P670'
        result = self.processor.parse_config_properties(properties)
        expected = [
            PropertyConfig(property='P136', title='genre'),
            PropertyConfig(property='P404'),
            PropertyConfig(property='P553', value='Q17459', qualifier='P670')
        ]
        self.assertEqual(result, expected)

    def test_with_label(self):
        properties = 'P136:genre,Lbr,P553'
        result = self.processor.parse_config_properties(properties)
        expected = [
            PropertyConfig(property='P136', title='genre'),
            LabelConfig(language='br'),
            PropertyConfig(property='P553')
        ]
        self.assertEqual(result, expected)

    def test_with_description(self):
        # 'D' is the prefix of a description column; the input used to be
        # 'Lxy', which denotes a label.
        properties = 'P136:genre,Dxy,P553'
        result = self.processor.parse_config_properties(properties)
        expected = [
            PropertyConfig(property='P136', title='genre'),
            DescriptionConfig(language='xy'),
            PropertyConfig(property='P553')
        ]
        self.assertEqual(result, expected)

    def test_with_space(self):
        properties = 'P131, P17'
        result = self.processor.parse_config_properties(properties)
        expected = [
            PropertyConfig(property='P131'),
            PropertyConfig(property='P17')
        ]
        self.assertEqual(result, expected)

    def test_with_incorrect_syntax(self):
        properties = 'P131,Something'
        with self.assertRaises(ConfigException):
            self.processor.parse_config_properties(properties)
+
class TestMain(unittest.TestCase):
    """Tests for the main() entry point, with PagesProcessor and argument parsing patched."""

    def setUp(self):
        processor_patcher = patch('integraality.pages_processor.PagesProcessor', autospec=True)
        self.mock_pages_processor = processor_patcher.start()
        self.addCleanup(processor_patcher.stop)

        args_patcher = patch('argparse.ArgumentParser.parse_args', autospec=True)
        self.mock_args = args_patcher.start()
        self.addCleanup(args_patcher.stop)

    def test_main_url_argument(self):
        expected_url = 'Foo'
        self.mock_args.return_value = argparse.Namespace(url=expected_url)
        main()
        self.mock_pages_processor.assert_called_once_with(expected_url)
        processor_instance = self.mock_pages_processor.return_value
        processor_instance.process_all.assert_called_once_with()
diff --git a/integraality/tests/test_property_statistics.py b/integraality/tests/test_property_statistics.py
index 144d683..86da426 100644
--- a/integraality/tests/test_property_statistics.py
+++ b/integraality/tests/test_property_statistics.py
@@ -1,950 +1,955 @@
# -*- coding: utf-8 -*-
"""Unit tests for property_statistics.py."""
import unittest
from collections import OrderedDict
from unittest.mock import patch
from property_statistics import (
ColumnConfigMaker,
+ ColumnSyntaxException,
DescriptionConfig,
LabelConfig,
PropertyConfig,
PropertyStatistics,
QueryException
)
class PropertyStatisticsTest(unittest.TestCase):
    """Base test case providing a PropertyStatistics object with one column of each kind."""

    def setUp(self):
        self.stats = PropertyStatistics(
            columns=[
                PropertyConfig(property='P21'),
                PropertyConfig(property='P19'),
                PropertyConfig(property='P1', qualifier='P2'),
                PropertyConfig(property='P3', value='Q4', qualifier='P5'),
                LabelConfig(language='br'),
                DescriptionConfig(language='xy'),
            ],
            selector_sparql=u'wdt:P31 wd:Q41960',
            grouping_property=u'P551',
            property_threshold=10
        )
class TestPropertyConfig(PropertyStatisticsTest):
    """Tests for a PropertyConfig made of a property only."""

    def setUp(self):
        super().setUp()
        self.column = PropertyConfig('P19')

    def test_make_column_header(self):
        result = self.column.make_column_header()
        expected = u'! data-sort-type="number"|{{Property|P19}}\n'
        self.assertEqual(result, expected)

    def test_get_totals_query(self):
        result = self.column.get_totals_query(self.stats)
        expected = (
            "\n"
            "SELECT (COUNT(*) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960\n"
            " FILTER(EXISTS {\n"
            " ?entity p:P19[]\n"
            " })\n"
            "}\n"
        )
        self.assertEqual(result, expected)

    def test_get_info_no_grouping_query(self):
        result = self.column.get_info_no_grouping_query(self.stats)
        expected = (
            "\n"
            "SELECT (COUNT(*) AS ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " MINUS { ?entity wdt:P551 _:b28. }\n"
            " FILTER(EXISTS {\n"
            " ?entity p:P19[]\n"
            " })\n"
            "}\n"
            "GROUP BY ?grouping\n"
            "ORDER BY DESC (?count)\n"
            "LIMIT 10\n"
        )
        self.assertEqual(result, expected)

    def test_get_info_query(self):
        # assertEqual already reports both values on failure; the leftover
        # debug prints were dropped.
        result = self.column.get_info_query(self.stats)
        expected = (
            "\n"
            "SELECT ?grouping (COUNT(DISTINCT *) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " ?entity wdt:P551 ?grouping .\n"
            " FILTER(EXISTS {\n"
            " ?entity p:P19[]\n"
            " })\n"
            "}\n"
            "GROUP BY ?grouping\n"
            "HAVING (?count >= 10)\n"
            "ORDER BY DESC(?count)\n"
            "LIMIT 1000\n"
        )
        self.assertEqual(result, expected)
class TestPropertyConfigWithTitle(PropertyStatisticsTest):
    """Tests for a PropertyConfig with a custom title."""

    def setUp(self):
        super().setUp()
        self.column = PropertyConfig('P19', title="birth")

    def test_make_column_header(self):
        header = self.column.make_column_header()
        self.assertEqual(header, u'! data-sort-type="number"|[[Property:P19|birth]]\n')
class TestPropertyConfigWithQualifier(PropertyStatisticsTest):
    """Tests for a PropertyConfig made of a property and a qualifier."""

    def setUp(self):
        super().setUp()
        self.column = PropertyConfig('P669', qualifier='P670')

    def test_make_column_header(self):
        result = self.column.make_column_header()
        expected = u'! data-sort-type="number"|{{Property|P670}}\n'
        self.assertEqual(result, expected)

    def test_get_totals_query(self):
        result = self.column.get_totals_query(self.stats)
        expected = (
            "\n"
            "SELECT (COUNT(*) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960\n"
            " FILTER(EXISTS {\n"
            " ?entity p:P669 [ ps:P669 [] ; pq:P670 [] ]\n"
            " })\n"
            "}\n"
        )
        self.assertEqual(result, expected)

    def test_get_info_no_grouping_query(self):
        result = self.column.get_info_no_grouping_query(self.stats)
        expected = (
            "\n"
            "SELECT (COUNT(*) AS ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " MINUS { ?entity wdt:P551 _:b28. }\n"
            " FILTER(EXISTS {\n"
            " ?entity p:P669 [ ps:P669 [] ; pq:P670 [] ]\n"
            " })\n"
            "}\n"
            "GROUP BY ?grouping\n"
            "ORDER BY DESC (?count)\n"
            "LIMIT 10\n"
        )
        self.assertEqual(result, expected)

    def test_get_info_query(self):
        # assertEqual already reports both values on failure; the leftover
        # debug prints were dropped.
        result = self.column.get_info_query(self.stats)
        expected = (
            "\n"
            "SELECT ?grouping (COUNT(DISTINCT *) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " ?entity wdt:P551 ?grouping .\n"
            " FILTER(EXISTS {\n"
            " ?entity p:P669 [ ps:P669 [] ; pq:P670 [] ]\n"
            " })\n"
            "}\n"
            "GROUP BY ?grouping\n"
            "HAVING (?count >= 10)\n"
            "ORDER BY DESC(?count)\n"
            "LIMIT 1000\n"
        )
        self.assertEqual(result, expected)
class TestPropertyConfigWithQualifierAndLabel(PropertyStatisticsTest):
    """Tests for a PropertyConfig with a qualifier and a custom title."""

    def setUp(self):
        super().setUp()
        self.column = PropertyConfig('P669', title="street", qualifier='P670')

    def test_make_column_header(self):
        header = self.column.make_column_header()
        self.assertEqual(header, u'! data-sort-type="number"|[[Property:P670|street]]\n')
class TestPropertyConfigWithQualifierAndValue(PropertyStatisticsTest):
    """Tests for a PropertyConfig made of a property, a value and a qualifier."""

    def setUp(self):
        super().setUp()
        self.column = PropertyConfig(property='P3', value='Q4', qualifier='P5')

    def test_make_column_header(self):
        result = self.column.make_column_header()
        expected = u'! data-sort-type="number"|{{Property|P5}}\n'
        self.assertEqual(result, expected)

    def test_get_totals_query(self):
        result = self.column.get_totals_query(self.stats)
        expected = (
            "\n"
            "SELECT (COUNT(*) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960\n"
            " FILTER(EXISTS {\n"
            " ?entity p:P3 [ ps:P3 Q4 ; pq:P5 [] ]\n"
            " })\n"
            "}\n"
        )
        self.assertEqual(result, expected)

    def test_get_info_no_grouping_query(self):
        # assertEqual already reports both values on failure; the leftover
        # debug prints were dropped.
        result = self.column.get_info_no_grouping_query(self.stats)
        expected = (
            "\n"
            "SELECT (COUNT(*) AS ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " MINUS { ?entity wdt:P551 _:b28. }\n"
            " FILTER(EXISTS {\n"
            " ?entity p:P3 [ ps:P3 Q4 ; pq:P5 [] ]\n"
            " })\n"
            "}\n"
            "GROUP BY ?grouping\n"
            "ORDER BY DESC (?count)\n"
            "LIMIT 10\n"
        )
        self.assertEqual(result, expected)

    def test_get_info_query(self):
        result = self.column.get_info_query(self.stats)
        expected = (
            "\n"
            "SELECT ?grouping (COUNT(DISTINCT *) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " ?entity wdt:P551 ?grouping .\n"
            " FILTER(EXISTS {\n"
            " ?entity p:P3 [ ps:P3 Q4 ; pq:P5 [] ]\n"
            " })\n"
            "}\n"
            "GROUP BY ?grouping\n"
            "HAVING (?count >= 10)\n"
            "ORDER BY DESC(?count)\n"
            "LIMIT 1000\n"
        )
        self.assertEqual(result, expected)
class TestPropertyConfigWithQualifierAndValueAndTitle(PropertyStatisticsTest):
    """Header rendering for a property/value/qualifier column with a custom title."""

    def setUp(self):
        super().setUp()
        self.column = PropertyConfig(property='P3', title="Some property", value='Q4', qualifier='P5')

    def test_make_column_header(self):
        # The expected header links to the qualifier (P5) with the title as link text.
        wanted = u'! data-sort-type="number"|[[Property:P5|Some property]]\n'
        header = self.column.make_column_header()
        self.assertEqual(header, wanted)
class TestColumnConfigMaker(unittest.TestCase):
    """Checks which column config ``ColumnConfigMaker.make(key, title)`` returns per key syntax."""

    def test_property_without_title(self):
        made = ColumnConfigMaker.make('P136', None)
        self.assertEqual(made, PropertyConfig(property='P136'))

    def test_property_with_title(self):
        made = ColumnConfigMaker.make('P136', 'genre')
        self.assertEqual(made, PropertyConfig(property='P136', title='genre'))

    def test_property_with_qualifier(self):
        # "<property>/<qualifier>" key.
        made = ColumnConfigMaker.make('P669/P670', None)
        wanted = PropertyConfig(property='P669', qualifier='P670')
        self.assertEqual(made, wanted)

    def test_property_with_qualifier_and_title(self):
        made = ColumnConfigMaker.make('P669/P670', 'street number')
        wanted = PropertyConfig(property='P669', qualifier='P670', title="street number")
        self.assertEqual(made, wanted)

    def test_property_with_qualifier_and_value(self):
        # "<property>/<value>/<qualifier>" key.
        made = ColumnConfigMaker.make('P553/Q17459/P670', None)
        wanted = PropertyConfig(property='P553', value='Q17459', qualifier='P670')
        self.assertEqual(made, wanted)

    def test_property_with_qualifier_and_value_and_title(self):
        made = ColumnConfigMaker.make('P553/Q17459/P670', 'street number')
        wanted = PropertyConfig(property='P553', value='Q17459', qualifier='P670', title='street number')
        self.assertEqual(made, wanted)

    def test_label(self):
        # "L<language>" key yields a label column.
        made = ColumnConfigMaker.make('Lxy', None)
        self.assertEqual(made, LabelConfig(language='xy'))

    def test_description(self):
        # "D<language>" key yields a description column.
        made = ColumnConfigMaker.make('Dxy', None)
        self.assertEqual(made, DescriptionConfig(language='xy'))

    def test_unknown_syntax(self):
        # A key matching none of the known syntaxes must be rejected.
        with self.assertRaises(ColumnSyntaxException):
            ColumnConfigMaker.make('SomethingSomething', None)
+
class SparqlQueryTest(unittest.TestCase):
    """Base test case that replaces ``pywikibot.data.sparql.SparqlQuery`` with a mock."""

    def setUp(self):
        super().setUp()
        sparql_patcher = patch('pywikibot.data.sparql.SparqlQuery', autospec=True)
        self.mock_sparql_query = sparql_patcher.start()
        # Undo the patch when the test finishes.
        self.addCleanup(sparql_patcher.stop)

    def assert_query_called(self, query):
        """Assert the mocked query object's ``select`` was called exactly once with *query*."""
        select_mock = self.mock_sparql_query.return_value.select
        select_mock.assert_called_once_with(query)
class TestLabelConfig(PropertyStatisticsTest):
    """Tests for a label column in language ``br`` (``LabelConfig('br')``)."""

    def setUp(self):
        super().setUp()
        self.column = LabelConfig('br')

    def test_simple(self):
        # The header shows the language name through the {{#language:}} parser function.
        result = self.column.make_column_header()
        expected = u'! data-sort-type="number"|{{#language:br}}\n'
        self.assertEqual(result, expected)

    def test_get_key(self):
        result = self.column.get_key()
        self.assertEqual(result, 'Lbr')

    def test_get_totals_query(self):
        # Overall count of entities that have an rdfs:label in the language.
        result = self.column.get_totals_query(self.stats)
        query = (
            "\n"
            "SELECT (COUNT(*) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960\n"
            " FILTER(EXISTS {\n"
            " ?entity rdfs:label ?lang_label.\n"
            " FILTER((LANG(?lang_label)) = 'br').\n"
            " })\n"
            "}\n"
        )
        self.assertEqual(result, query)

    def test_get_info_query(self):
        # Per-grouping count with the same label filter.
        result = self.column.get_info_query(self.stats)
        query = (
            "\n"
            "SELECT ?grouping (COUNT(DISTINCT *) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " ?entity wdt:P551 ?grouping .\n"
            " FILTER(EXISTS {\n"
            " ?entity rdfs:label ?lang_label.\n"
            " FILTER((LANG(?lang_label)) = 'br').\n"
            " })\n"
            "}\n"
            "GROUP BY ?grouping\n"
            "HAVING (?count >= 10)\n"
            "ORDER BY DESC(?count)\n"
            "LIMIT 1000\n"
        )
        self.assertEqual(result, query)

    def test_get_info_no_grouping_query(self):
        # Count for entities without the grouping property (MINUS clause).
        result = self.column.get_info_no_grouping_query(self.stats)
        query = (
            "\n"
            "SELECT (COUNT(*) AS ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " MINUS { ?entity wdt:P551 _:b28. }\n"
            " FILTER(EXISTS {\n"
            " ?entity rdfs:label ?lang_label.\n"
            " FILTER((LANG(?lang_label)) = 'br').\n"
            " })\n"
            "}\n"
            "GROUP BY ?grouping\n"
            "ORDER BY DESC (?count)\n"
            "LIMIT 10\n"
        )
        # Debug print() calls removed; assertEqual reports both values on failure.
        self.assertEqual(result, query)
class TestDescriptionConfig(PropertyStatisticsTest):
    """Tests for a description column in language ``br`` (``DescriptionConfig('br')``)."""

    def setUp(self):
        super().setUp()
        self.column = DescriptionConfig('br')

    def test_simple(self):
        # Same header format as a label column: the {{#language:}} parser function.
        wanted = u'! data-sort-type="number"|{{#language:br}}\n'
        header = self.column.make_column_header()
        self.assertEqual(header, wanted)

    def test_get_key(self):
        key = self.column.get_key()
        self.assertEqual(key, 'Dbr')

    def test_get_totals_query(self):
        # Overall count of entities that have a schema:description in the language.
        wanted = (
            "\n"
            "SELECT (COUNT(*) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960\n"
            " FILTER(EXISTS {\n"
            " ?entity schema:description ?lang_label.\n"
            " FILTER((LANG(?lang_label)) = 'br').\n"
            " })\n"
            "}\n"
        )
        actual = self.column.get_totals_query(self.stats)
        self.assertEqual(actual, wanted)

    def test_get_info_query(self):
        # Per-grouping count with the same description filter.
        wanted = (
            "\n"
            "SELECT ?grouping (COUNT(DISTINCT *) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " ?entity wdt:P551 ?grouping .\n"
            " FILTER(EXISTS {\n"
            " ?entity schema:description ?lang_label.\n"
            " FILTER((LANG(?lang_label)) = 'br').\n"
            " })\n"
            "}\n"
            "GROUP BY ?grouping\n"
            "HAVING (?count >= 10)\n"
            "ORDER BY DESC(?count)\n"
            "LIMIT 1000\n"
        )
        actual = self.column.get_info_query(self.stats)
        self.assertEqual(actual, wanted)

    def test_get_info_no_grouping_query(self):
        # Count for entities without the grouping property (MINUS clause).
        wanted = (
            "\n"
            "SELECT (COUNT(*) AS ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " MINUS { ?entity wdt:P551 _:b28. }\n"
            " FILTER(EXISTS {\n"
            " ?entity schema:description ?lang_label.\n"
            " FILTER((LANG(?lang_label)) = 'br').\n"
            " })\n"
            "}\n"
            "GROUP BY ?grouping\n"
            "ORDER BY DESC (?count)\n"
            "LIMIT 10\n"
        )
        actual = self.column.get_info_no_grouping_query(self.stats)
        self.assertEqual(actual, wanted)
class FormatHigherGroupingTextTest(SparqlQueryTest, PropertyStatisticsTest):
    """Checks the table cell produced by ``format_higher_grouping_text`` for several value shapes."""

    def test_format_higher_grouping_text_default_qitem(self):
        # An item ID is rendered through the {{Q}} template.
        cell = self.stats.format_higher_grouping_text("Q1")
        self.assertEqual(cell, '| data-sort-value="Q1"| {{Q|Q1}}\n')

    def test_format_higher_grouping_text_string(self):
        # Any other plain string is emitted verbatim.
        cell = self.stats.format_higher_grouping_text("foo")
        self.assertEqual(cell, '| data-sort-value="foo"| foo\n')

    def test_format_higher_grouping_text_country(self):
        # With the "country" type the value goes through the {{Flag}} template.
        self.stats.higher_grouping_type = "country"
        cell = self.stats.format_higher_grouping_text("AT")
        self.assertEqual(cell, '| data-sort-value="AT"| {{Flag|AT}}\n')

    def test_format_higher_grouping_text_image(self):
        # A Commons Special:FilePath URL becomes a [[File:...]] thumbnail.
        file_url = "http://commons.wikimedia.org/wiki/Special:FilePath/US%20CDC%20logo.svg"
        wanted = '| data-sort-value="US%20CDC%20logo.svg"| [[File:US%20CDC%20logo.svg|center|100px]]\n'
        cell = self.stats.format_higher_grouping_text(file_url)
        self.assertEqual(cell, wanted)
class MakeStatsForNoGroupTest(SparqlQueryTest, PropertyStatisticsTest):
    """Tests the "No grouping" table row built by ``make_stats_for_no_group``."""

    def setUp(self):
        super().setUp()
        totals_patcher = patch('property_statistics.PropertyStatistics.get_totals_no_grouping', autospec=True)
        self.mock_get_totals_no_grouping = totals_patcher.start()
        self.addCleanup(totals_patcher.stop)
        # Row total is 20; the six SPARQL answers below are the per-column
        # counts that the expected percentages are computed from.
        self.mock_get_totals_no_grouping.return_value = 20
        per_column_counts = ['2', '10', '15', '5', '4', '8']
        self.mock_sparql_query.return_value.select.side_effect = [
            [{'count': count}] for count in per_column_counts
        ]

    def test_make_stats_for_no_group(self):
        wanted = (
            "|-\n"
            "| No grouping \n"
            "| 20 \n"
            "| {{Integraality cell|10.0|2|column=P21|grouping=None}}\n"
            "| {{Integraality cell|50.0|10|column=P19|grouping=None}}\n"
            "| {{Integraality cell|75.0|15|column=P1/P2|grouping=None}}\n"
            "| {{Integraality cell|25.0|5|column=P3/Q4/P5|grouping=None}}\n"
            "| {{Integraality cell|20.0|4|column=Lbr|grouping=None}}\n"
            "| {{Integraality cell|40.0|8|column=Dxy|grouping=None}}\n"
        )
        row = self.stats.make_stats_for_no_group()
        self.assertEqual(row, wanted)
        self.mock_get_totals_no_grouping.assert_called_once_with(self.stats)
        self.assertEqual(self.mock_sparql_query.call_count, 6)

    def test_make_stats_for_no_group_with_higher_grouping(self):
        # With a higher grouping the row gains an empty leading cell.
        self.stats.higher_grouping = 'wdt:P17/wdt:P298'
        wanted = (
            "|-\n"
            "|\n"
            "| No grouping \n"
            "| 20 \n"
            "| {{Integraality cell|10.0|2|column=P21|grouping=None}}\n"
            "| {{Integraality cell|50.0|10|column=P19|grouping=None}}\n"
            "| {{Integraality cell|75.0|15|column=P1/P2|grouping=None}}\n"
            "| {{Integraality cell|25.0|5|column=P3/Q4/P5|grouping=None}}\n"
            "| {{Integraality cell|20.0|4|column=Lbr|grouping=None}}\n"
            "| {{Integraality cell|40.0|8|column=Dxy|grouping=None}}\n"
        )
        row = self.stats.make_stats_for_no_group()
        self.assertEqual(row, wanted)
        self.mock_get_totals_no_grouping.assert_called_once_with(self.stats)
        self.assertEqual(self.mock_sparql_query.call_count, 6)
class MakeStatsForOneGroupingTest(PropertyStatisticsTest):
    """Tests the table row built by ``make_stats_for_one_grouping`` from preloaded column data."""

    def setUp(self):
        super().setUp()
        # Per-column counts keyed by grouping item; the tests read the Q3115846 entries.
        self.stats.column_data = {
            'P21': OrderedDict([('Q3115846', 10), ('Q5087901', 6)]),
            'P19': OrderedDict([('Q3115846', 8), ('Q2166574', 5)]),
            'P1P2': OrderedDict([('Q3115846', 2), ('Q2166574', 9)]),
            'P3Q4P5': OrderedDict([('Q3115846', 7), ('Q2166574', 1)]),
            'Lbr': OrderedDict([('Q3115846', 1), ('Q2166574', 2)]),
            'Dxy': OrderedDict([('Q3115846', 2), ('Q2166574', 1)]),
        }

    def test_make_stats_for_one_grouping(self):
        wanted = (
            '|-\n'
            '| {{Q|Q3115846}}\n'
            '| 10 \n'
            '| {{Integraality cell|100.0|10|column=P21|grouping=Q3115846}}\n'
            '| {{Integraality cell|80.0|8|column=P19|grouping=Q3115846}}\n'
            '| {{Integraality cell|20.0|2|column=P1/P2|grouping=Q3115846}}\n'
            '| {{Integraality cell|70.0|7|column=P3/Q4/P5|grouping=Q3115846}}\n'
            '| {{Integraality cell|10.0|1|column=Lbr|grouping=Q3115846}}\n'
            '| {{Integraality cell|20.0|2|column=Dxy|grouping=Q3115846}}\n'
        )
        row = self.stats.make_stats_for_one_grouping("Q3115846", 10, None)
        self.assertEqual(row, wanted)

    def test_make_stats_for_one_grouping_with_higher_grouping(self):
        # The higher grouping value ("Q1") is rendered as an extra leading cell.
        self.stats.higher_grouping = "wdt:P17/wdt:P298"
        wanted = (
            '|-\n'
            '| data-sort-value="Q1"| {{Q|Q1}}\n'
            '| {{Q|Q3115846}}\n'
            '| 10 \n'
            '| {{Integraality cell|100.0|10|column=P21|grouping=Q3115846}}\n'
            '| {{Integraality cell|80.0|8|column=P19|grouping=Q3115846}}\n'
            '| {{Integraality cell|20.0|2|column=P1/P2|grouping=Q3115846}}\n'
            '| {{Integraality cell|70.0|7|column=P3/Q4/P5|grouping=Q3115846}}\n'
            '| {{Integraality cell|10.0|1|column=Lbr|grouping=Q3115846}}\n'
            '| {{Integraality cell|20.0|2|column=Dxy|grouping=Q3115846}}\n'
        )
        row = self.stats.make_stats_for_one_grouping("Q3115846", 10, "Q1")
        self.assertEqual(row, wanted)

    @patch('pywikibot.ItemPage', autospec=True)
    def test_make_stats_for_one_grouping_with_grouping_link(self, mock_item_page):
        # With a grouping link the count links to "<grouping_link>/<English label>".
        mock_item_page.return_value.labels = {'en': 'Bar'}
        self.stats.grouping_link = "Foo"
        wanted = (
            '|-\n'
            '| {{Q|Q3115846}}\n'
            '| [[Foo/Bar|10]] \n'
            '| {{Integraality cell|100.0|10|column=P21|grouping=Q3115846}}\n'
            '| {{Integraality cell|80.0|8|column=P19|grouping=Q3115846}}\n'
            '| {{Integraality cell|20.0|2|column=P1/P2|grouping=Q3115846}}\n'
            '| {{Integraality cell|70.0|7|column=P3/Q4/P5|grouping=Q3115846}}\n'
            '| {{Integraality cell|10.0|1|column=Lbr|grouping=Q3115846}}\n'
            '| {{Integraality cell|20.0|2|column=Dxy|grouping=Q3115846}}\n'
        )
        row = self.stats.make_stats_for_one_grouping("Q3115846", 10, None)
        self.assertEqual(row, wanted)
class GetQueryForItemsForPropertyPositive(PropertyStatisticsTest):
    """Tests the SPARQL listing items that HAVE a given column, per grouping kind.

    Each test compares the text returned by
    ``get_query_for_items_for_property_positive(column_key, grouping)`` with a
    literal query; the triple-quoted literals are whitespace-sensitive.
    """

    def test_get_query_for_items_for_property_positive(self):
        # Regular grouping item: the query pins the grouping property to wd:Q3115846.
        result = self.stats.get_query_for_items_for_property_positive('P21', 'Q3115846')
        expected = """
SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE {
?entity wdt:P31 wd:Q41960 .
?entity wdt:P551 wd:Q3115846 .
?entity p:P21 ?prop . OPTIONAL { ?prop ps:P21 ?value }
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
        self.assertEqual(result, expected)

    def test_get_query_for_items_for_property_positive_no_grouping(self):
        # NO_GROUPING: entities with any value for the grouping property are excluded via MINUS.
        result = self.stats.get_query_for_items_for_property_positive('P21', self.stats.GROUP_MAPPING.NO_GROUPING)
        expected = """
SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE {
?entity wdt:P31 wd:Q41960 .
MINUS {
?entity wdt:P551 [] .
}
?entity p:P21 ?prop . OPTIONAL { ?prop ps:P21 ?value }
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
        self.assertEqual(result, expected)

    def test_get_query_for_items_for_property_positive_totals(self):
        # TOTALS: no grouping clause at all.
        result = self.stats.get_query_for_items_for_property_positive('P21', self.stats.GROUP_MAPPING.TOTALS)
        expected = """
SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE {
?entity wdt:P31 wd:Q41960 .
?entity p:P21 ?prop . OPTIONAL { ?prop ps:P21 ?value }
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
        self.assertEqual(result, expected)

    def test_get_query_for_items_for_property_positive_label(self):
        # Label column key ('Lbr'): filters on an rdfs:label in "br" and asks the
        # label service for that language.
        result = self.stats.get_query_for_items_for_property_positive('Lbr', 'Q3115846')
        expected = """
SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE {
?entity wdt:P31 wd:Q41960 .
?entity wdt:P551 wd:Q3115846 .
FILTER(EXISTS {
?entity rdfs:label ?lang_label.
FILTER((LANG(?lang_label)) = "br").
})
SERVICE wikibase:label { bd:serviceParam wikibase:language "br". }
}
"""
        self.assertEqual(result, expected)
class GetQueryForItemsForPropertyNegative(PropertyStatisticsTest):
    """Tests the SPARQL listing items that LACK a given column, per grouping kind.

    Each test compares the text returned by
    ``get_query_for_items_for_property_negative(column_key, grouping)`` with a
    literal query; the triple-quoted literals are whitespace-sensitive.
    """

    def test_get_query_for_items_for_property_negative(self):
        # Regular grouping item: MINUS removes entities with a P21 value or a
        # wdno:P21 (no-value) marker.
        result = self.stats.get_query_for_items_for_property_negative('P21', 'Q3115846')
        expected = """
SELECT DISTINCT ?entity ?entityLabel WHERE {
?entity wdt:P31 wd:Q41960 .
?entity wdt:P551 wd:Q3115846 .
MINUS {
{?entity a wdno:P21 .} UNION
{?entity wdt:P21 ?prop .}
}
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
        self.assertEqual(result, expected)

    def test_get_query_for_items_for_property_negative_no_grouping(self):
        # NO_GROUPING: the MINUS union additionally removes entities that have the grouping property.
        result = self.stats.get_query_for_items_for_property_negative('P21', self.stats.GROUP_MAPPING.NO_GROUPING)
        expected = """
SELECT DISTINCT ?entity ?entityLabel WHERE {
?entity wdt:P31 wd:Q41960 .
MINUS {
{?entity wdt:P551 [] .} UNION
{?entity a wdno:P21 .} UNION
{?entity wdt:P21 ?prop .}
}
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
        self.assertEqual(result, expected)

    def test_get_query_for_items_for_property_negative_totals(self):
        # TOTALS: no grouping clause at all.
        result = self.stats.get_query_for_items_for_property_negative('P21', self.stats.GROUP_MAPPING.TOTALS)
        expected = """
SELECT DISTINCT ?entity ?entityLabel WHERE {
?entity wdt:P31 wd:Q41960 .
MINUS {
{?entity a wdno:P21 .} UNION
{?entity wdt:P21 ?prop .}
}
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
        self.assertEqual(result, expected)

    def test_get_query_for_items_for_property_negative_label(self):
        # Label column key ('Lbr'): MINUS removes entities that have an rdfs:label in "br".
        result = self.stats.get_query_for_items_for_property_negative('Lbr', 'Q3115846')
        expected = """
SELECT DISTINCT ?entity ?entityLabel WHERE {
?entity wdt:P31 wd:Q41960 .
?entity wdt:P551 wd:Q3115846 .
MINUS {
{ ?entity rdfs:label ?lang_label.
FILTER((LANG(?lang_label)) = "br") }
}
SERVICE wikibase:label { bd:serviceParam wikibase:language "[AUTO_LANGUAGE],en". }
}
"""
        self.assertEqual(result, expected)
class GetCountFromSparqlTest(SparqlQueryTest, PropertyStatisticsTest):
    """Tests ``_get_count_from_sparql`` against a mocked SPARQL endpoint."""

    def test_return_count(self):
        # The string count in the first result row is returned as an int.
        self.mock_sparql_query.return_value.select.return_value = [{'count': '18'}]
        result = self.stats._get_count_from_sparql("SELECT X")
        self.assert_query_called("SELECT X")
        self.assertEqual(result, 18)

    def test_return_None(self):
        # When the endpoint yields no result, None is passed through.
        self.mock_sparql_query.return_value.select.return_value = None
        result = self.stats._get_count_from_sparql("SELECT X")
        self.assert_query_called("SELECT X")
        # assertIsNone checks identity and gives a clearer failure message
        # than assertEqual(result, None).
        self.assertIsNone(result)
class GetGroupingCountsFromSparqlTest(SparqlQueryTest, PropertyStatisticsTest):
    """Tests ``_get_grouping_counts_from_sparql`` against a mocked SPARQL endpoint."""

    def test_return_count(self):
        # Entity URIs are reduced to their item IDs and row order is preserved.
        self.mock_sparql_query.return_value.select.return_value = [
            {'grouping': 'http://www.wikidata.org/entity/Q1', 'count': 10},
            {'grouping': 'http://www.wikidata.org/entity/Q2', 'count': 5},
        ]
        result = self.stats._get_grouping_counts_from_sparql("SELECT X")
        self.assert_query_called("SELECT X")
        expected = OrderedDict([('Q1', 10), ('Q2', 5)])
        self.assertEqual(result, expected)

    def test_return_None(self):
        # When the endpoint yields no result, None is passed through.
        self.mock_sparql_query.return_value.select.return_value = None
        result = self.stats._get_grouping_counts_from_sparql("SELECT X")
        self.assert_query_called("SELECT X")
        # assertIsNone checks identity and gives a clearer failure message
        # than assertEqual(result, None).
        self.assertIsNone(result)
class SparqlCountTest(SparqlQueryTest, PropertyStatisticsTest):
    """Tests the total-count helpers; the mocked endpoint always answers 18."""

    def setUp(self):
        super().setUp()
        self.mock_sparql_query.return_value.select.return_value = [{'count': '18'}]

    def test_get_totals_no_grouping(self):
        # Total of entities lacking the grouping property (MINUS clause).
        sparql = (
            "\n"
            "SELECT (COUNT(*) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960\n"
            " MINUS { ?entity wdt:P551 _:b28. }\n"
            "}\n"
        )
        total = self.stats.get_totals_no_grouping()
        self.assert_query_called(sparql)
        self.assertEqual(total, 18)

    def test_get_totals(self):
        # Total of all entities matched by the selector.
        sparql = (
            "\n"
            "SELECT (COUNT(*) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960\n"
            "}\n"
        )
        total = self.stats.get_totals()
        self.assert_query_called(sparql)
        self.assertEqual(total, 18)
class GetGroupingInformationTest(SparqlQueryTest, PropertyStatisticsTest):
    """Tests ``get_grouping_information``: the query it sends and the pair of mappings it returns."""

    def test_get_grouping_information(self):
        sparql = (
            "\n"
            "SELECT ?grouping (COUNT(DISTINCT *) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " ?entity wdt:P551 ?grouping .\n"
            "} GROUP BY ?grouping\n"
            "HAVING (?count >= 20)\n"
            "ORDER BY DESC(?count)\n"
            "LIMIT 1000\n"
        )
        self.mock_sparql_query.return_value.select.return_value = [
            {'grouping': 'http://www.wikidata.org/entity/Q3115846', 'count': '10'},
            {'grouping': 'http://www.wikidata.org/entity/Q5087901', 'count': '6'},
            {'grouping': 'http://www.wikidata.org/entity/Q623333', 'count': '6'}
        ]
        # (counts per grouping, higher grouping per grouping); the latter is
        # empty when no higher grouping is configured.
        wanted = (
            OrderedDict([('Q3115846', 10), ('Q5087901', 6), ('Q623333', 6)]),
            OrderedDict()
        )
        actual = self.stats.get_grouping_information()
        self.assert_query_called(sparql)
        self.assertEqual(actual, wanted)

    def test_get_grouping_information_with_grouping_threshold(self):
        # The configured threshold shows up in the HAVING clause.
        self.stats.grouping_threshold = 5
        sparql = (
            "\n"
            "SELECT ?grouping (COUNT(DISTINCT *) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " ?entity wdt:P551 ?grouping .\n"
            "} GROUP BY ?grouping\n"
            "HAVING (?count >= 5)\n"
            "ORDER BY DESC(?count)\n"
            "LIMIT 1000\n"
        )
        self.mock_sparql_query.return_value.select.return_value = [
            {'grouping': 'http://www.wikidata.org/entity/Q3115846', 'count': '10'},
            {'grouping': 'http://www.wikidata.org/entity/Q5087901', 'count': '6'},
            {'grouping': 'http://www.wikidata.org/entity/Q623333', 'count': '6'}
        ]
        wanted = (
            OrderedDict([('Q3115846', 10), ('Q5087901', 6), ('Q623333', 6)]),
            OrderedDict()
        )
        actual = self.stats.get_grouping_information()
        self.assert_query_called(sparql)
        self.assertEqual(actual, wanted)

    def test_get_grouping_information_with_higher_grouping(self):
        # A higher grouping adds a SAMPLE()d column, filled through an OPTIONAL clause.
        self.stats.higher_grouping = 'wdt:P17/wdt:P298'
        sparql = (
            "\n"
            "SELECT ?grouping (SAMPLE(?_higher_grouping) as ?higher_grouping) "
            "(COUNT(DISTINCT *) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " ?entity wdt:P551 ?grouping .\n"
            " OPTIONAL { ?grouping wdt:P17/wdt:P298 ?_higher_grouping }.\n"
            "} GROUP BY ?grouping ?higher_grouping\n"
            "HAVING (?count >= 20)\n"
            "ORDER BY DESC(?count)\n"
            "LIMIT 1000\n"
        )
        self.mock_sparql_query.return_value.select.return_value = [
            {'grouping': 'http://www.wikidata.org/entity/Q3115846', 'higher_grouping': 'NZL', 'count': '10'},
            {'grouping': 'http://www.wikidata.org/entity/Q5087901', 'higher_grouping': 'USA', 'count': '6'},
            {'grouping': 'http://www.wikidata.org/entity/Q623333', 'higher_grouping': 'USA', 'count': '6'}
        ]
        wanted = (
            OrderedDict([('Q3115846', 10), ('Q5087901', 6), ('Q623333', 6)]),
            OrderedDict([('Q3115846', 'NZL'), ('Q5087901', 'USA'), ('Q623333', 'USA')])
        )
        actual = self.stats.get_grouping_information()
        self.assert_query_called(sparql)
        self.assertEqual(actual, wanted)

    def test_get_grouping_information_empty_result(self):
        # No rows from the endpoint must surface as a QueryException.
        self.mock_sparql_query.return_value.select.return_value = None
        sparql = (
            "\n"
            "SELECT ?grouping (COUNT(DISTINCT *) as ?count) WHERE {\n"
            " ?entity wdt:P31 wd:Q41960 .\n"
            " ?entity wdt:P551 ?grouping .\n"
            "} GROUP BY ?grouping\n"
            "HAVING (?count >= 20)\n"
            "ORDER BY DESC(?count)\n"
            "LIMIT 1000\n"
        )
        with self.assertRaises(QueryException):
            self.stats.get_grouping_information()
        # Checked after the with-block so this assertion is actually executed.
        self.assert_query_called(sparql)
class TestGetHeader(PropertyStatisticsTest):
    """Tests the wikitable header produced by ``get_header``."""

    def setUp(self):
        super().setUp()
        # Both thresholds are echoed in the header's caption cells.
        self.stats.grouping_threshold = 7
        self.stats.property_threshold = 4

    def test_get_header(self):
        wanted = (
            '{| class="wikitable sortable"\n'
            '! colspan="2" |Top groupings (Minimum 7 items)\n'
            '! colspan="6"|Top Properties (used at least 4 times per grouping)\n'
            '|-\n'
            '! Name\n'
            '! Count\n'
            '! data-sort-type="number"|{{Property|P21}}\n'
            '! data-sort-type="number"|{{Property|P19}}\n'
            '! data-sort-type="number"|{{Property|P2}}\n'
            '! data-sort-type="number"|{{Property|P5}}\n'
            '! data-sort-type="number"|{{#language:br}}\n'
            '! data-sort-type="number"|{{#language:xy}}\n'
        )
        header = self.stats.get_header()
        self.assertEqual(header, wanted)

    def test_get_header_with_higher_grouping(self):
        # A higher grouping widens the first colspan to 3 and adds an empty header cell.
        self.stats.higher_grouping = 'wdt:P17/wdt:P298'
        wanted = (
            '{| class="wikitable sortable"\n'
            '! colspan="3" |Top groupings (Minimum 7 items)\n'
            '! colspan="6"|Top Properties (used at least 4 times per grouping)\n'
            '|-\n'
            '! \n'
            '! Name\n'
            '! Count\n'
            '! data-sort-type="number"|{{Property|P21}}\n'
            '! data-sort-type="number"|{{Property|P19}}\n'
            '! data-sort-type="number"|{{Property|P2}}\n'
            '! data-sort-type="number"|{{Property|P5}}\n'
            '! data-sort-type="number"|{{#language:br}}\n'
            '! data-sort-type="number"|{{#language:xy}}\n'
        )
        header = self.stats.get_header()
        self.assertEqual(header, wanted)
class MakeFooterTest(SparqlQueryTest, PropertyStatisticsTest):
    """Tests the totals footer row produced by ``make_footer``."""

    def setUp(self):
        super().setUp()
        # Successive SPARQL answers: 120 appears as the overall total in the
        # expected footer, the other six as the per-column counts.
        counts = ['120', '30', '80', '10', '12', '24', '36']
        self.mock_sparql_query.return_value.select.side_effect = [
            [{'count': count}] for count in counts
        ]

    def test_make_footer(self):
        wanted = (
            '|- class="sortbottom"\n'
            "|\'\'\'Totals\'\'\' (all items):\n"
            "| 120\n"
            "| {{Integraality cell|25.0|30|column=P21}}\n"
            "| {{Integraality cell|66.67|80|column=P19}}\n"
            "| {{Integraality cell|8.33|10|column=P1/P2}}\n"
            "| {{Integraality cell|10.0|12|column=P3/Q4/P5}}\n"
            "| {{Integraality cell|20.0|24|column=Lbr}}\n"
            "| {{Integraality cell|30.0|36|column=Dxy}}\n"
            "|}\n"
        )
        footer = self.stats.make_footer()
        self.assertEqual(footer, wanted)