diff --git a/integraality/grouping.py b/integraality/grouping.py
index fb1de64..7256662 100644
--- a/integraality/grouping.py
+++ b/integraality/grouping.py
@@ -1,225 +1,242 @@
 #!/usr/bin/python
 # -*- coding: utf-8 -*-
 """
 Grouping configuration classes
 """

 import collections
 import re

 import pywikibot.data.sparql
 from line import ItemGrouping, UnknownValueGrouping, YearGrouping
 from sparql_utils import UNKNOWN_VALUE_PREFIX, QueryException


 class GroupingConfigurationSyntaxException(Exception):
     pass


 class UnsupportedGroupingConfigurationException(Exception):
     pass


 class GroupingConfigurationMaker:
     @staticmethod
     def make(repo, grouping_property, higher_grouping, grouping_threshold):
-        if re.match(r"P\d+", grouping_property):
+        """
+        Build the grouping configuration matching grouping_property.
+
+        A bare property ID (e.g. P136) is looked up to pick an item or year
+        grouping; a path starting with a property ID gets the wdt: prefix;
+        anything else is used verbatim as a SPARQL predicate.
+
+        :raises UnsupportedGroupingConfigurationException: unsupported datatype
+        """
+        # fullmatch rather than "$", which also matches before a trailing newline
+        if re.fullmatch(r"P\d+", grouping_property):
             property_page = pywikibot.PropertyPage(repo, grouping_property)
             property_type = property_page.get_data_for_new_entity()["datatype"]
             print(f"property_type is {property_type}")
             if property_type == "wikibase-item":
                 return ItemGroupingConfiguration(
                     property=grouping_property,
                     higher_grouping=higher_grouping,
                     grouping_threshold=grouping_threshold,
                 )
             elif property_type == "time":
                 return YearGroupingConfiguration(
                     property=grouping_property, grouping_threshold=grouping_threshold
                 )
             else:
                 raise UnsupportedGroupingConfigurationException(
                     f"Property {grouping_property} is of type {property_type} which is not supported."
                 )
+        elif re.match(r"^P\d+", grouping_property):
+            # Starts with a property ID but carries more (e.g. a property path)
+            return PredicateGroupingConfiguration(
+                predicate=f"wdt:{grouping_property}",
+                higher_grouping=higher_grouping,
+                grouping_threshold=grouping_threshold,
+            )
         else:
             return PredicateGroupingConfiguration(
                 predicate=grouping_property,
                 higher_grouping=higher_grouping,
                 grouping_threshold=grouping_threshold,
             )


 class AbstractGroupingConfiguration:
     def __init__(self, higher_grouping=None, grouping_threshold=0):
         self.higher_grouping = higher_grouping
         self.grouping_threshold = grouping_threshold

     def get_grouping_information_query(self, selector_sparql):
         query = []
         query.extend(
             [
                 "\n"
                 f"SELECT ?grouping {self.get_select_for_higher_grouping()}(COUNT(DISTINCT ?entity) as ?count) WHERE {{",
                 f" ?entity {selector_sparql} .",
             ]
         )
         query.extend(self.get_grouping_selector())
         query.extend(self.get_higher_grouping_selector())
         query.extend(
             [
                 f"HAVING (?count >= {self.grouping_threshold})",
                 "ORDER BY DESC(?count)",
                 "LIMIT 1000",
                 "",
             ]
         )
         return "\n".join(query)

     def get_select_for_higher_grouping(self):
         if self.higher_grouping:
             return "(SAMPLE(?_higher_grouping) as ?higher_grouping) "
         else:
             return ""

     def get_higher_grouping_selector(self):
         if self.higher_grouping:
             return [
                 f" OPTIONAL {{ ?grouping {self.higher_grouping} ?_higher_grouping }}.",
                 "} GROUP BY ?grouping ?higher_grouping",
             ]
         else:
             return ["} GROUP BY ?grouping"]

     def get_grouping_selector(self):
         raise NotImplementedError

     def get_grouping_information(self, selector_sparql):
         """
         Get all groupings and their counts.

         :return: List of Grouping objects
         """
         query = self.get_grouping_information_query(selector_sparql)
         groupings = collections.OrderedDict()

         try:
             sq = pywikibot.data.sparql.SparqlQuery()
             queryresult = sq.select(query)

             if not queryresult:
                 raise QueryException(
-                    "No result when querying groupings."
+                    "No result when querying groupings. "
                     "Please investigate the 'all groupings' debug query in the dashboard header.",
                     query=query,
                 )

         except pywikibot.exceptions.TimeoutError:
             raise QueryException(
-                "The Wikidata Query Service timed out when fetching groupings."
-                "You might be trying to do something too expensive."
+                "The Wikidata Query Service timed out when fetching groupings. "
+                "You might be trying to do something too expensive. "
                 "Please investigate the 'all groupings' debug query in the dashboard header.",
                 query=query,
             )

         unknown_value_count = 0

         for resultitem in queryresult:
             if not resultitem.get("grouping") or resultitem.get("grouping").startswith(
                 UNKNOWN_VALUE_PREFIX
             ):
                 unknown_value_count += int(resultitem.get("count"))

             else:
                 qid = resultitem.get("grouping").replace(
                     "http://www.wikidata.org/entity/", ""
                 )
                 if self.higher_grouping:
                     value = resultitem.get("higher_grouping")
                     if value:
                         value = value.replace("http://www.wikidata.org/entity/", "")
                     else:
                         value = ""
                     higher_grouping = value
                 else:
                     higher_grouping = None
                 property_grouping = self.line_type(
                     title=qid,
                     count=int(resultitem.get("count")),
                     higher_grouping=higher_grouping,
                 )
                 groupings[property_grouping.get_key()] = property_grouping

         if unknown_value_count:
             unknown_value_grouping = UnknownValueGrouping(unknown_value_count)
             groupings[unknown_value_grouping.get_key()] = unknown_value_grouping

         return groupings


 class PredicateGroupingConfiguration(AbstractGroupingConfiguration):
     line_type = ItemGrouping

     def __init__(self, predicate, higher_grouping=None, grouping_threshold=20):
         super().__init__(higher_grouping=higher_grouping, grouping_threshold=grouping_threshold)
         self.predicate = predicate

     def __eq__(self, other):
         return (
             self.predicate == other.predicate
             and self.higher_grouping == other.higher_grouping
             and self.grouping_threshold == other.grouping_threshold
         )

     def get_predicate(self):
         return self.predicate

     def get_grouping_selector(self):
         return [f" ?entity {self.get_predicate()} ?grouping ."]

     def format_predicate_html(self):
         return f"{self.predicate}"


 class PropertyGroupingConfiguration(AbstractGroupingConfiguration):
     def __init__(self, property, higher_grouping=None, grouping_threshold=20):
         super().__init__(higher_grouping=higher_grouping, grouping_threshold=grouping_threshold)
         self.property = property

     def __eq__(self, other):
         return (
             self.property == other.property
             and self.higher_grouping == other.higher_grouping
             and self.grouping_threshold == other.grouping_threshold
         )

     def get_predicate(self):
         return f"wdt:{self.property}"

     def format_predicate_html(self):
         return f'{self.property}'


 class ItemGroupingConfiguration(PropertyGroupingConfiguration):
     line_type = ItemGrouping

     def __init__(self, property, higher_grouping=None, grouping_threshold=20):
         super().__init__(property=property, higher_grouping=higher_grouping, grouping_threshold=grouping_threshold)

     def get_grouping_selector(self):
         return [f" ?entity {self.get_predicate()} ?grouping ."]


 class YearGroupingConfiguration(PropertyGroupingConfiguration):
     line_type = YearGrouping

     def __init__(self, property, grouping_threshold=20):
         super().__init__(property=property, grouping_threshold=grouping_threshold)

     def get_grouping_selector(self):
         return [
             f" ?entity {self.get_predicate()} ?date .",
             " BIND(YEAR(?date) as ?grouping) .",
         ]
