diff --git a/templates/redundant.html b/templates/redundant.html
index 9ee633b..b3bbddd 100644
--- a/templates/redundant.html
+++ b/templates/redundant.html
@@ -1,17 +1,33 @@
{% extends "base.html" %}
{% block content %}
-{%- if redundant %}
+
Redundant bans
+{%- if bans %}
Channel | Ban | Covered by |
-{% for chan, ban, covered in redundant -%}
+{% for chan, ban, covered in bans -%}
{{ chan }} |
{{ ban }} |
{{ covered }} |
{% endfor -%}
{% else -%}
No redundant bans
{% endif %}
+Redundant quiets
+{%- if quiets %}
+
+ Channel | Quiet | Covered by |
+{% for chan, quiet, covered in quiets -%}
+
+ {{ chan }} |
+ {{ quiet }} |
+ {{ covered }} |
+
+{% endfor -%}
+
+{% else -%}
+No redundant bans
+{% endif %}
{% endblock %}
diff --git a/web.py b/web.py
index 91a17af..9498b73 100755
--- a/web.py
+++ b/web.py
@@ -1,564 +1,567 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from flask import Flask, render_template, session, redirect, request, url_for, request
from collections import defaultdict
from requests_oauthlib import OAuth1
import time, re, oursql, os, requests, jwt, thread
def _force_https(app):
def wrapper(environ, start_response):
environ['wsgi.url_scheme'] = 'https'
return app(environ, start_response)
return wrapper
app = Flask(__name__)
app.wsgi_app = _force_https(app.wsgi_app)
app.debug = True
app.config['PREFERRED_URL_SCHEME'] = 'https'
_dbconn = [0, None]
chanLoaded = 0
sep = re.compile(r'^;|(?<=[^\\]);')
if 'userCheck' not in globals():
userCheck = {}
threadQueue = []
lock = thread.allocate_lock()
@app.route('/login', methods=['GET', 'POST'])
def login():
error = None
with open('web.log', 'a') as f:
f.write('login: %s; %r\n' % (request.method, session.items()))
# make login
if request.method == 'POST':
session.permanent = request.form.get('permanent') == 'yes'
returnto = request.form.get('returnto', '').lstrip('/')
uri = url_for('index') + returnto
if not 'user' in session:
with open('login_tokens') as f:
for token, user, t in [line.split('\t') for line in f.read().split('\n') if line]:
if token == session.get('token'):
if not user:
error = 'Login failed'
break
session['user'] = user
del session['token']
return redirect(uri)
elif not error:
return redirect(uri)
# show login command
returnto = request.args.get('returnto')
try:
with open('login_tokens') as f:
tokens = [i for i in [line.split('\t') for line in f.read().split('\n') if line] if int(i[2]) > time.time()]
except IOError:
tokens = []
token = os.urandom(5).encode('hex')
session['token'] = token
tokens.append([token, '', '%d' % (time.time() + 60)])
with open('login_tokens', 'w') as f:
f.write('\n'.join('\t'.join(i) for i in tokens))
return render_template('login.html', token=token, error=error, returnto=returnto)
@app.route('/logout')
def logout():
if 'user' in session:
del session['user']
return render_template('logout.html')
@app.route('/')
def index():
if not 'user' in session:
return redirect_login()
with open('.online') as f:
online = f.read().split('\n')
if not online[0].isdigit() or int(online[0]) + 700 < time.time():
online = False
r = query("""SELECT *
FROM actions
WHERE ac_timestamp > DATE_SUB(NOW(), INTERVAL 1 DAY) AND ac_action IN ('ban', 'kick', 'Killed', 'K-Lined', 'revove')
AND ac_channel NOT LIKE '#wikimedia-opbot%'
ORDER BY ac_id DESC
LIMIT 20""")[::-1]
graph = query("SELECT st_hour, SUM(st_msg) FROM stats WHERE st_hour > ? GROUP BY st_hour",
((int(time.time()) / 3600) - 168,))
if graph:
graph = '{%s}' % ','.join('%d:%d' % (h, n) for h, n in graph)
return render_template('mainpage.html', title='wmopbot Web Interface', actions=r, graph=graph, online=online and online[1:], **userparams())
@app.route('/channels')
def channels():
if not 'user' in session:
return redirect_login()
if chanLoaded + 300 < time.time():
loadChans()
return render_template('channels.html', title=u'Wikimedia IRC channels', chanTable=chanTable, **userparams())
@app.route('/actions')
def actions():
if not 'user' in session:
return redirect_login()
# actions: id, user(90), channel(50), action(10), target(90), args(255), timestamp
where, args, search, limit = [], [], [], 50
for column in ('user', 'action', 'channel', 'target', 'args', 'timestamp'):
value = request.args.get(column)
if not value:
continue
search.append('%s: %s' % (column, value))
if '*' in value or '?' in value:
value = value.replace('%', r'\%').replace('_', r'\_').replace('*', '%').replace('?', '_')
where.append('ac_%s LIKE ?' % column)
args.append(value)
else:
where.append('ac_%s = ?' % column)
args.append(value)
if 'limit' in request.args and request.args['limit'].isdigit():
limit = int(request.args['limit'])
if limit > 2000:
limit = 2000
r = query('SELECT * FROM actions%s ORDER BY ac_id DESC LIMIT %d' % (where and ' WHERE ' + ' and '.join(where) or '',
limit), tuple(args))[::-1]
return render_template('actions.html', title='Actions log', actions=r, search=search, **userparams())
@app.route('/lists')
def botlists():
if not 'user' in session:
return redirect_login()
r = query("SELECT ls_type, ls_target, ls_args FROM list WHERE ls_channel = 'global' and ls_type IN ('black', 'bot', 'exempt', 'track')")
lists = [(_type, target, {k: v.decode('utf-8', 'replace').replace('\;', ';') for k, v in (i.split(':', 1)
for i in sep.split(args))}) for _type, target, args in r]
lists = {_type: [(t.decode('utf-8', 'replace'), a.get('user'), a.get('time'))
for tp, t, a in lists if tp == _type] for _type in {a for a, b, c in lists}}
return render_template('lists.html', title='Lists', lists=lists, **userparams())
@app.route('/info')
@app.route('/info/')
def info(target=None):
if not 'user' in session:
return redirect_login()
if not target:
return render_template('info.html', title='Info', **userparams())
if target.startswith('#'):
return channel(target)
elif not any(x in target for x in '!@*?/.:'):
r = query('SELECT * FROM user WHERE LOWER(us_account) = ? LIMIT 100', (target.lower(),))
access = [(c, chanList[c]['access'][target]) for c in chanList if target in chanList[c]['access']]
accban = '$a:' + target
bans = [(chan, ban, chanList[chan]['ban'][ban]) for chan in chanList for ban in chanList[chan]['ban']
if ban == accban]
quiets = [(chan, quiet, chanList[chan]['quiet'][quiet]) for chan in chanList for quiet in chanList[chan]['quiet']
if quiet == accban]
if r or access or bans:
return render_template('user.html', title='Info about ' + target, users=r, access=access, bans=bans,
quiets=quiets, **userparams())
else:
return render_template('info.html', title='Info about ' + target, users=r,
warn=target + ' is an unknown account', **userparams())
elif '!' in target or '@' in target:
mask = target.replace('%', r'\%').replace('_', r'\_').replace('*', '%').replace('?', '_')
r = query('SELECT * FROM user WHERE us_user LIKE ? LIMIT 100', (mask,))
actions = query('SELECT * FROM actions WHERE ac_user LIKE ? OR ac_target LIKE ? ORDER BY ac_id DESC LIMIT 20',
(mask, mask))[::-1]
bans = [(chan, ban, chanList[chan]['ban'][ban]) for chan in chanList for ban in chanList[chan]['ban']
if maskre(ban, target)][:100]
quiets = [(chan, quiet, chanList[chan]['quiet'][quiet]) for chan in chanList for quiet in chanList[chan]['quiet']
if maskre(quiet, target)][:100]
return render_template('mask.html', title='Info about ' + target, users=r, bans=bans, quiets=quiets,
actions=actions, **userparams())
else:
return render_template('info.html', title='Info', **userparams())
def channel(chan):
if chan in chanList:
del chanList[chan]
for chan, _type, target, args in query('SELECT * FROM list WHERE ls_channel = ?', (chan,)):
chanList[chan].setdefault(_type, {})[target.decode('utf-8', 'replace')] = \
{k: v.decode('utf-8', 'replace').replace('\;', ';') for k, v in (i.split(':', 1)
for i in sep.split(args))}
if chan in chanList:
actions = query("SELECT * FROM actions WHERE ac_channel = ? AND ac_action IN ('ban', 'quiet', 'Killed', 'K-Lined', 'kick', 'remove') ORDER BY ac_id DESC LIMIT 10",
(chan,))[::-1]
graph = query("SELECT st_hour, st_msg FROM stats WHERE st_hour > ? AND st_channel = ?",
((int(time.time()) / 3600) - 168, chan))
if graph:
graph = '{%s}' % ','.join('%d:%d' % (h, n) for h, n in graph)
return render_template('channel.html', title='%s info' % chan, chan=chanList[chan], actions=actions,
channel=chan, graph=graph, **userparams())
else:
msg = chan + ' is an unknown channel, maybe the channels is not in meta:IRC/Channels.'
return render_template('info.html', title='Info', warn=msg, **userparams())
@app.route('/lang')
def languages():
if not 'user' in session:
return redirect_login()
messages = [key.split('.', 1) + [msg, user, dt.strftime('%Y-%m-%d %H:%M:%S')] for key, msg, user, dt in
query('SELECT * FROM lang')]
return render_template('lang.html', title='Languages messages', messages=messages, **userparams())
@app.route('/help')
def help():
if not 'user' in session:
return redirect_login()
return render_template('help.html', title='wmopbot help', **userparams())
@app.route('/about')
def about():
return render_template('about.html', title='What is wmopbot')
@app.route('/cloak', methods=['GET', 'POST'])
def cloakrequest():
if request.method == 'POST':
if not 'cloak' in request.form:
return render_template('cloak.html', message='To request a cloak, use the follow command in IRC: /msg wmopbot cloak')
token = request.form['token']
with open('cloak.ctrl') as f:
ctrl = f.readlines()
for i, line in enumerate(ctrl):
line = line.rstrip('\n').split('\t')
if line[0] == token:
if line[-1].isdigit():
return render_template('cloak.html', message='Error: This request had been already sent.')
ircuser, ircacc, wikiacc, botacc = line[3], line[4], line[5], len(line) > 6 and line[6] or None
ctrl[i] = '\t'.join(line + ['%d' % time.time()]) + '\n'
break
else:
return render_template('cloak.html', message='Internal Error') # token not in cloak.ctrl
data = '%s: request %s\n' % (ircacc, time.strftime('%Y-%m-%d %H:%M:%S'))
if wikiacc in userCheck and userCheck[wikiacc][1]:
data += userCheck[wikiacc][1]
# cloak: cl_id (key), cl_user (90), cl_ircacc (16), cl_wikiacc (tinytext), cl_ircbot (16), cl_type(enum),
# cl_cloak (50), cl_status (10), cl_timestamp (datetime), cl_data (text)
sql = 'INSERT INTO cloak (cl_user, cl_ircacc, cl_wikiacc, cl_ircbot, cl_type, cl_cloak, cl_status, cl_data) VALUES (?, ?, ?, ?, ?, ?, "new", ?)'
execute(sql, (ircuser, ircacc, wikiacc, botacc, request.form['type'], request.form['cloak'], data))
return render_template('cloak.html', message='The request was sent. The cloaks are processed in batches, so it may take some days to your request be processed.')
# method == GET
if 'ircacc' in request.args and 'wikiacc' in request.args: # for tests
ircuser, ircacc, wikiacc = request.args.get('ircuser', '?!?@?'), request.args['ircacc'], request.args['wikiacc']
token, cloak = os.urandom(5).encode('hex'), None
with open('cloak.ctrl', 'a') as f:
f.write('%s\t?\t%d\t%s\t%s\t%s\n' % (token, time.time(), ircuser, ircacc, wikiacc))
elif not 'oauth_token' in request.args or not 'oauth_verifier' in request.args:
return render_template('cloak.html', message='To request a cloak, use the follow command in IRC: /msg wmopbot cloak')
else:
token = request.args['oauth_token']
with open('cloak.ctrl') as f:
ctrl = [l for l in f.readlines() if l and int(l.split()[2]) + 25920000 > time.time()]
for line in ctrl:
line = line.rstrip('\n').split('\t')
if line[0] == token:
if len(line) > 5:
return render_template('cloak.html', message='Error: Request already initialized.')
if int(line[2]) + 60 < time.time():
return render_template('cloak.html', message='Error: Request expired.')
token_secret, ircuser, ircacc = line[1], line[3], line[4]
break
else:
return render_template('cloak.html', message='Error: Not a valid token')
with open('.oauth.key') as f:
consumer_key, consumer_secret = f.read().rstrip('\n').split('\t')
oauth = OAuth1(consumer_key, consumer_secret, token, token_secret, verifier=request.args['oauth_verifier'])
url = 'https://meta.wikimedia.org/w/index.php'
r = requests.post(url=url, params={'title': 'Special:OAuth/token'}, auth=oauth)
t = r.content.startswith('oauth_token') and dict(i.split('=', 1) for i in r.content.split('&'))
oauth = OAuth1(consumer_key, consumer_secret, t['oauth_token'], t['oauth_token_secret'])
r = requests.post(url=url, params={'title': 'Special:OAuth/identify'}, auth=oauth)
data = jwt.decode(r.content, consumer_secret, audience=consumer_key)
wikiacc = data['username']
for i, line in enumerate(ctrl):
line = line.rstrip('\n').split('\t')
if line[0] == token:
ctrl[i] = '\t'.join(line + [wikiacc]) + '\n'
with open('cloak.ctrl', 'w') as f:
f.writelines(ctrl)
wikimediaCloaks = ('wikipedia/', 'wikimedia/', 'wikibooks/', 'wikinews/', 'wikiquote/', 'wiktionary/',
'wikisource/', 'wikivoyage/', 'wikiversity/', 'wikidata/', 'mediawiki/')
host = ircuser.split('@')[-1]
cloak = host.startswith(wikimediaCloaks) and host or ''
checkuser(wikiacc)
return render_template('cloak.html', ircuser=ircuser, ircacc=ircacc, wikiacc=wikiacc, token=token, usercloak=cloak)
@app.route('/botaccount', methods=['POST'])
def botaccount():
token = request.form['token']
with open('cloak.ctrl') as f:
ctrl = [l for l in f.readlines() if l and int(l.split()[2]) + 25920000 > time.time()]
for line in ctrl:
line = line.rstrip('\n').split('\t')
if line[0] == token:
if len(line) < 7:
return 'Error: bot identification not received'
else:
return line[6]
return 'Error: token not found'
@app.route('/cloakqueue')
def cloakqueue():
if not 'user' in session:
return redirect_login()
limit = 'limit' in request.args and request.args['limit'].isdigit() and int(request.args['limit']) or 20
args = [limit > 200 and 200 or limit]
status = request.args.get('status', '')
if status:
args[0:0] = [status]
queue = query('SELECT * FROM cloak WHERE cl_status = ? ORDER BY cl_id DESC LIMIT ?', tuple(args))
else:
queue = query('SELECT * FROM cloak ORDER BY cl_id DESC LIMIT ?', tuple(args))
usergroup = 'op'
if 'test' in request.args:
usergroup = request.args['test']
elif chanList['#wikimedia-ops']['access'].get(session['user'], {}).get('template', '') == 'Contact':
usergorup = 'gc'
elif query("SELECT 1 FROM user WHERE us_account = ? and us_user LIKE '%@freenode/staff/%'", (session['user'],)):
usergroup = 'staff'
for req in queue:
data = req[-1].split('\n')
if len(data) < 2 or not data[1]:
checkuser(req[3])
return render_template('cloakqueue.html', title='Cloaks Queue', limit=limit, queuestatus=status,
requests=queue[::-1], usergroup=usergroup, **userparams())
@app.route('/setcloak', methods=['POST'])
def setcloak():
if not 'user' in session:
return 'Error: not logged'
with open('web.log', 'a') as f:
f.write('setcloak: %r\n' % request.form)
_id = request.form['id']
status, cloak = request.form.get('status'), request.form.get('cloak')
r = query('SELECT cl_data FROM cloak WHERE cl_id = ?', (_id,))
if not r:
return 'Error: id not found'
data = r[0][0].split('\n', 1)
history = data[0].split(',')
new = ''
if chanList['#wikimedia-ops']['access'].get(session['user'], {}).get('template', '') == 'Contact':
if cloak:
new = '%s: cloak edit %s' % (session['user'], time.strftime('%Y-%m-%d %H:%M:%S'))
data = ','.join(history + [new]) + '\n' + data[1]
execute('UPDATE cloak SET cl_cloak_edit = ?, cl_data = ? WHERE cl_id = ?', (cloak, data, _id))
return '(ok)%s\n%s' % (cloak, new)
elif not status in ('+1', '-1', 'approved', 'rejected', 'cloaked'):
return 'Error: You can not set that status'
elif not status:
return 'Error: No status to set'
elif query("SELECT 1 FROM user WHERE us_account = ? and us_user LIKE '%@freenode/staff/%'", (session['user'],)):
if not status in ('+1', '-1', 'cloaked'):
return 'Error: can not set that status'
elif history[-1].split()[1] in ('approved', 'rejected', 'cloaked'):
return 'Error: You can not review a request with that status'
elif session['user'] + ': ' in data[0]:
return 'Error: You may only review a request once'
elif not status in ('+1', '-1'):
return 'Error: You can not set that status'
new = '%s: %s %s' % (session['user'], status, time.strftime('%Y-%m-%d %H:%M:%S'))
data = ','.join(history + [new]) + '\n' + data[1]
status = status in ('+1', '-1') and 'reviewed' or status
execute("UPDATE cloak SET cl_status = ?, cl_data = ? WHERE cl_id = ?", (status, data, _id))
return '(ok)' + new
@app.route('/redundant')
def redundantBans():
"Generate a list of redundant bans for all channels"
if not 'user' in session:
return redirect_login()
- redundant = []
+ bans, quiets = [], []
for channel in sorted(c for c in chanList if c[0] == '#'):
check = [channel] + [ban[3:] for ban in chanList[channel]['ban'] if ban.startswith('$j:#')]
- redundant.extend([(channel, ban2check, ban + (checkChan != channel and ' in ' + checkChan or ''))
+ bans.extend([(channel, ban2check, ban + (checkChan != channel and ' in ' + checkChan or ''))
for checkChan in check for ban in chanList[checkChan]['ban'] for ban2check in chanList[channel]['ban']
if (checkChan != channel and not ban.startswith('$j') if ban == ban2check else maskre(ban, ban2check))])
- return render_template('redundant.html', title='Redundant bans', redundant=redundant, **userparams())
+ quiets.extend([(channel, quiet2check, quiet)
+ for quiet in chanList[channel]['quiet'] for quiet2check in chanList[channel]['quiet']
+ if quiet != quiet2check and maskre(quiet, quiet2check)])
+ return render_template('redundant.html', title='Redundant bans and quiets', bans=bans, quiets=quiets, **userparams())
@app.route('/reload')
def reloadchans():
if not 'user' in session:
return redirect_login()
loadChans()
return render_template('chansreloaded.html', title='Chans reloaded', **userparams())
def notAuth():
return render_template('notauth.html', title=u'Not authenticated')
@app.errorhandler(404)
def page_not_found(error):
return render_template('page_not_found.html', title=u'Page not found'), 404
def redirect_login():
returnto = request.full_path
url = returnto and returnto != '/' and url_for('login', returnto=returnto) or url_for('login')
r = redirect(url)
return r
def dbconn():
global _dbconn
if _dbconn[0] + 10 > time.time():
return _dbconn[1]
connection = oursql.connect(db='s53213__wmopbot', host='tools.labsdb',
read_default_file=os.path.expanduser('~/replica.my.cnf'), read_timeout=10, use_unicode=False)
c = connection.cursor()
_dbconn = [time.time(), c]
return c
def query(sql, args=()):
c = dbconn()
c.execute(sql, args)
return c.fetchall()
def execute(sql, args=()):
c = dbconn()
c.execute(sql, args)
def loadChans():
global chanList, chanTable, chanLoaded
chanList = defaultdict(lambda: {'access': {}, 'ban': {}, 'quiet': {}, 'mode': {}, 'exempt': {}, 'config': {}})
for chan, _type, target, args in query('SELECT * FROM list'):
chanList[chan].setdefault(_type, {})[target.decode('utf-8', 'replace')] = \
{k: v.decode('utf-8', 'replace').replace('\;', ';') for k, v in (i.split(':', 1)
for i in sep.split(args))}
stats = query('SELECT st_channel, SUM(st_msg), (SUM(st_users) / COUNT(*)) FROM stats WHERE st_hour > (UNIX_TIMESTAMP() / 3600 - 168) GROUP BY st_channel')
stats = {chan: (int(msg), int(usr)) for chan, msg, usr in stats}
with open('.online') as f:
online = f.read().split('\n')[1:]
chanTable = []
for chan in chanList:
if chan[0] != '#':
continue
ops = len([1 for args in chanList[chan]['access'].values() if 'o' in args['flags']])
wmfgc = 'wmfgc' in chanList[chan]['access'] and chanList[chan]['access']['wmfgc'].get('flags', '') or \
chanList[chan]['config'].get('update', {}).get('cserror') or ''
bans = len([1 for b in chanList[chan]['ban']])
jbans = ', '.join([b for b in chanList[chan]['ban'] if b.startswith('$j:')])
modes = chanList[chan]['config'].get('cs/irc', {}).get('mode', '?')
csflags = chanList[chan]['config'].get('cs/irc', {}).get('csflags', '')
msg = chan in stats and stats[chan][0] or chan in online and '0' or ''
users = chan in stats and stats[chan][1] or ''
chanTable.append((chan, ops, wmfgc, bans, jbans, modes, csflags, msg, users))
chanTable.sort()
chanLoaded = int(time.time())
loadChans()
def userparams():
user = session['user']
chans = query(r"SELECT ls_channel FROM list WHERE ls_type = 'access' AND ls_target = ? AND ls_args RLIKE 'flags:\\w*[Oo]\\w*;'", (user,))
return {'user': user, 'userChans': [i[0] for i in chans]}
app.add_template_filter(lambda t: t and t.isdigit() and time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(int(t))) or t,
name='ftime')
app.add_template_filter(lambda t: t.decode('utf-8', 'replace'), name='utf8')
def checkuser(wikiacc):
with open('web.log', 'a') as f:
f.write('checkuser: %r\n' % userCheck)
if not wikiacc in userCheck:
userCheck[wikiacc] = [time.time(), None]
if not userCheck[wikiacc][1] and not wikiacc in threadQueue:
threadQueue.append(wikiacc)
if threadQueue and not lock.locked():
thread.start_new_thread(checkuser_thread, ())
def checkuser_thread():
lock.acquire()
try:
while threadQueue:
checkuser_thread2(threadQueue.pop(0))
for u, v in userCheck.items():
if v[0] + 86400 < time.time():
del userCheck[u]
finally:
lock.release()
def checkuser_thread2(user):
dbs = None
wikis = {}
dbtime = []
for s in ('s1', 's2', 's3', 's4', 's5', 's6', 's7'):
connection = oursql.connect(host=s + '.labsdb', read_default_file=os.path.expanduser('~/replica.my.cnf'),
read_timeout=10, use_unicode=True, autoreconnect=True, autoping=True)
c = connection.cursor()
if not dbs:
c.execute('SELECT dbname, url FROM meta_p.wiki')
dbs = {db[0].encode('utf-8') + '_p': db[1] for db in c.fetchall() if db[1]}
c.execute('SHOW DATABASES')
sdbs = [db[0] for db in c.fetchall() if db[0] in dbs]
if not 'global' in wikis and 'centralauth_p' in sdbs:
c.execute("SELECT gb_expiry FROM centralauth_p.globalblocks WHERE gb_address = ?", (user,))
block = c.fetchall()
wikis['global'] = block and 'Global blocked (expiry: %s) ' % expirytime(block[0]) or None
for db in sdbs:
wikis[dbs[db]] = checkwiki(c, db, user)
del dbs[db]
c.connection.close()
if not dbs:
break
connection.close()
data = '%s: %s' % (user, wikis.pop('global', None) or '')
wikis = sorted(((w, d) for w, d in wikis.iteritems() if d), key=lambda i:i[1][0], reverse=True)
data += ', '.join('%s (%s)' % (w.split('//')[-1], d[1]) for w, d in wikis[:(len(wikis) > 8 and 6 or 8)])
if len(wikis) > 8:
data += ', and more %d wikis with the average of %.1f edits per wiki' % (len(wikis) - 6,
float(sum(d[0] for w, d in wikis[6:])) / (len(wikis) - 6))
userCheck[user][1] = data
id_data = query('SELECT cl_id, cl_data FROM cloak WHERE cl_wikiacc = ?', (user,))
for cl_id, cl_data in id_data:
cl_data = cl_data.split('\n')
if len(cl_data) == 1:
cl_data.append('')
if not cl_data[1]:
cl_data[1] = data
execute('UPDATE cloak SET cl_data = ? WHERE cl_id = ?', ('\n'.join(cl_data), cl_id))
def checkwiki(c, db, user):
c.execute("SELECT user_editcount, user_id FROM %s.user WHERE user_name = ?" % db, (user,))
count_id = c.fetchall()
if not count_id or not count_id[0][0]:
return False
count, user_id = int(count_id[0][0]), int(count_id[0][1])
data = '%d edit%s' % (count, count != 1 and 's' or '')
c.execute("SELECT ipb_expiry FROM %s.ipblocks_ipindex WHERE ipb_address = ?" % db, (user,))
block = c.fetchall()
if block:
data += ', blocked (expiry: %s)' % block
c.execute("SELECT ug_group FROM %s.user_groups WHERE ug_user = ?" % db, (user_id,))
groups = [g[0] for g in c.fetchall()]
if groups:
data += ', ' + ', '.join(groups)
return count, data
def expirytime(e):
return e.isdigit and '%s-%s-%s %s:%s:%s' % (e[0:4], e[4:6], e[6:8], e[8:10], e[10:12], e[12:14]) or e
def irclower(s):
"IRC lower case: acording RFC 2812, the characters {}|^ are the respective lower case of []\~"
return s.lower().replace('{', '[').replace('}', ']').replace('|', '\\').replace('^', '~')
mask2re = {'\\': '[\\\\|]', '|': '[\\\\|]', '^': '[~^]', '~': '[~^]', '[': '[[{]', ']': '[]}]', '{': '[[{]',
'}': '[]}]', '*': '[^!@]*', '?': '[^!@]', '+': '\\+', '.': '\\.', '(': '\\(', ')': '\\)', '$': '\\$'}
def maskre(mask, match=None):
"Transforms an IRC mask into a regex pattern"
mask = irclower(mask)
regex = ''
for c in mask:
regex += mask2re.get(c, c)
try:
r = re.compile(regex + '$', re.I)
except:
r = None
if not match:
return r
return r.match(match)
if __name__ == '__main__':
with open('.secret') as f:
app.secret_key = f.read()
if os.uname()[1].startswith('tools-webgrid'):
app.config['APPLICATION_ROOT'] = '/wmopbot/'
from flup.server.fcgi_fork import WSGIServer
WSGIServer(app).run()
else:
# Run outside Labs for tests (no database)
app.run()
# print 'This tool does not work outside wmopbot project in Tool Labs'