# -*- coding: utf-8 -*-
"""Flask application for changing the rank of statements on a Wikibase wiki.

The views below load the statements of one property on one entity,
let the user pick statements, and save the changed ranks through the
``wbeditentity`` API using the user's OAuth credentials.
"""

import flask
import json
import mwapi  # type: ignore
import mwoauth  # type: ignore
import os
import random
import requests
import requests_oauthlib  # type: ignore
import string
import toolforge
from typing import List, Optional, Tuple, Union
import werkzeug
import yaml

from converters import EntityIdConverter, PropertyIdConverter, \
    RankConverter, WikiConverter
import wbformat


app = flask.Flask(__name__)

user_agent = toolforge.set_user_agent(
    'ranker',
    email='ranker@lucaswerkmeister.de')

__dir__ = os.path.dirname(__file__)

# The secret key and the CSRF tokens are security-sensitive, so they are
# drawn from the operating system's CSPRNG instead of the default
# (predictable) Mersenne Twister generator of the random module.
_system_random = random.SystemRandom()

# Upper bound, in seconds, for plain HTTP requests made by this module,
# so that a hanging upstream server cannot block a worker forever.
_REQUEST_TIMEOUT = 30


def _random_token(length: int = 64) -> str:
    """Return a random string of ASCII letters and digits.

    The characters are chosen with a cryptographically secure
    generator, so the result is suitable for secrets and CSRF tokens.
    """
    characters = string.ascii_letters + string.digits
    return ''.join(_system_random.choice(characters) for _ in range(length))


try:
    with open(os.path.join(__dir__, 'config.yaml')) as config_file:
        app.config.update(yaml.safe_load(config_file))
except FileNotFoundError:
    print('config.yaml file not found, assuming local development setup')
    # without a configuration there is no persistent secret key,
    # so make one up for the lifetime of this process
    app.secret_key = _random_token()

if 'oauth' in app.config:
    oauth_config = app.config['oauth']
    consumer_token = mwoauth.ConsumerToken(oauth_config['consumer_key'],
                                           oauth_config['consumer_secret'])
    index_php = 'https://www.wikidata.org/w/index.php'

app.url_map.converters['eid'] = EntityIdConverter
app.url_map.converters['pid'] = PropertyIdConverter
app.url_map.converters['rank'] = RankConverter
app.url_map.converters['wiki'] = WikiConverter


@app.template_global()
def csrf_token() -> str:
    """Return the CSRF token of the current session, creating it if needed."""
    if 'csrf_token' not in flask.session:
        flask.session['csrf_token'] = _random_token()
    return flask.session['csrf_token']


@app.template_global()
def form_value(name: str) -> flask.Markup:
    """Return a ``value`` attribute repeating the submitted form field.

    The attribute is only produced if ``repeat_form`` is set on
    ``flask.g`` and the field was part of the submitted form;
    otherwise the result is empty markup.
    """
    if 'repeat_form' in flask.g and name in flask.request.form:
        return (flask.Markup(r' value="') +
                flask.Markup.escape(flask.request.form[name]) +
                flask.Markup(r'" '))
    else:
        return flask.Markup()


@app.template_global()
def form_attributes(name: str) -> flask.Markup:
    """Return ``id``, ``name`` and (if applicable) ``value`` attributes."""
    return (flask.Markup(r' id="') +
            flask.Markup.escape(name) +
            flask.Markup(r'" name="') +
            flask.Markup.escape(name) +
            flask.Markup(r'" ') +
            form_value(name))


@app.template_filter()
def user_link(user_name: str) -> flask.Markup:
    """Return a link to the Wikidata user page of the given user."""
    # NOTE(review): the markup literals of this function were empty in
    # the reviewed source and user_href was unused; the anchor below is
    # a minimal reconstruction -- confirm attributes against the
    # templates and stylesheet.
    user_href = 'https://www.wikidata.org/wiki/User:'
    return (flask.Markup(r'<a href="') +
            flask.Markup.escape(user_href + user_name.replace(' ', '_')) +
            flask.Markup(r'">') +
            flask.Markup.escape(user_name) +
            flask.Markup(r'</a>'))