diff --git a/integraality/tests/test_grouping.py b/integraality/tests/test_grouping.py
index c6b3395..7bd58a3 100644
--- a/integraality/tests/test_grouping.py
+++ b/integraality/tests/test_grouping.py
@@ -1,164 +1,175 @@
 # -*- coding: utf-8 -*-

 import unittest
 from unittest.mock import patch

 import grouping


 class AbstractGroupingConfiguration(unittest.TestCase):
     def test_constructor_empty(self):
         grouping.AbstractGroupingConfiguration()

     def test_get_grouping_selector(self):
         grouping_configuration = grouping.AbstractGroupingConfiguration()
         with self.assertRaises(NotImplementedError):
             grouping_configuration.get_grouping_selector()

     def test_get_grouping_information_query(self):
         grouping_configuration = grouping.AbstractGroupingConfiguration()
         with self.assertRaises(NotImplementedError):
             grouping_configuration.get_grouping_information_query("Q1")


 class ItemGroupingConfigurationTest(unittest.TestCase):
     def test_constructor_empty(self):
         grouping.ItemGroupingConfiguration(property=None)

     def test_get_grouping_selector(self):
         grouping_configuration = grouping.ItemGroupingConfiguration(property="P1")
         result = grouping_configuration.get_grouping_selector()
         expected = [" ?entity wdt:P1 ?grouping ."]
         self.assertListEqual(result, expected)

     def test_get_grouping_information_query(self):
         grouping_configuration = grouping.ItemGroupingConfiguration(property="P1")
         result = grouping_configuration.get_grouping_information_query("Q1")
         expected = (
             "\n"
             "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n"
             " ?entity Q1 .\n"
             " ?entity wdt:P1 ?grouping .\n"
             "} GROUP BY ?grouping\n"
             "HAVING (?count >= 20)\n"
             "ORDER BY DESC(?count)\n"
             "LIMIT 1000\n"
         )
         self.assertEqual(result, expected)

     def test_get_grouping_information_query_with_threshold(self):
         grouping_configuration = grouping.ItemGroupingConfiguration(
             property="P1", grouping_threshold=12
         )
         result = grouping_configuration.get_grouping_information_query("Q1")
         expected = (
             "\n"
             "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n"
             " ?entity Q1 .\n"
             " ?entity wdt:P1 ?grouping .\n"
             "} GROUP BY ?grouping\n"
             "HAVING (?count >= 12)\n"
             "ORDER BY DESC(?count)\n"
             "LIMIT 1000\n"
         )
         self.assertEqual(result, expected)

     def test_get_grouping_information_query_with_higher_grouping(self):
         grouping_configuration = grouping.ItemGroupingConfiguration(
             property="P1", higher_grouping="wdt:P2"
         )
         result = grouping_configuration.get_grouping_information_query("Q1")
         expected = (
             "\n"
             "SELECT ?grouping (SAMPLE(?_higher_grouping) as ?higher_grouping) (COUNT(DISTINCT ?entity) as ?count) WHERE {\n"
             " ?entity Q1 .\n"
             " ?entity wdt:P1 ?grouping .\n"
             " OPTIONAL { ?grouping wdt:P2 ?_higher_grouping }.\n"
             "} GROUP BY ?grouping ?higher_grouping\n"
             "HAVING (?count >= 20)\n"
             "ORDER BY DESC(?count)\n"
             "LIMIT 1000\n"
         )
         self.assertEqual(result, expected)


 class YearGroupingConfigurationTest(unittest.TestCase):
     def test_constructor_empty(self):
         grouping.YearGroupingConfiguration(property=None)

     def test_get_grouping_selector(self):
         grouping_configuration = grouping.YearGroupingConfiguration(property="P1")
         result = grouping_configuration.get_grouping_selector()
         expected = [" ?entity wdt:P1 ?date .", " BIND(YEAR(?date) as ?grouping) ."]
         self.assertListEqual(result, expected)

     def test_get_grouping_information_query(self):
         grouping_configuration = grouping.YearGroupingConfiguration(property="P1")
         result = grouping_configuration.get_grouping_information_query("Q1")
         expected = (
             "\n"
             "SELECT ?grouping (COUNT(DISTINCT ?entity) as ?count) WHERE {\n"
             " ?entity Q1 .\n"
             " ?entity wdt:P1 ?date .\n"
             " BIND(YEAR(?date) as ?grouping) .\n"
             "} GROUP BY ?grouping\n"
             "HAVING (?count >= 20)\n"
             "ORDER BY DESC(?count)\n"
             "LIMIT 1000\n"
         )
         self.assertEqual(result, expected)


 class TestGroupingConfigurationMaker(unittest.TestCase):
     def setUp(self):
         patcher = patch("pywikibot.PropertyPage", autospec=True)
         self.mock_property_page = patcher.start()
         self.addCleanup(patcher.stop)
         self.higher_grouping = "wdt:P17/wdt:P298"
         self.grouping_threshold = 5

     def test_item_datatype(self):
         self.mock_property_page.return_value.get_data_for_new_entity.return_value = {
             "datatype": "wikibase-item"
         }
         result = grouping.GroupingConfigurationMaker.make(
             None, "P136", self.higher_grouping, self.grouping_threshold
         )
         expected = grouping.ItemGroupingConfiguration(
             property="P136",
             higher_grouping=self.higher_grouping,
             grouping_threshold=self.grouping_threshold,
         )
         self.assertEqual(result, expected)

     def test_time_datatype(self):
         self.mock_property_page.return_value.get_data_for_new_entity.return_value = {
             "datatype": "time"
         }
         result = grouping.GroupingConfigurationMaker.make(
             None, "P569", self.higher_grouping, self.grouping_threshold
         )
         expected = grouping.YearGroupingConfiguration(
             property="P569", grouping_threshold=self.grouping_threshold
         )
         self.assertEqual(result, expected)

     def test_unsupported_datatype(self):
         self.mock_property_page.return_value.get_data_for_new_entity.return_value = {
             "datatype": "string"
         }
         with self.assertRaises(grouping.UnsupportedGroupingConfigurationException):
             grouping.GroupingConfigurationMaker.make(
                 None, "P528", self.higher_grouping, self.grouping_threshold
             )

     def test_non_property_syntax(self):
         result = grouping.GroupingConfigurationMaker.make(
             None, "dct:language", self.higher_grouping, self.grouping_threshold
         )
         expected = grouping.PredicateGroupingConfiguration(
             predicate="dct:language",
             higher_grouping=self.higher_grouping,
             grouping_threshold=self.grouping_threshold,
         )
         self.assertEqual(result, expected)
+
+    def test_property_syntax_with_injection(self):
+        result = grouping.GroupingConfigurationMaker.make(
+            None, "P131/wdt:P131", self.higher_grouping, self.grouping_threshold
+        )
+        expected = grouping.PredicateGroupingConfiguration(
+            predicate="wdt:P131/wdt:P131",
+            higher_grouping=self.higher_grouping,
+            grouping_threshold=self.grouping_threshold,
+        )
+        self.assertEqual(result, expected)