diff --git a/app.py b/app.py index eff5d30..552d4d2 100644 --- a/app.py +++ b/app.py @@ -1,388 +1,388 @@ # -*- coding: utf-8 -*- import bs4 import decorator import flask import ipaddress import mwapi import mwoauth import os import random import requests import requests_oauthlib import string import toolforge import yaml import ids import scripts import unicodescripts app = flask.Flask(__name__) app.before_request(toolforge.redirect_to_https) toolforge.set_user_agent('speedpatrolling', email='mail@lucaswerkmeister.de') user_agent = requests.utils.default_user_agent() __dir__ = os.path.dirname(__file__) try: with open(os.path.join(__dir__, 'config.yaml')) as config_file: app.config.update(yaml.safe_load(config_file)) except FileNotFoundError: print('config.yaml file not found, assuming local development setup') app.secret_key = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(64)) if 'oauth' in app.config: consumer_token = mwoauth.ConsumerToken(app.config['oauth']['consumer_key'], app.config['oauth']['consumer_secret']) @decorator.decorator def memoize(func, *args, **kwargs): if args or kwargs: raise TypeError('only memoize functions with no arguments') key = '_memoize_' + func.__name__ if key not in flask.g: setattr(flask.g, key, func()) return getattr(flask.g, key) @app.template_global() def csrf_token(): if 'csrf_token' not in flask.session: flask.session['csrf_token'] = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(64)) return flask.session['csrf_token'] @app.template_global() def form_value(name): if 'repeat_form' in flask.g and name in flask.request.form: return (flask.Markup(r' value="') + flask.Markup.escape(flask.request.form[name]) + flask.Markup(r'" ')) else: return flask.Markup() @app.template_global() def form_attributes(name): return (flask.Markup(r' id="') + flask.Markup.escape(name) + flask.Markup(r'" name="') + flask.Markup.escape(name) + flask.Markup(r'" ') + form_value(name)) def is_ip_address(val): try: ipaddress.ip_address(val) return True except ValueError: return False @app.template_filter() def user_link(user_name): if is_ip_address(user_name): user_link_prefix = 'https://www.wikidata.org/wiki/Special:Contributions/' else: user_link_prefix = 'https://www.wikidata.org/wiki/User:' return (flask.Markup(r'') + flask.Markup(r'') + flask.Markup.escape(user_name) + flask.Markup(r'') + flask.Markup(r'')) @app.template_global() def user_logged_in(): return 'oauth_access_token' in flask.session @app.template_global() def authentication_area(): if 'oauth' not in app.config: return flask.Markup() if not user_logged_in(): return (flask.Markup(r'Log in')) identity = identify() return (flask.Markup(r'Logged in as ') + user_link(identity['username']) + flask.Markup(r'')) @memoize def authenticated_session(): if 'oauth_access_token' in flask.session: access_token = mwoauth.AccessToken(**flask.session['oauth_access_token']) auth = requests_oauthlib.OAuth1(client_key=consumer_token.key, client_secret=consumer_token.secret, resource_owner_key=access_token.key, resource_owner_secret=access_token.secret) return mwapi.Session(host='https://www.wikidata.org', auth=auth, user_agent=user_agent) else: return None @memoize def any_session(): return authenticated_session() or mwapi.Session(host='https://www.wikidata.org', user_agent=user_agent) @memoize def identify(): if 'oauth_access_token' in flask.session: access_token = mwoauth.AccessToken(**flask.session['oauth_access_token']) return mwoauth.identify('https://www.wikidata.org/w/index.php', consumer_token, access_token) else: return None @memoize def user_rights(): session = authenticated_session() if session is None: return [] return session.get(action='query', meta='userinfo', uiprop='rights')['query']['userinfo']['rights'] @app.template_global() def user_can_patrol(): return 'patrol' in user_rights() @app.template_global() def user_can_rollback(): return 'rollback' in user_rights() @app.route('/') def index(): return flask.render_template('index.html') @app.route('/settings/', methods=['GET', 'POST']) def settings(): scripts = dict.fromkeys(unicodescripts.all_scripts(), False) del scripts['Common'] del scripts['Inherited'] if flask.request.method == 'POST': if not submitted_request_valid(): return 'CSRF error', 400 flask.session['supported_scripts'] = [script for script in flask.request.form.getlist('script') if script in scripts] supported_scripts = flask.session.get('supported_scripts', None) if supported_scripts is None: supported_scripts = user_scripts_from_babel() if 'Latin' not in supported_scripts: # if they use this tool, they can read Latin, even if it’s not in their Babel supported_scripts.append('Latin') for script in supported_scripts: scripts[script] = True return flask.render_template('settings.html', scripts=scripts) @app.route('/diff/') def any_diff(): if not user_logged_in(): return flask.redirect(flask.url_for('login')) skipped_rev_ids = ids.get(flask.session, 'skipped_rev_ids') ignored_page_ids = ids.get(flask.session, 'ignored_page_ids') supported_scripts = flask.session.get('supported_scripts') for rev_id in ids.unpatrolled_changes(authenticated_session()): if rev_id in skipped_rev_ids: continue if ids.rev_id_to_page_id(rev_id, any_session()) in ignored_page_ids: continue if supported_scripts is not None: diff_body = any_session().get(action='compare', fromrev=rev_id, torelative='prev', prop=['diff'], formatversion=2)['compare']['body'] script = scripts.primary_script_of_diff(diff_body) if script is not None and script not in supported_scripts: continue return flask.redirect(flask.url_for('diff', rev_id=rev_id)) @app.route('/diff//') def diff(rev_id): session = any_session() results = session.get(action='compare', fromrev=rev_id, torelative='prev', prop=['title', 'user', 'parsedcomment', 'diff'], formatversion=2)['compare'] return flask.render_template('diff.html', rev_id=rev_id, title=results['totitle'], old_user=results['fromuser'], new_user=results['touser'], old_comment=fix_markup(results['fromparsedcomment']), new_comment=fix_markup(results['toparsedcomment']), body=fix_markup(results['body'])) @app.route('/diff//skip', methods=['POST']) def diff_skip(rev_id): if not submitted_request_valid(): return 'CSRF error', 400 ids.append(flask.session, 'skipped_rev_ids', rev_id) page_id = ids.rev_id_to_page_id(rev_id, any_session()) if page_id in ids.get(flask.session, 'skipped_page_ids'): if page_id not in ids.get(flask.session, 'acted_page_ids'): ids.append(flask.session, 'ignored_page_ids', page_id) else: ids.append(flask.session, 'skipped_page_ids', page_id) return flask.redirect(flask.url_for('any_diff')) @app.route('/diff//patrol', methods=['POST']) def diff_patrol(rev_id): if not submitted_request_valid(): return 'CSRF error', 400 session = authenticated_session() ids.append(flask.session, 'acted_page_ids', ids.rev_id_to_page_id(rev_id, session)) token = session.get(action='query', meta='tokens', type='patrol')['query']['tokens']['patroltoken'] session.post(action='patrol', revid=rev_id, token=token) return flask.redirect(flask.url_for('any_diff')) @app.route('/diff//rollback', methods=['POST']) def diff_rollback(rev_id): if not submitted_request_valid(): return 'CSRF error', 400 session = authenticated_session() ids.append(flask.session, 'acted_page_ids', ids.rev_id_to_page_id(rev_id, session)) results = session.get(action='query', meta='tokens', type='rollback', revids=[str(rev_id)], prop='revisions', rvprop='user', formatversion='2') token = results['query']['tokens']['rollbacktoken'] page = results['query']['pages'][0] pageid = page['pageid'] user = page['revisions'][0]['user'] try: session.post(action='rollback', pageid=pageid, user=user, token=token) except mwapi.errors.APIError as error: # TODO use errorformat='html' once mwapi supports it (mediawiki-utilities/python-mwapi#34) info_html = session.get(action='parse', text=error.info, prop=['text'], wrapoutputclass=None, disablelimitreport=True, contentmodel='wikitext', formatversion=2)['parse']['text'] return flask.render_template('rollback-error.html', rev_id=rev_id, user=user, info=flask.Markup(info_html)) else: return flask.redirect(flask.url_for('any_diff')) @app.route('/login') def login(): redirect, request_token = mwoauth.initiate('https://www.wikidata.org/w/index.php', consumer_token, user_agent=user_agent) flask.session['oauth_request_token'] = dict(zip(request_token._fields, request_token)) return flask.redirect(redirect) @app.route('/oauth/callback') def oauth_callback(): - request_token = mwoauth.RequestToken(**flask.session['oauth_request_token']) + request_token = mwoauth.RequestToken(**flask.session.pop('oauth_request_token')) access_token = mwoauth.complete('https://www.wikidata.org/w/index.php', consumer_token, request_token, flask.request.query_string, user_agent=user_agent) flask.session['oauth_access_token'] = dict(zip(access_token._fields, access_token)) return flask.redirect(flask.url_for('index')) @app.route('/logout') def logout(): flask.session.clear() return flask.redirect(flask.url_for('index')) def fix_markup(html): soup = bs4.BeautifulSoup(html, 'html.parser') for link in soup.select('a[href]'): href = link['href'] if href.startswith('/') and not href.startswith('//'): link['href'] = 'https://www.wikidata.org' + href return flask.Markup(str(soup)) def user_scripts_from_babel(): session = any_session() languages = session.get(action='query', meta='babel', babuser=identify()['username'])['query']['babel'].keys() autonyms = language_autonyms(languages) return scripts.scripts_of_text(char for autonym in autonyms.values() for char in autonym) def language_autonyms(language_codes): wikitext = '' for language_code in language_codes: wikitext += '
' + language_code + '
{{#language:' + language_code + '|' + language_code + '}}
' html = any_session().get(action='parse', text=wikitext, contentmodel='wikitext', prop=['text'], wrapoutputclass='', disablelimitreport=True, formatversion=2)['parse']['text'] soup = bs4.BeautifulSoup(html.strip(), 'html.parser') autonyms = {} for span in soup.contents: language_code = span.dt.string autonym = span.dd.string autonyms[language_code] = autonym return autonyms def full_url(endpoint, **kwargs): scheme=flask.request.headers.get('X-Forwarded-Proto', 'http') return flask.url_for(endpoint, _external=True, _scheme=scheme, **kwargs) def submitted_request_valid(): """Check whether a submitted POST request is valid. If this method returns False, the request might have been issued by an attacker as part of a Cross-Site Request Forgery attack; callers MUST NOT process the request in that case. """ real_token = flask.session.pop('csrf_token', None) submitted_token = flask.request.form.get('csrf_token', None) if not real_token: # we never expected a POST return False if not submitted_token: # token got lost or attacker did not supply it return False if submitted_token != real_token: # incorrect token (could be outdated or incorrectly forged) return False if not flask.request.referrer.startswith(full_url('index')): # correct token but not coming from the correct page; for # example, JS running on https://tools.wmflabs.org/tool-a is # allowed to access https://tools.wmflabs.org/tool-b and # extract CSRF tokens from it (since both of these pages are # hosted on the https://tools.wmflabs.org domain), so checking # the Referer header is our only protection against attackers # from other Toolforge tools return False return True @app.after_request def deny_frame(response): """Disallow embedding the tool’s pages in other websites. If other websites can embed this tool’s pages, e. g. in