@app.template_global()
def authentication_area() -> flask.Markup:
    """Return markup showing the login state of the current user.

    The result is empty if OAuth is not configured, a login link if
    the session has no access token, and otherwise the name of the
    user as reported by the OAuth identify call.
    """
    if 'oauth' not in app.config:
        return flask.Markup()

    if 'oauth_access_token' not in flask.session:
        # NOTE(review): the reviewed source had the bare text "Log in"
        # here; linking it to the login view is a minimal
        # reconstruction -- confirm attributes against the templates.
        return (flask.Markup(r'<a href="') +
                flask.Markup.escape(flask.url_for('login')) +
                flask.Markup(r'">Log in</a>'))

    access_token = mwoauth.AccessToken(**flask.session['oauth_access_token'])
    identity = mwoauth.identify(index_php,
                                consumer_token,
                                access_token)

    return (flask.Markup(r'Logged in as ') +
            user_link(identity['username']))


@app.template_global()
def can_edit() -> bool:
    """Return whether the current user is treated as able to edit.

    Without OAuth configuration this is always true; otherwise it is
    true if the session holds an access token.
    """
    if 'oauth' not in app.config:
        return True
    return 'oauth_access_token' in flask.session


@app.template_global()
def format_value(wiki: str, property_id: str, value: dict) -> flask.Markup:
    """Format a value of the given property via wbformat."""
    return wbformat.format_value(anonymous_session(wiki), property_id, value)


@app.template_global()
def format_property(wiki: str, property_id: str) -> flask.Markup:
    """Format the given property via wbformat."""
    return wbformat.format_property(anonymous_session(wiki), property_id)


def anonymous_session(wiki: str) -> mwapi.Session:
    """Return an API session for the wiki without any credentials."""
    return mwapi.Session('https://' + wiki,
                         user_agent=user_agent)


def authenticated_session(wiki: str) -> Optional[mwapi.Session]:
    """Return an API session for the wiki using the user's OAuth token.

    Returns None if the session of the current request holds no
    OAuth access token.
    """
    if 'oauth_access_token' not in flask.session:
        return None

    access_token = mwoauth.AccessToken(
        **flask.session['oauth_access_token'])
    auth = requests_oauthlib.OAuth1(client_key=consumer_token.key,
                                    client_secret=consumer_token.secret,
                                    resource_owner_key=access_token.key,
                                    resource_owner_secret=access_token.secret)
    return mwapi.Session(host='https://' + wiki,
                         auth=auth,
                         user_agent=user_agent)


def _parse_revision_id(value: str) -> Optional[int]:
    """Return the value as a positive revision ID, or None if invalid."""
    try:
        revision_id = int(value)
    except ValueError:
        return None
    if revision_id <= 0:
        return None
    return revision_id


def _load_entity_revision(wiki: str, entity_id: str, revision_id: int) -> dict:
    """Load the JSON of an entity at a revision from Special:EntityData.

    Raises a requests exception if the request times out or the
    server answers with an error status.
    """
    response = requests.get(
        f'https://{wiki}/wiki/Special:EntityData/{entity_id}.json',
        params={'revision': revision_id},
        timeout=_REQUEST_TIMEOUT)
    response.raise_for_status()
    return response.json()['entities'][entity_id]


@app.route('/', methods=['GET', 'POST'])
def index() -> Union[str, werkzeug.Response]:
    """Show the index page, or redirect a submitted form to the edit form."""
    if flask.request.method == 'POST':
        url = flask.url_for('show_edit_form',
                            wiki=flask.request.form['wiki'],
                            entity_id=flask.request.form['entity_id'],
                            property_id=flask.request.form['property_id'])
        return flask.redirect(url)

    return flask.render_template('index.html')


