diff --git a/integraality/grouping.py b/integraality/grouping.py
index fb1de64..7256662 100644
--- a/integraality/grouping.py
+++ b/integraality/grouping.py
@@ -1,225 +1,231 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Grouping configuration classes
"""
import collections
import re
import pywikibot.data.sparql
from line import ItemGrouping, UnknownValueGrouping, YearGrouping
from sparql_utils import UNKNOWN_VALUE_PREFIX, QueryException
class GroupingConfigurationSyntaxException(Exception):
    """
    Error type for a grouping configuration with invalid syntax.

    NOTE(review): no raise site is visible in this part of the module; the
    description is inferred from the class name -- confirm against callers.
    """
class UnsupportedGroupingConfigurationException(Exception):
    """
    Raised by GroupingConfigurationMaker.make for an unsupported property datatype.

    Only the "wikibase-item" and "time" datatypes are supported there.
    """
class GroupingConfigurationMaker:
    """Factory picking the grouping configuration class for a grouping setting."""

    @staticmethod
    def make(repo, grouping_property, higher_grouping, grouping_threshold):
        """
        Build the grouping configuration matching ``grouping_property``.

        Three shapes of ``grouping_property`` are recognised:

        * a bare property ID (e.g. ``P136``): the property datatype is
          looked up and decides between item and year grouping;
        * a string merely starting with a property ID (e.g.
          ``P131/wdt:P131``): used as a predicate after prefixing ``wdt:``;
        * anything else (e.g. ``dct:language``): used verbatim as predicate.

        :param repo: Passed to ``pywikibot.PropertyPage`` for the datatype lookup.
        :param grouping_property: Property ID, property path or predicate.
        :param higher_grouping: Higher grouping predicate, passed through
            (not used for "time" properties).
        :param grouping_threshold: Grouping threshold, passed through.
        :return: An Item-, Year- or PredicateGroupingConfiguration.
        :raise UnsupportedGroupingConfigurationException: if the bare
            property has a datatype other than "wikibase-item" or "time".
        """
        # fullmatch rather than a "$"-anchored match: "$" would also accept
        # a trailing newline and hand "P131\n" to the property lookup.
        if re.fullmatch(r"P\d+", grouping_property):
            property_page = pywikibot.PropertyPage(repo, grouping_property)
            property_type = property_page.get_data_for_new_entity()["datatype"]
            print(f"property_type is {property_type}")
            if property_type == "wikibase-item":
                return ItemGroupingConfiguration(
                    property=grouping_property,
                    higher_grouping=higher_grouping,
                    grouping_threshold=grouping_threshold,
                )
            if property_type == "time":
                return YearGroupingConfiguration(
                    property=grouping_property,
                    grouping_threshold=grouping_threshold,
                )
            raise UnsupportedGroupingConfigurationException(
                f"Property {grouping_property} is of type {property_type} which is not supported."
            )

        # Everything else is treated as a predicate. A value that starts
        # with a property ID gets the "wdt:" prefix the bare ID would have had.
        if re.match(r"P\d+", grouping_property):
            predicate = f"wdt:{grouping_property}"
        else:
            predicate = grouping_property
        return PredicateGroupingConfiguration(
            predicate=predicate,
            higher_grouping=higher_grouping,
            grouping_threshold=grouping_threshold,
        )
class AbstractGroupingConfiguration:
    """
    Base class for grouping configurations.

    It builds and runs the SPARQL query listing the groupings with their
    counts. Subclasses supply ``get_grouping_selector`` and a ``line_type``
    attribute, the class instantiated for every grouping found.
    """

    def __init__(self, higher_grouping=None, grouping_threshold=0):
        """
        :param higher_grouping: Optional predicate linking a grouping to its
            higher grouping; falsy to disable.
        :param grouping_threshold: Minimum count for a grouping to be returned.
        """
        self.higher_grouping = higher_grouping
        self.grouping_threshold = grouping_threshold

    def get_grouping_information_query(self, selector_sparql):
        """
        Assemble the SPARQL query returning groupings and their counts.

        :param selector_sparql: SPARQL fragment placed after ``?entity`` to
            select the entities to count.
        :return: The query as a string; it starts with an empty line and
            ends with a newline.
        """
        query = [
            f"\nSELECT ?grouping {self.get_select_for_higher_grouping()}(COUNT(DISTINCT ?entity) as ?count) WHERE {{",
            f" ?entity {selector_sparql} .",
        ]
        query.extend(self.get_grouping_selector())
        # This selector also closes the WHERE block and adds the GROUP BY.
        query.extend(self.get_higher_grouping_selector())
        query.extend(
            [
                f"HAVING (?count >= {self.grouping_threshold})",
                "ORDER BY DESC(?count)",
                "LIMIT 1000",
                "",
            ]
        )
        return "\n".join(query)

    def get_select_for_higher_grouping(self):
        """Return the SELECT fragment for the higher grouping, or "" without one."""
        if self.higher_grouping:
            return "(SAMPLE(?_higher_grouping) as ?higher_grouping) "
        return ""

    def get_higher_grouping_selector(self):
        """
        Return the lines closing the WHERE block and grouping the results.

        With a higher grouping, an OPTIONAL lookup of it is emitted first.
        """
        if self.higher_grouping:
            return [
                f" OPTIONAL {{ ?grouping {self.higher_grouping} ?_higher_grouping }}.",
                "} GROUP BY ?grouping ?higher_grouping",
            ]
        return ["} GROUP BY ?grouping"]

    def get_grouping_selector(self):
        """Return the query lines binding ``?grouping``; subclasses must override."""
        raise NotImplementedError

    def get_grouping_information(self, selector_sparql):
        """
        Get all groupings and their counts.

        :param selector_sparql: SPARQL fragment selecting the entities.
        :return: OrderedDict of grouping objects keyed by their ``get_key()``,
            in query result order. Rows without a grouping value, or whose
            value starts with UNKNOWN_VALUE_PREFIX, are summed into a single
            UnknownValueGrouping appended last.
        :raise QueryException: if the query times out or returns no result.
        """
        query = self.get_grouping_information_query(selector_sparql)
        try:
            queryresult = pywikibot.data.sparql.SparqlQuery().select(query)
        except pywikibot.exceptions.TimeoutError as error:
            raise QueryException(
                "The Wikidata Query Service timed out when fetching groupings. "
                "You might be trying to do something too expensive. "
                "Please investigate the 'all groupings' debug query in the dashboard header.",
                query=query,
            ) from error
        if not queryresult:
            raise QueryException(
                "No result when querying groupings. "
                "Please investigate the 'all groupings' debug query in the dashboard header.",
                query=query,
            )

        entity_prefix = "http://www.wikidata.org/entity/"
        groupings = collections.OrderedDict()
        unknown_value_count = 0
        for resultitem in queryresult:
            grouping_value = resultitem.get("grouping")
            count = int(resultitem.get("count"))
            if not grouping_value or grouping_value.startswith(UNKNOWN_VALUE_PREFIX):
                unknown_value_count += count
                continue

            if self.higher_grouping:
                # A grouping lacking a higher grouping gets "" rather than None.
                higher_value = resultitem.get("higher_grouping")
                higher_grouping = (
                    higher_value.replace(entity_prefix, "") if higher_value else ""
                )
            else:
                higher_grouping = None

            property_grouping = self.line_type(
                title=grouping_value.replace(entity_prefix, ""),
                count=count,
                higher_grouping=higher_grouping,
            )
            groupings[property_grouping.get_key()] = property_grouping

        if unknown_value_count:
            unknown_value_grouping = UnknownValueGrouping(unknown_value_count)
            groupings[unknown_value_grouping.get_key()] = unknown_value_grouping
        return groupings
class PredicateGroupingConfiguration(AbstractGroupingConfiguration):
    """Grouping on an arbitrary SPARQL predicate, with ItemGrouping lines."""

    line_type = ItemGrouping

    def __init__(self, predicate, higher_grouping=None, grouping_threshold=20):
        """
        :param predicate: SPARQL predicate linking an entity to its grouping.
        :param higher_grouping: Optional higher grouping predicate.
        :param grouping_threshold: Minimum count for a grouping to be returned.
        """
        super().__init__(
            higher_grouping=higher_grouping,
            grouping_threshold=grouping_threshold,
        )
        self.predicate = predicate

    def __eq__(self, other):
        """Compare predicate, higher grouping and threshold."""
        # Objects of another kind have no predicate: let Python fall back to
        # its default comparison instead of raising AttributeError.
        if not isinstance(other, PredicateGroupingConfiguration):
            return NotImplemented
        return (
            self.predicate == other.predicate
            and self.higher_grouping == other.higher_grouping
            and self.grouping_threshold == other.grouping_threshold
        )

    def get_predicate(self):
        """Return the predicate as configured."""
        return self.predicate

    def get_grouping_selector(self):
        """Return the query line binding ``?grouping`` through the predicate."""
        return [f" ?entity {self.get_predicate()} ?grouping ."]

    def format_predicate_html(self):
        """Return the predicate formatted as a string."""
        return f"{self.predicate}"
class PropertyGroupingConfiguration(AbstractGroupingConfiguration):
    """
    Base class for groupings on a single property.

    It defines neither ``line_type`` nor ``get_grouping_selector``; the
    subclasses provide them.
    """

    def __init__(self, property, higher_grouping=None, grouping_threshold=20):
        """
        :param property: Property ID, used with the ``wdt:`` prefix as predicate.
        :param higher_grouping: Optional higher grouping predicate.
        :param grouping_threshold: Minimum count for a grouping to be returned.
        """
        super().__init__(
            higher_grouping=higher_grouping,
            grouping_threshold=grouping_threshold,
        )
        self.property = property

    def __eq__(self, other):
        """Compare property, higher grouping and threshold."""
        # Objects of another kind have no property: let Python fall back to
        # its default comparison instead of raising AttributeError.
        if not isinstance(other, PropertyGroupingConfiguration):
            return NotImplemented
        return (
            self.property == other.property
            and self.higher_grouping == other.higher_grouping
            and self.grouping_threshold == other.grouping_threshold
        )

    def get_predicate(self):
        """Return the property as a ``wdt:`` predicate."""
        return f"wdt:{self.property}"

    def format_predicate_html(self):
        """Return the property formatted as a string."""
        return f'{self.property}'
class ItemGroupingConfiguration(PropertyGroupingConfiguration):
    """Property grouping that produces ItemGrouping lines."""

    line_type = ItemGrouping

    def __init__(self, property, higher_grouping=None, grouping_threshold=20):
        """Forward all arguments unchanged to PropertyGroupingConfiguration."""
        super().__init__(
            property=property,
            higher_grouping=higher_grouping,
            grouping_threshold=grouping_threshold,
        )

    def get_grouping_selector(self):
        """Return the query line binding ``?grouping`` to the property value."""
        predicate = self.get_predicate()
        return [f" ?entity {predicate} ?grouping ."]
class YearGroupingConfiguration(PropertyGroupingConfiguration):
    """Property grouping on the year of a date value, with YearGrouping lines."""

    line_type = YearGrouping

    def __init__(self, property, grouping_threshold=20):
        """Forward property and threshold; no higher grouping is accepted."""
        super().__init__(
            property=property,
            grouping_threshold=grouping_threshold,
        )

    def get_grouping_selector(self):
        """Return the query lines binding ``?grouping`` to the year of the date."""
        predicate = self.get_predicate()
        date_line = f" ?entity {predicate} ?date ."
        year_line = " BIND(YEAR(?date) as ?grouping) ."
        return [date_line, year_line]
diff --git a/integraality/tests/test_grouping.py b/integraality/tests/test_grouping.py
index c6b3395..7bd58a3 100644
--- a/integraality/tests/test_grouping.py
+++ b/integraality/tests/test_grouping.py
@@ -1,164 +1,175 @@
# -*- coding: utf-8 -*-
import unittest
from unittest.mock import patch
import grouping
class AbstractGroupingConfiguration(unittest.TestCase):
    """Tests for ``grouping.AbstractGroupingConfiguration``."""

    def test_constructor_empty(self):
        """The base class can be built without any argument."""
        grouping.AbstractGroupingConfiguration()

    def test_get_grouping_selector(self):
        """The base class does not implement the grouping selector."""
        configuration = grouping.AbstractGroupingConfiguration()
        self.assertRaises(NotImplementedError, configuration.get_grouping_selector)

    def test_get_grouping_information_query(self):
        """Building the query fails because the selector is not implemented."""
        configuration = grouping.AbstractGroupingConfiguration()
        self.assertRaises(
            NotImplementedError,
            configuration.get_grouping_information_query,
            "Q1",
        )
class ItemGroupingConfigurationTest(unittest.TestCase):
    """Tests for ``grouping.ItemGroupingConfiguration``."""

    def test_constructor_empty(self):
        """The constructor accepts a None property."""
        grouping.ItemGroupingConfiguration(property=None)

    def test_get_grouping_selector(self):
        """The selector binds ?grouping through the wdt: predicate."""
        configuration = grouping.ItemGroupingConfiguration(property="P1")
        self.assertListEqual(
            configuration.get_grouping_selector(),
            [" ?entity wdt:P1 ?grouping ."],
        )

    def test_get_grouping_information_query(self):
        """Without options, the query uses the default threshold of 20."""
        configuration = grouping.ItemGroupingConfiguration(property="P1")
        expected = "\n".join(
            [
                "",
                "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {",
                " ?entity Q1 .",
                " ?entity wdt:P1 ?grouping .",
                "} GROUP BY ?grouping",
                "HAVING (?count >= 20)",
                "ORDER BY DESC(?count)",
                "LIMIT 1000",
                "",
            ]
        )
        self.assertEqual(configuration.get_grouping_information_query("Q1"), expected)

    def test_get_grouping_information_query_with_threshold(self):
        """An explicit threshold ends up in the HAVING clause."""
        configuration = grouping.ItemGroupingConfiguration(
            property="P1",
            grouping_threshold=12,
        )
        expected = "\n".join(
            [
                "",
                "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {",
                " ?entity Q1 .",
                " ?entity wdt:P1 ?grouping .",
                "} GROUP BY ?grouping",
                "HAVING (?count >= 12)",
                "ORDER BY DESC(?count)",
                "LIMIT 1000",
                "",
            ]
        )
        self.assertEqual(configuration.get_grouping_information_query("Q1"), expected)

    def test_get_grouping_information_query_with_higher_grouping(self):
        """A higher grouping adds the SAMPLE, OPTIONAL and GROUP BY parts."""
        configuration = grouping.ItemGroupingConfiguration(
            property="P1",
            higher_grouping="wdt:P2",
        )
        expected = "\n".join(
            [
                "",
                "SELECT ?grouping (SAMPLE(?_higher_grouping) as ?higher_grouping) (COUNT(DISTINCT ?entity) as ?count) WHERE {",
                " ?entity Q1 .",
                " ?entity wdt:P1 ?grouping .",
                " OPTIONAL { ?grouping wdt:P2 ?_higher_grouping }.",
                "} GROUP BY ?grouping ?higher_grouping",
                "HAVING (?count >= 20)",
                "ORDER BY DESC(?count)",
                "LIMIT 1000",
                "",
            ]
        )
        self.assertEqual(configuration.get_grouping_information_query("Q1"), expected)
class YearGroupingConfigurationTest(unittest.TestCase):
    """Tests for ``grouping.YearGroupingConfiguration``."""

    def test_constructor_empty(self):
        """The constructor accepts a None property."""
        grouping.YearGroupingConfiguration(property=None)

    def test_get_grouping_selector(self):
        """The selector binds the date, then its year as ?grouping."""
        configuration = grouping.YearGroupingConfiguration(property="P1")
        self.assertListEqual(
            configuration.get_grouping_selector(),
            [" ?entity wdt:P1 ?date .", " BIND(YEAR(?date) as ?grouping) ."],
        )

    def test_get_grouping_information_query(self):
        """The query contains the date and year lines and the default threshold."""
        configuration = grouping.YearGroupingConfiguration(property="P1")
        expected = "\n".join(
            [
                "",
                "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {",
                " ?entity Q1 .",
                " ?entity wdt:P1 ?date .",
                " BIND(YEAR(?date) as ?grouping) .",
                "} GROUP BY ?grouping",
                "HAVING (?count >= 20)",
                "ORDER BY DESC(?count)",
                "LIMIT 1000",
                "",
            ]
        )
        self.assertEqual(configuration.get_grouping_information_query("Q1"), expected)
class TestGroupingConfigurationMaker(unittest.TestCase):
    """Tests for ``grouping.GroupingConfigurationMaker.make``."""

    def setUp(self):
        """Patch ``pywikibot.PropertyPage`` and set the shared make() arguments."""
        property_page_patcher = patch("pywikibot.PropertyPage", autospec=True)
        self.mock_property_page = property_page_patcher.start()
        self.addCleanup(property_page_patcher.stop)
        self.higher_grouping = "wdt:P17/wdt:P298"
        self.grouping_threshold = 5

    def _set_datatype(self, datatype):
        """Make the patched property page report the given datatype."""
        page = self.mock_property_page.return_value
        page.get_data_for_new_entity.return_value = {"datatype": datatype}

    def _make(self, grouping_property):
        """Call make() with a None repo and the shared grouping arguments."""
        return grouping.GroupingConfigurationMaker.make(
            None,
            grouping_property,
            self.higher_grouping,
            self.grouping_threshold,
        )

    def test_item_datatype(self):
        """An item property gives an ItemGroupingConfiguration."""
        self._set_datatype("wikibase-item")
        expected = grouping.ItemGroupingConfiguration(
            property="P136",
            higher_grouping=self.higher_grouping,
            grouping_threshold=self.grouping_threshold,
        )
        self.assertEqual(self._make("P136"), expected)

    def test_time_datatype(self):
        """A time property gives a YearGroupingConfiguration."""
        self._set_datatype("time")
        expected = grouping.YearGroupingConfiguration(
            property="P569",
            grouping_threshold=self.grouping_threshold,
        )
        self.assertEqual(self._make("P569"), expected)

    def test_unsupported_datatype(self):
        """Any other datatype is rejected."""
        self._set_datatype("string")
        self.assertRaises(
            grouping.UnsupportedGroupingConfigurationException,
            self._make,
            "P528",
        )

    def test_non_property_syntax(self):
        """A value not starting with a property ID is used as predicate verbatim."""
        expected = grouping.PredicateGroupingConfiguration(
            predicate="dct:language",
            higher_grouping=self.higher_grouping,
            grouping_threshold=self.grouping_threshold,
        )
        self.assertEqual(self._make("dct:language"), expected)

    def test_property_syntax_with_injection(self):
        """A path starting with a property ID becomes a wdt:-prefixed predicate."""
        expected = grouping.PredicateGroupingConfiguration(
            predicate="wdt:P131/wdt:P131",
            higher_grouping=self.higher_grouping,
            grouping_threshold=self.grouping_threshold,
        )
        self.assertEqual(self._make("P131/wdt:P131"), expected)