diff --git a/integraality/line.py b/integraality/line.py index a798f46..479f61f 100644 --- a/integraality/line.py +++ b/integraality/line.py @@ -1,197 +1,228 @@ #!/usr/bin/python # -*- coding: utf-8 -*- """ Line configuration classes """ import collections import logging import re import pywikibot class AbstractLine: def __init__(self, count, cells=None): self.count = count if not cells: cells = collections.OrderedDict() self.cells = cells def get_percentage(self, value): if not value: return 0 return round(1.0 * value / max(self.count, 1) * 100, 2) class Grouping(AbstractLine): is_linkable = True def __init__(self, count, cells=None, title=None, higher_grouping=None): super().__init__(count, cells) self.title = title self.higher_grouping = higher_grouping def __eq__(self, other): return ( self.count == other.count and self.title == other.title and self.higher_grouping == other.higher_grouping and self.cells == other.cells ) def __repr__(self): cell = ",".join(["%s:%s" % (key, value) for (key, value) in self.cells.items()]) return f"{self.title}:{self.count} - {cell}" def get_key(self): return self.title def format_header_cell(self, grouping_type): text = "" if self.higher_grouping is not None: text += self.format_higher_grouping_text(grouping_type) text += f"| {self.heading()}\n" return text def format_cell(self, column_entry, cell_template): column_count = self.cells.get(column_entry.get_key(), 0) percentage = self.get_percentage(column_count) fields = [ cell_template, str(percentage), str(column_count), f"column={column_entry.get_title()}", f"grouping={self.title}", ] return f'| {{{{{"|".join(fields)}}}}}\n' def row_opener(self): return "|-\n" def format_count_cell(self, grouping_link, repo): if grouping_link and self.is_linkable: return self.format_grouping_link(grouping_link, repo) else: return f"| {self.count} \n" def format_grouping_link(self, grouping_link, repo=None): return f"| [[{grouping_link}/{self.title}|{self.count}]] \n" def postive_query(self, selector_sparql, grouping_property=None, grouping=None): query = [] query.extend( [ "SELECT DISTINCT ?entity ?entityLabel ?value ?valueLabel WHERE {", f" ?entity {selector_sparql} .", ] ) query.extend( self.postive_query_filter_out_fragment(grouping_property, grouping) ) return "\n".join(query) def postive_query_filter_out_fragment(self, grouping_property=None, grouping=None): return [] + def negative_query(self, selector_sparql, grouping_property=None, grouping=None): + query = [] + query.extend( + [ + "SELECT DISTINCT ?entity ?entityLabel WHERE {", + f" ?entity {selector_sparql} .", + ] + ) + query.extend( + self.negative_query_filter_out_fragment(grouping_property, grouping) + ) + return "\n".join(query) + + def negative_query_filter_out_fragment(self, grouping_property=None, grouping=None): + return self.postive_query_filter_out_fragment(grouping_property, grouping) + class NoGroupGrouping(Grouping): """Group for items that do not belong to any group.""" is_linkable = False def heading(self): return "No grouping" def format_higher_grouping_text(self, grouping_type=None): return "|\n" def postive_query_filter_out_fragment(self, grouping_property, grouping=None): return [" MINUS {", f" ?entity wdt:{grouping_property} [] .", " }"] + def negative_query_filter_out_fragment(self, grouping_property, grouping=None): + return [ + " MINUS {", + f" {{?entity wdt:{grouping_property} [] .}} UNION", + ] + class ItemGrouping(Grouping): def format_grouping_link(self, grouping_link, repo): try: group_item = pywikibot.ItemPage(repo, self.title) group_item.get() label = group_item.labels["en"] except ( pywikibot.exceptions.InvalidTitleError, pywikibot.exceptions.NoPageError, KeyError, ): logging.info(f"Could not retrieve label for {self.title}") label = self.title return f"| [[{grouping_link}/{label}|{self.count}]] \n" def format_higher_grouping_text(self, grouping_type): higher_grouping_value = self.higher_grouping type_mapping = { "country": "{{Flag|%s}}" % higher_grouping_value, } if re.match(r"Q\d+", higher_grouping_value): higher_grouping_text = f"{{{{Q|{higher_grouping_value}}}}}" elif re.match( r"http://commons.wikimedia.org/wiki/Special:FilePath/(.*?)$", higher_grouping_value, ): match = re.match( r"http://commons.wikimedia.org/wiki/Special:FilePath/(.*?)$", higher_grouping_value, ) image_name = match.groups()[0] higher_grouping_text = f"[[File:{image_name}|center|100px]]" higher_grouping_value = image_name elif grouping_type in type_mapping: higher_grouping_text = type_mapping.get(grouping_type) else: higher_grouping_text = higher_grouping_value return f'| data-sort-value="{higher_grouping_value}"| {higher_grouping_text}\n' def heading(self): return f"{{{{Q|{self.title}}}}}" def postive_query_filter_out_fragment(self, grouping_property, grouping): return [f" ?entity wdt:{grouping_property} wd:{grouping} ."] + def negative_query_filter_out_fragment(self, grouping_property, grouping): + return self.postive_query_filter_out_fragment(grouping_property, grouping) + class YearGrouping(Grouping): def heading(self): return f"{self.title}" def postive_query_filter_out_fragment(self, grouping_property, grouping): return [ f" ?entity wdt:{grouping_property} ?date.", " BIND(YEAR(?date) as ?year).", f" FILTER(?year = {grouping}).", ] + def negative_query_filter_out_fragment(self, grouping_property, grouping): + return self.postive_query_filter_out_fragment(grouping_property, grouping) + class UnknownValueGrouping(Grouping): def get_key(self): return "UNKNOWN_VALUE" def heading(self): return "{{int:wikibase-snakview-variations-somevalue-label}}" def postive_query_filter_out_fragment(self, grouping_property, grouping=None): return [ f" ?entity wdt:{grouping_property} ?grouping.", " FILTER wikibase:isSomeValue(?grouping).", ] + def negative_query_filter_out_fragment(self, grouping_property, grouping): + return self.postive_query_filter_out_fragment(grouping_property, grouping) + class TotalsGrouping(Grouping): is_linkable = False def heading(self): return "'''Totals''' (all items)" def format_higher_grouping_text(self, grouping_type=None): return "||\n" def row_opener(self): return '|- class="sortbottom"\n' diff --git a/integraality/property_statistics.py b/integraality/property_statistics.py index 2b28037..feb3ef3 100644 --- a/integraality/property_statistics.py +++ b/integraality/property_statistics.py @@ -1,323 +1,311 @@ #!/usr/bin/python # -*- coding: utf-8 -*- """ Calculate and generate statistics """ import collections import logging from enum import Enum import pywikibot import pywikibot.data.sparql from column import ColumnMaker, GroupingType from grouping import ItemGroupingConfiguration from line import (ItemGrouping, NoGroupGrouping, TotalsGrouping, UnknownValueGrouping, YearGrouping) from sparql_utils import UNKNOWN_VALUE_PREFIX, QueryException from statsd.defaults.env import statsd class PropertyStatistics: """ Generate statitics """ GROUP_MAPPING = Enum( "GROUP_MAPPING", { "NO_GROUPING": "None", "TOTALS": "", "UNKNOWN_VALUE": "{{int:wikibase-snakview-variations-somevalue-label}}", }, ) def __init__( self, selector_sparql, columns, grouping_configuration, grouping_type=None, higher_grouping_type=None, stats_for_no_group=False, grouping_link=None, property_threshold=0, ): """ Set what to work on and other variables here. """ site = pywikibot.Site("en", "wikipedia") self.repo = site.data_repository() self.columns = {column.get_key(): column for column in columns} self.grouping_configuration = grouping_configuration if grouping_type: self.grouping_type = GroupingType(grouping_type) else: self.grouping_type = None self.higher_grouping_type = higher_grouping_type self.selector_sparql = selector_sparql self.stats_for_no_group = stats_for_no_group self.property_threshold = property_threshold self.grouping_link = grouping_link self.cell_template = "Integraality cell" @statsd.timer("property_statistics.sparql.groupings") def get_grouping_information(self): """ Get all groupings and their counts. :return: List of Grouping objects """ return self.grouping_configuration.get_grouping_information(self.selector_sparql) def get_query_for_items_for_property_positive(self, column, grouping): column_key = column.get_key() grouping_property = self.grouping_configuration.property if grouping == self.GROUP_MAPPING.TOTALS: line = TotalsGrouping(None) elif grouping == self.GROUP_MAPPING.NO_GROUPING: line = NoGroupGrouping(None) elif grouping == self.GROUP_MAPPING.UNKNOWN_VALUE: line = UnknownValueGrouping(None) elif self.grouping_type == GroupingType.YEAR: line = YearGrouping(None) else: line = ItemGrouping(None) query = "\n" query += line.postive_query(self.selector_sparql, grouping_property, grouping) query += column.get_filter_for_positive_query() query += """} """ return query def get_query_for_items_for_property_negative(self, column, grouping): column_key = column.get_key() grouping_property = self.grouping_configuration.property - query = f""" -SELECT DISTINCT ?entity ?entityLabel WHERE {{ - ?entity {self.selector_sparql} .""" if grouping == self.GROUP_MAPPING.TOTALS: - query += """ - MINUS {""" - + line = TotalsGrouping(None) elif grouping == self.GROUP_MAPPING.NO_GROUPING: - query += f""" - MINUS {{ - {{?entity wdt:{grouping_property} [] .}} UNION""" - + line = NoGroupGrouping(None) elif grouping == self.GROUP_MAPPING.UNKNOWN_VALUE: - query += f""" - ?entity wdt:{grouping_property} ?grouping. - FILTER wikibase:isSomeValue(?grouping). - MINUS {{""" - + line = UnknownValueGrouping(None) elif self.grouping_type == GroupingType.YEAR: - query += f""" - ?entity wdt:{grouping_property} ?date. - BIND(YEAR(?date) as ?year). - FILTER(?year = {grouping}). - MINUS {{""" - + line = YearGrouping(None) else: - query += f""" - ?entity wdt:{grouping_property} wd:{grouping} . - MINUS {{""" + line = ItemGrouping(None) + + query = "\n" + query += line.negative_query(self.selector_sparql, grouping_property, grouping) + + if grouping != self.GROUP_MAPPING.NO_GROUPING: + query += """ + MINUS {""" query += column.get_filter_for_negative_query() query += """} """ return query def get_totals_no_grouping(self): grouping_property = self.grouping_configuration.property query = f""" SELECT (COUNT(*) as ?count) WHERE {{ ?entity {self.selector_sparql} MINUS {{ ?entity wdt:{grouping_property} _:b28. }} }} """ return self._get_count_from_sparql(query) def get_totals(self): query = f""" SELECT (COUNT(*) as ?count) WHERE {{ ?entity {self.selector_sparql} }} """ return self._get_count_from_sparql(query) @staticmethod @statsd.timer("property_statistics.sparql.count") def _get_count_from_sparql(query): sq = pywikibot.data.sparql.SparqlQuery() queryresult = sq.select(query) if not queryresult: raise QueryException("No result when running a SPARQL query.", query=query) return int(queryresult[0].get("count")) @statsd.timer("property_statistics.sparql.grouping_counts") def _get_grouping_counts_from_sparql(self, query): result = collections.OrderedDict() sq = pywikibot.data.sparql.SparqlQuery() queryresult = sq.select(query) if not queryresult: return None for resultitem in queryresult: if not resultitem.get("grouping") or resultitem.get("grouping").startswith( UNKNOWN_VALUE_PREFIX ): if self.GROUP_MAPPING.UNKNOWN_VALUE.name not in result.keys(): result[self.GROUP_MAPPING.UNKNOWN_VALUE.name] = 0 result[self.GROUP_MAPPING.UNKNOWN_VALUE.name] += int( resultitem.get("count") ) else: qid = resultitem.get("grouping").replace( "http://www.wikidata.org/entity/", "" ) result[qid] = int(resultitem.get("count")) return result def get_header(self): text = '{| class="wikitable sortable"\n' colspan = 3 if self.grouping_configuration.higher_grouping else 2 text += f'! colspan="{colspan}" |Top groupings (Minimum {self.grouping_configuration.grouping_threshold} items)\n' text += f'! colspan="{len(self.columns)}"|Top Properties (used at least {self.property_threshold} times per grouping)\n' # noqa text += "|-\n" if self.grouping_configuration.higher_grouping: text += "! \n" text += "! Name\n" text += "! Count\n" for column_entry in self.columns.values(): text += column_entry.make_column_header() return text def make_stats_for_no_group(self): """ Query the data for no_group, return the wikitext """ count = self.get_totals_no_grouping() grouping_object = NoGroupGrouping( count=count, higher_grouping=self.grouping_configuration.higher_grouping ) for column_entry_key, column_entry in self.columns.items(): value = self._get_count_from_sparql( column_entry.get_info_no_grouping_query(self) ) grouping_object.cells[column_entry_key] = value return self.format_stats_for_one_grouping(grouping_object) def format_stats_for_one_grouping(self, grouping_object): """ Query the data for one group, return the wikitext. """ text = grouping_object.row_opener() text += grouping_object.format_header_cell(self.grouping_type) text += grouping_object.format_count_cell(self.grouping_link, self.repo) for column_entry in self.columns.values(): text += grouping_object.format_cell(column_entry, self.cell_template) return text def make_totals(self): count = self.get_totals() grouping_object = TotalsGrouping( count=count, title="", higher_grouping=self.grouping_configuration.higher_grouping ) for column_entry_key, column_entry in self.columns.items(): value = self._get_count_from_sparql(column_entry.get_totals_query(self)) grouping_object.cells[column_entry_key] = value return self.format_stats_for_one_grouping(grouping_object) @statsd.timer("property_statistics.processing") def retrieve_and_process_data(self): """ Query the data, output wikitext """ groupings = self.retrieve_data() text = self.process_data(groupings) return text def populate_groupings(self, groupings): for column_entry_key, column_entry in self.columns.items(): data = self._get_grouping_counts_from_sparql( column_entry.get_info_query(self) ) if not data: continue for grouping_item, value in data.items(): grouping = groupings.get(grouping_item) if grouping: grouping.cells[column_entry_key] = value else: logging.debug( f"Discarding data on {grouping_item}, not in the groupings" ) return groupings def retrieve_data(self): logging.info("Retrieving grouping information...") try: groupings = self.get_grouping_information() except QueryException as e: logging.error("No groupings found.") raise e logging.info(f"Grouping retrieved: {len(groupings)}") groupings = self.populate_groupings(groupings) return groupings def process_data(self, groupings): text = self.get_header() for grouping in sorted(groupings.values(), key=lambda t: t.count, reverse=True): text += self.format_stats_for_one_grouping(grouping) if self.stats_for_no_group: text += self.make_stats_for_no_group() text += self.make_totals() text += "|}\n" return text def main(*args): """ Main function. """ columns = [ ColumnMaker.make("P21", None), ColumnMaker.make("P19", None), ColumnMaker.make("Lde", None), ColumnMaker.make("Dde", None), ] logging.info("Main function...") stats = PropertyStatistics( columns=columns, selector_sparql="wdt:P10241 wd:Q41960", grouping_configuration=ItemGroupingConfiguration(property="P551", grouping_threshold=5), stats_for_no_group=True, property_threshold=1, ) print(stats.retrieve_and_process_data()) if __name__ == "__main__": main()