# NOTE(review): the URL variables of the three /edit/ rules below were
# missing in the reviewed source; they are restored from the view
# signatures and the converters registered above -- confirm.
@app.route('/edit/<wiki:wiki>/<eid:entity_id>/<pid:property_id>/')
def show_edit_form(wiki: str, entity_id: str, property_id: str) -> str:
    """Show the form for editing the ranks of one property's statements."""
    session = anonymous_session(wiki)
    response = session.get(action='wbgetentities',
                           ids=[entity_id],
                           props=['info', 'claims'],
                           formatversion=2)
    entity = response['entities'][entity_id]
    base_revision_id = entity['lastrevid']
    statements = entity_statements(entity, property_id)

    # prefetch the main property together with all qualifier properties
    prefetch_property_ids = {property_id}
    for statement in statements:
        prefetch_property_ids.update(statement.get('qualifiers', {}).keys())
    wbformat.prefetch_properties(session, prefetch_property_ids)

    return flask.render_template('edit.html',
                                 wiki=wiki,
                                 entity_id=entity_id,
                                 property_id=property_id,
                                 statements=statements,
                                 base_revision_id=base_revision_id)


@app.route('/edit/<wiki:wiki>/<eid:entity_id>/<pid:property_id>/set/<rank:rank>',  # noqa:E501
           methods=['POST'])
def edit_set_rank(wiki: str, entity_id: str, property_id: str, rank: str) \
        -> Union[werkzeug.Response, Tuple[str, int]]:
    """Set the rank of all selected statements to the given rank.

    A statement counts as selected if its ID is a key of the
    submitted form. On success, redirects to the diff of the edit.
    """
    if not submitted_request_valid():
        return 'CSRF error', 400  # TODO better error

    session = authenticated_session(wiki)
    if session is None:
        return 'not logged in', 401  # TODO better error

    base_revision_id = _parse_revision_id(
        flask.request.form['base_revision_id'])
    if base_revision_id is None:
        return 'invalid base revision ID', 400  # TODO better error

    entity = _load_entity_revision(wiki, entity_id, base_revision_id)
    statements = entity_statements(entity, property_id)

    edited_statements = 0
    for statement in statements:
        if statement['id'] in flask.request.form:
            statement['rank'] = rank
            edited_statements += 1

    edited_entity = build_entity(entity_id, property_id, statements)
    if edited_statements == 1:
        summary = f'Set rank of 1 statement to "{rank}"'
    else:
        summary = f'Set rank of {edited_statements} statements to "{rank}"'
    if flask.request.form.get('summary'):
        summary += ': ' + flask.request.form['summary']

    return save_entity_and_redirect(edited_entity,
                                    summary,
                                    base_revision_id,
                                    session)


@app.route('/edit/<wiki:wiki>/<eid:entity_id>/<pid:property_id>/increment',
           methods=['POST'])
def edit_increment_rank(wiki: str, entity_id: str, property_id: str) \
        -> Union[werkzeug.Response, Tuple[str, int]]:
    """Raise the rank of all selected statements by one step.

    A statement counts as selected if its ID is a key of the
    submitted form; statements whose rank cannot be raised any
    further are not counted as edited. On success, redirects to the
    diff of the edit.
    """
    if not submitted_request_valid():
        return 'CSRF error', 400  # TODO better error

    session = authenticated_session(wiki)
    if session is None:
        return 'not logged in', 401  # TODO better error

    base_revision_id = _parse_revision_id(
        flask.request.form['base_revision_id'])
    if base_revision_id is None:
        return 'invalid base revision ID', 400  # TODO better error

    entity = _load_entity_revision(wiki, entity_id, base_revision_id)
    statements = entity_statements(entity, property_id)

    edited_statements = 0
    for statement in statements:
        if statement['id'] in flask.request.form:
            rank = statement['rank']
            incremented_rank = increment_rank(rank)
            if incremented_rank != rank:
                statement['rank'] = incremented_rank
                edited_statements += 1

    edited_entity = build_entity(entity_id, property_id, statements)
    if edited_statements == 1:
        summary = 'Incremented rank of 1 statement'
    else:
        summary = f'Incremented rank of {edited_statements} statements'

    return save_entity_and_redirect(edited_entity,
                                    summary,
                                    base_revision_id,
                                    session)


@app.route('/login')
def login() -> werkzeug.Response:
    """Start the OAuth handshake and redirect the user to the wiki."""
    redirect, request_token = mwoauth.initiate(index_php,
                                               consumer_token,
                                               user_agent=user_agent)
    # stored as a plain dict so that the session can serialize it
    flask.session['oauth_request_token'] = dict(zip(request_token._fields,
                                                    request_token))
    return flask.redirect(redirect)


