Page MenuHomePhabricator
Paste P2798

Gather list exporter bot
ActivePublic

Authored by Tgr on Mar 21 2016, 11:10 PM.
Referenced Files
F3869273: Gather list exporter bot
Apr 13 2016, 2:35 PM
F3868874: Gather list exporter bot
Apr 13 2016, 12:12 PM
F3687092: Gather list exporter bot
Mar 21 2016, 11:10 PM
Subscribers
None
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""gather_list_requests.py: Pywikibot script to export gather lists.
Input is a file named gather_list_requests.tsv created from the
following SQL query run on the analytics DB:
SELECT
user_name,
gl_id,
gl_label,
gl_info,
gli_namespace,
gli_title
FROM
staging.tgr_gather_user_requests
JOIN enwiki.user USING (user_name)
JOIN enwiki.gather_list ON user_id = gl_user
JOIN enwiki.gather_list_item ON gl_id = gli_gl_id
WHERE
gl_perm = 1
AND gl_perm_override != 1
ORDER BY
user_name,
gl_id,
gli_order
;
"""
import sys
import csv
import json
import locale
import codecs
from cgi import escape
from itertools import imap, islice, ifilter, groupby, chain
from collections import defaultdict
import pywikibot
from pywikibot.data import api
# Output-format toggle: True -> <gallery> pages with images,
# False -> plain wikitext bullet lists (see the three *_page_generator coroutines).
# create a gallery or a plain list?
use_images = True
# Wrap sys.stdout into a StreamWriter to allow writing unicode.
# (Python 2: a plain stdout would raise UnicodeEncodeError on non-ASCII labels.)
sys.stdout = codecs.getwriter(locale.getpreferredencoding())(sys.stdout)
# Target wiki; every page read/write below goes through this site object.
site = pywikibot.Site('en', 'wikipedia')
class MySQLTSVDialect(csv.Dialect):
    """csv dialect matching TSV dumps from the MySQL command-line client."""
    quoting = csv.QUOTE_NONE     # MySQL never quotes fields
    delimiter = '\t'             # tab-separated columns
    escapechar = '\\'            # special characters are backslash-escaped
    doublequote = False          # no "" doubling, only backslash escapes
    lineterminator = '\n'
    skipinitialspace = True
def get_data_from_row(row, header):
    """Convert one raw TSV row into a dict keyed by column name.

    Byte-string fields are decoded as UTF-8 (already-decoded text passes
    through unchanged, which also makes this work on both Python 2 and 3).
    The gl_info JSON column is parsed into a defaultdict so missing keys
    read as None.

    Raises ValueError if gl_info contains invalid JSON.
    """
    row = [field.decode('utf-8') if isinstance(field, bytes) else field
           for field in row]
    data = dict(zip(header, row))
    if data.get('gl_info'):
        try:
            data['gl_info'] = defaultdict(lambda: None, json.loads(data['gl_info']))
        except Exception as e:  # "except Exception, e" was Python-3-invalid syntax
            raise ValueError('Invalid JSON for list %s: %s -- %s'
                             % (data['gl_id'], data, e))
    else:
        # No metadata at all: still expose a defaultdict for uniform access.
        data['gl_info'] = defaultdict(lambda: None)
    return data
def get_pagename(ns, title):
    """Return the full page name for a (namespace id, title) pair.

    Titles in the main namespace (ns 0) are returned unchanged; any other
    namespace gets its canonical prefix (e.g. "Talk:") prepended via the
    site's namespace table.
    """
    ns = int(ns)  # original redundantly converted twice: int(ns) again below
    return str(site.namespaces[ns]) + title if ns else title
def get_wikilink(title, text):
    """Build a wikitext link to *title*, piped through label *text* when it differs."""
    # A piped link is only needed for a non-empty label distinct from the title.
    if text and text != title:
        return '[[%s|%s]]' % (title, text)
    return '[[%s]]' % (title,)
def index_page_generator():
    """Coroutine building a user's index page linking all of their lists.

    Protocol: send() one representative row per list to accumulate a
    gallery entry; send(None) finishes the page, and that final send()
    yields a (page title, page text, edit summary) tuple suitable for
    create_page(). Relies on the module-level ``user`` set by the main loop.
    """
    item = yield
    while True:
        text = ''
        image_text = ''
        # Accumulate one gallery line per list until the None sentinel arrives.
        while item:
            # Use the list's own image only if it is freely licensed; else None.
            image = self_if_free_or_null('File:' + (item['gl_info']['image'] or ''))
            # Subpage title: "<id> – <label>" when labelled, else just the id.
            title = u'%s \u2013 %s' % (item['gl_id'], item['gl_label']) if item.get('gl_label') else item['gl_id']
            page = 'User:%s/Gather lists/%s' % (user, title)
            link = get_wikilink(page, item.get('gl_label', item['gl_id']))
            # Non-free/missing images fall back to a 1x1 transparent placeholder.
            text += '%s|%s|link=%s\n' % (image or 'File:1x1.png', link, page)
            if image:
                image_text += ': [[:%s|]]\n' % (image,)
            item = yield
        text = '<gallery mode="packed">\n%s</gallery>\n' % (text,)
        if image_text:
            # Attribution footer listing the real images that were shown.
            image_text = '\n<div style="font-size:75%%">\nImage information:\n%s</div>' % (image_text,)
        item = yield 'User:%s/Gather lists' % (user,), text + image_text, 'Exporting Gather lists - see https://phabricator.wikimedia.org/T128056'
def list_page_generator():
    """Coroutine building one gallery page per Gather list.

    Protocol: send() every row of a single list to accumulate gallery
    entries; send(None) finishes the page, and that final send() yields a
    (page title, page text, edit summary) tuple suitable for create_page().
    Relies on the module-level ``user`` set by the main loop.
    """
    item = yield
    while True:
        list_id = item['gl_id']
        # The list description (if present) becomes the HTML-escaped gallery caption.
        if item['gl_info']['description']:
            caption = ' caption="%s"' % (escape(item['gl_info']['description']),)
        else:
            caption = ''
        # Subpage title: "<id> – <label>" when labelled, else just the id.
        title = u'%s \u2013 %s' % (list_id, item['gl_label']) if item.get('gl_label') else list_id
        text = ''
        image_text = ''
        while item:
            page = get_pagename(item['gli_namespace'], item['gli_title'])
            # Page image is shown only when freely licensed; else None.
            image = self_if_free_or_null(get_pageimage(page))
            link = get_wikilink(page, item['gli_title'].replace('_', ' '))
            # Non-free/missing images fall back to a 1x1 transparent placeholder.
            text += '%s|%s|link=%s\n' % (image or 'File:1x1.png', link, page)
            if image:
                image_text += ': [[:%s|]]\n' % (image,)
            item = yield
        text = '<gallery mode="packed"%s>\n%s</gallery>\n' % (caption, text)
        if image_text:
            # Attribution footer listing the real images that were shown.
            image_text = '\n<div style="font-size:75%%">\nImage information:\n%s</div>' % (image_text,)
        item = yield 'User:%s/Gather lists/%s' % (user, title), text + image_text, 'Exporting Gather list %s - see https://phabricator.wikimedia.org/T128056' % (list_id, )
def plaintext_page_generator():
    """Coroutine building one plain wikitext page holding all of a user's lists.

    Protocol: send() (row, new_list) pairs — new_list is True on the first
    row of each list, which starts a new section; send(None) finishes the
    page, and that final send() yields a (page title, page text, edit
    summary) tuple suitable for create_page().
    Relies on the module-level ``user`` set by the main loop.
    """
    args = yield
    while True:
        text = ''
        while args:
            item, new_list = args
            if new_list:
                # New section per list, headed by its label (falling back to the id).
                text += '== %s ==\n' % (item.get('gl_label', item['gl_id']),)
                if item['gl_info']['description']:
                    text += ":''%s''\n" % (escape(item['gl_info']['description']),)
            page = get_pagename(item['gli_namespace'], item['gli_title'])
            link = get_wikilink(page, item['gli_title'].replace('_', ' '))
            text += '* %s\n' % (link,)
            args = yield
        args = yield 'User:%s/Gather lists' % (user,), text, 'Exporting Gather lists - see https://phabricator.wikimedia.org/T131063'
def create_page(title, text, summary):
    """Create or overwrite the wiki page *title* with *text*.

    Saves through the module-level pywikibot site with *summary* as the
    edit summary. (Leftover commented-out debug/dry-run code removed.)
    """
    page = pywikibot.Page(site, title)
    page.text = text
    page.save(summary=summary)
def chunked_iterator(lst, fn, size):
    """Apply *fn* to successive *size*-element slices of *lst*, chaining the results.

    Uses chain.from_iterable so each chunk is handed to *fn* lazily, only
    when the previous chunk's results are exhausted — the original
    chain(*imap(fn, ...)) star-unpacking called *fn* for every chunk up
    front, defeating the laziness of the API query generators fed to it.
    """
    return chain.from_iterable(
        fn(lst[pos:pos + size]) for pos in range(0, len(lst), size))
# Lazy cache: page title (underscores normalized to spaces) -> "File:..." page
# image name, or None when the page has no page image.
page_data = None
def get_pageimage(page):
    """Return the PageImages file name ("File:...") for *page*, or None.

    On first call, looks up the page image of every page referenced in the
    input TSV via the API (batches of 50) and fills the module-level
    page_data cache; later calls are pure dict lookups.
    """
    global page_data
    if not page_data:
        page_data = {}
        pages = [get_pagename(row['gli_namespace'], row['gli_title']) for row in get_rows()]
        query = chunked_iterator(pages, lambda pages: api.PropertyGenerator('pageimages', titles=pages, site=site), 50)
        for data in query:
            # Normalize underscores so cache keys match the lookup below.
            page_data[data['title'].replace('_', ' ')] = 'File:' + data['pageimage'].replace('_', ' ') if data.get('pageimage') else None
    page = page.replace('_', ' ')
    return page_data.get(page)
# Lazy cache: image file name (underscores normalized to spaces) -> bool,
# True when the file is freely licensed per its NonFree extmetadata flag.
image_data = None
def self_if_free_or_null(image):
    """Return *image* unchanged if it is a freely-licensed file, else None.

    On first call, collects every image referenced by list metadata or by
    page images, queries the NonFree extmetadata flag via the API in
    batches of 50, and fills the module-level image_data cache.
    """
    global image_data
    get_pageimage('-') # make sure page_data is set up
    if not image_data:
        image_data = {}
        list_images = (('File:' + row['gl_info']['image']) for row in get_rows() if row['gl_info']['image'])
        item_images = (page_data[row] for row in page_data)
        images = list(set(ifilter(None, chain(list_images, item_images))))
        query = chunked_iterator(images, lambda images: api.PropertyGenerator('imageinfo', iiprop='extmetadata', iiextmetadatafilter='NonFree', titles=images, site=site), 50)
        for data in query:
            try:
                is_free = data['imageinfo'][0]['extmetadata']['NonFree']['value'] != u'true'
            # Was a bare "except:", which also swallowed KeyboardInterrupt/
            # SystemExit; only the absence of the metadata path means "free".
            except (KeyError, IndexError, TypeError):
                is_free = True
            image_data[data['title'].replace('_', ' ')] = is_free
    if not image:
        return None
    image = image.replace('_', ' ')
    return image if image_data.get(image) else None
def get_rows():
    """Yield one parsed row dict per data line of gather_list_requests.tsv."""
    with open('gather_list_requests.tsv', 'rb') as infile:
        reader = csv.reader(infile, dialect=MySQLTSVDialect)
        # First line holds the column names from the SQL SELECT.
        header = next(reader)
        for raw_row in reader:
            yield get_data_from_row(raw_row, header)
# Instantiate the three page-builder coroutines and advance each to its first
# yield so they are ready to receive rows via send().
index_maker = index_page_generator()
list_maker = list_page_generator()
plaintext_maker = plaintext_page_generator()
next(index_maker)
next(list_maker)
next(plaintext_maker)
# Rows arrive sorted by user_name, gl_id, gli_order (see the SQL in the module
# docstring), so nested groupby splits them per user and per list.
for user, all_lists_of_user in groupby(get_rows(), lambda l: l['user_name']):
    if use_images:
        for list_id, single_list in groupby(all_lists_of_user, lambda l: l['gl_id']):
            for row in single_list:
                list_maker.send(row)
            # send(None) finalizes the list page and yields (title, text, summary).
            create_page(*list_maker.send(None))
            # `row` deliberately leaks from the loop above: the last row of the
            # list stands in for the whole list on the user's index page.
            index_maker.send(row)
        # One index page per user, after all their list pages are written.
        create_page(*index_maker.send(None))
    else:
        for list_id, single_list in groupby(all_lists_of_user, lambda l: l['gl_id']):
            new_list = True  # True only for the first row -> starts a new section
            for row in single_list:
                plaintext_maker.send((row, new_list))
                new_list = False
        # One combined plaintext page per user covering all of their lists.
        create_page(*plaintext_maker.send(None))