@app.route('/oauth/callback')
def oauth_callback() -> werkzeug.Response:
    """Complete the OAuth handshake and store the access token."""
    request_token = mwoauth.RequestToken(
        **flask.session.pop('oauth_request_token'))
    access_token = mwoauth.complete(index_php,
                                    consumer_token,
                                    request_token,
                                    flask.request.query_string,
                                    user_agent=user_agent)
    flask.session['oauth_access_token'] = dict(zip(access_token._fields,
                                                   access_token))
    # drop the old CSRF token so that a new one is generated after login
    flask.session.pop('csrf_token', None)
    return flask.redirect(flask.url_for('index'))


@app.route('/logout')
def logout() -> werkzeug.Response:
    """Forget the OAuth access token and redirect to the index page."""
    flask.session.pop('oauth_access_token', None)
    return flask.redirect(flask.url_for('index'))


def full_url(endpoint: str, **kwargs) -> str:
    """Return the external URL of an endpoint.

    The scheme is taken from the X-Forwarded-Proto request header,
    falling back to http if the header is absent.
    """
    scheme = flask.request.headers.get('X-Forwarded-Proto', 'http')
    return flask.url_for(endpoint, _external=True, _scheme=scheme, **kwargs)


def submitted_request_valid() -> bool:
    """Check whether a submitted POST request is valid.

    If this method returns False, the request might have been issued
    by an attacker as part of a Cross-Site Request Forgery attack;
    callers MUST NOT process the request in that case.
    """
    real_token = flask.session.get('csrf_token')
    submitted_token = flask.request.form.get('csrf_token')
    if not real_token:
        # we never expected a POST
        return False
    if not submitted_token:
        # token got lost or attacker did not supply it
        return False
    if submitted_token != real_token:
        # incorrect token (could be outdated or incorrectly forged)
        return False
    return True


@app.after_request
def deny_frame(response: flask.Response) -> flask.Response:
    """Disallow embedding the tool’s pages in other websites.

    Not every tool can be usefully embedded in other websites, but
    allowing embedding can expose the tool to clickjacking
    vulnerabilities, so err on the side of caution and disallow
    embedding. This can be removed (possibly only for certain pages)
    as long as other precautions against clickjacking are taken.
    """
    response.headers['X-Frame-Options'] = 'deny'
    return response


def entity_statements(entity: dict, property_id: str) -> List[dict]:
    """Return the statements of the entity for the given property.

    If the entity has no statements for the property yet, an empty
    list is added to the entity data and returned.
    """
    if entity.get('type') == 'mediainfo':  # optional due to T272804
        statements = entity['statements']
    else:
        statements = entity['claims']
    return statements.setdefault(property_id, [])


def increment_rank(rank: str) -> str:
    """Return the next higher rank; 'preferred' stays 'preferred'.

    Raises KeyError for any other rank than 'deprecated', 'normal'
    or 'preferred'.
    """
    return {
        'deprecated': 'normal',
        'normal': 'preferred',
        'preferred': 'preferred',
    }[rank]


def build_entity(entity_id: str,
                 property_id: str,
                 statements: List[dict]) -> dict:
    """Build entity data containing only the given statements."""
    return {
        'id': entity_id,
        'claims': {
            # yes, 'claims' even for MediaInfo entities
            property_id: statements,
        },
    }


def save_entity_and_redirect(entity_data: dict,
                             summary: str,
                             base_revision_id: int,
                             session: mwapi.Session) -> werkzeug.Response:
    """Save the entity data and redirect to the diff of the edit.

    The edit is made with ``wbeditentity`` on top of the given base
    revision; the redirect points at the diff between the base
    revision and the revision reported by the API response.
    """
    token = session.get(action='query',
                        meta='tokens',
                        type='csrf')['query']['tokens']['csrftoken']

    api_response = session.post(action='wbeditentity',
                                id=entity_data['id'],
                                data=json.dumps(entity_data),
                                summary=summary,
                                baserevid=base_revision_id,
                                token=token)
    revision_id = api_response['entity']['lastrevid']

    return flask.redirect(f'{session.host}/w/index.php'
                          f'?diff={revision_id}&oldid={base_revision_id}')