Paste P10957

Database drift comparer

Authored by Ladsgroup on Apr 10 2020, 11:25 PM.
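
# Compares the table definitions in the MediaWiki codebase (fetched from Gerrit or read
# from a local .sql file) against the live schema on database hosts, and writes the
# detected schema drifts to by_db_drifts.json and by_drift_type_drifts.json.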
import sys
import re
import time
import subprocess
import json
import base64
from collections import defaultdict
import random
import requests
# Example invocation: python3 new_db_checker.py core "sql {wiki} -- " s8 --important-only -v -prod
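# Arguments, as read by the script:
#   argv[1]: schema source -- 'core', 'wikibase-repo', 'wikibase-client', or a path to a local .sql file
#   argv[2]: sql command template; '{wiki}' is replaced with a wiki picked from the shard's dblist
#   argv[3]: shard to check (e.g. 's8') or 'all' (only used with -prod)
#   flags:   -prod (check production hosts instead of localhost),
#            --important-only (skip the detailed per-field type checks), -v/--verbose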


def debug(*args):
    if '-v' in sys.argv or '--verbose' in sys.argv:
        print(*args)

gerrit_url = 'https://gerrit.wikimedia.org/g/'
type_to_path_mapping = {
    'core': 'mediawiki/core/+/master/maintenance/tables.sql',
    'wikibase-repo': 'mediawiki/extensions/Wikibase/+/master/repo/sql/Wikibase.sql',
    'wikibase-client': 'mediawiki/extensions/Wikibase/+/master/client/sql/entity_usage.sql'
}
by_db_drifts = {}
by_drift_type_drifts = defaultdict(dict)


def get_a_wiki_from_shard(shard):
    debug('Getting a wiki from shard:', shard)
    url = gerrit_url + 'operations/mediawiki-config/+/master/dblists/{shard}.dblist?format=TEXT'.format(shard=shard)
    dbs = base64.b64decode(requests.get(url).text).decode('utf-8').split('\n')
    random.shuffle(dbs)
    for line in dbs:
        # skip blank lines and comments in the dblist
        if not line or line.startswith('#'):
            continue
        debug('Got this wiki:', line.strip())
        return line.strip()


def add_to_drifts(shard, db, table, second, drift_type, details=None):
    # 'details' is optional extra context (e.g. expected vs. actual value); it is kept out
    # of the per-drift-type key so drifts still group by type.
    drift = ' '.join([table, second, drift_type])
    if shard not in by_db_drifts:
        by_db_drifts[shard] = defaultdict(list)
    by_db_drifts[shard][db].append(drift if details is None else drift + ' ' + details)
    if shard not in by_drift_type_drifts[drift]:
        by_drift_type_drifts[drift][shard] = []
    by_drift_type_drifts[drift][shard].append(db)
    with open('by_db_drifts.json', 'w') as f:
        f.write(json.dumps(by_db_drifts, indent=4, sort_keys=True))
    with open('by_drift_type_drifts.json', 'w') as f:
        f.write(json.dumps(by_drift_type_drifts, indent=4, sort_keys=True))


def get_sql_from_gerrit(type_):
    url = gerrit_url + '{0}?format=TEXT'.format(type_to_path_mapping[type_])
    return base64.b64decode(requests.get(url).text).decode('utf-8')


def get_shard_mapping():
    shard_mapping = {}
    db_eqiad_data = requests.get(
        'https://noc.wikimedia.org/dbconfig/eqiad.json').json()
    for shard in db_eqiad_data['sectionLoads']:
        cases = []
        if shard == 'DEFAULT':
            name = 's3'
        else:
            name = shard
        for type_ in db_eqiad_data['sectionLoads'][shard]:
            cases += list(type_.keys())
        shard_mapping[name] = cases
    return shard_mapping
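# Illustrative shape of the returned mapping (host names are examples only, not real output):
# {'s1': ['db1163', 'db1105:3311', ...], 's3': [...], ...}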


# https://stackoverflow.com/questions/5903720/recursive-diff-of-two-python-dictionaries-keys-and-values
def dd(d1, d2, ctx=""):
    for k in d1:
        if k not in d2:
            print(k + " removed from " + ctx)
    for k in d2:
        if k not in d1:
            print(k + " added in " + ctx)
            continue
        if d2[k] != d1[k]:
            if type(d2[k]) not in (dict, list):
                print(k + " changed in " + ctx + " to " + str(d2[k]))
            else:
                if type(d1[k]) != type(d2[k]):
                    print(k + " changed in " + ctx + " to " + str(d2[k]))
                    continue
                else:
                    if type(d2[k]) == dict:
                        dd(d1[k], d2[k], k + ' in ' + ctx)
                        continue
    return


def parse_sql(sql):
    # Parse CREATE TABLE statements into {table: {'structure': {...}, 'indexes': {...}}}
    result = {}
    sql = sql.replace('IF NOT EXISTS ', '')
    for table_chunk in sql.split('CREATE TABLE '):
        table_chunk = table_chunk.lower()
        table_chunk = re.sub(r'/\*.+?\*/', '', table_chunk)
        table_chunk = re.sub(r'\n\s*\-\-.*', '', table_chunk)
        table_chunk = re.sub(r'\n\s*\n', '\n', table_chunk)
        table_name = table_chunk.split('(')[0].strip()
        if not table_name or '\n' in table_name:
            continue
        if '(' not in table_chunk:
            continue
        indexes = {}
        for res in re.findall(r'create( +unique|) +index +(\S+?) +on +%s +\((.+?)\)\;' % table_name, table_chunk):
            indexes[res[1]] = {'unique': bool(res[0]), 'columns': res[2]}
        table_structure = re.split(
            r'create( +unique|) +index', '('.join(table_chunk.split('(')[1:]))[0]
        table_structure_real = {}
        pk = None
        for line in table_structure.split('\n'):
            line = line.strip()
            if not line or line.endswith(';'):
                continue
            # drop a trailing comma
            if line.endswith(','):
                line = line[:-1]
            if line.startswith('primary key'):
                pk = line.split('(')[1].split(')')[0].replace(' ', '')
                continue
            line = re.sub(r' +', ' ', line).split('--')[0]
            if line.split(' ')[1].startswith('enum'):
                real_type = ' '.join(line.split(')')[0].split(' ')[1:]) + ')'
                real_type = real_type.replace('"', '\'').replace(' ', '')
            else:
                real_type = line.split(' ')[1]
            if ' unsigned ' in line:
                line = line.replace(' unsigned ', ' ')
                real_type += ' unsigned'
            table_structure_real[line.split(' ')[0]] = {
                'type': real_type, 'config': ' '.join(line.split(' ')[2:])}
        result[table_name] = {
            'structure': table_structure_real, 'indexes': indexes}
    return result
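# Illustrative shape of the parsed result (names and types depend on the input SQL):
# {'page': {'structure': {'page_id': {'type': 'int unsigned', 'config': 'not null auto_increment'}, ...},
#           'indexes': {'name_title': {'unique': True, 'columns': 'page_namespace,page_title'}}}}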


def compare_table_with_prod(shard, host, table_name, expected_table_structure, sql_command):
    port = None
    if host != 'localhost':
        if re.search(r' \-\-(?: |$)', sql_command):
            sql_command = re.split(r' \-\-(?: |$)', sql_command)[
                0] + ' --host ' + host + ' -- ' + re.split(r' \-\-(?: |$)', sql_command)[1]
        if ':' in host:
            port = host.split(':')[1]
            host = host.split(':')[0]
        host += '.eqiad.wmnet'
    debug('Checking table ', table_name)
    if port:
        sql_command += ' -P ' + port
    # Compare the columns reported by DESC against the parsed schema
    debug('Running: ', sql_command + ' -h %s -e "DESC %s;"' % (host, table_name))
    res = subprocess.run(sql_command + ' -h %s -e "DESC %s;"' % (host, table_name),
                         stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    if res.stderr and res.stderr.decode('utf-8'):
        return {}
    res = res.stdout.decode('utf-8')
    fields_in_prod = []
    return_result = {'fields': {}}
    for line in res.split('\n'):
        if line.startswith('ERROR'):
            return return_result
        if not line or line.startswith('Field'):
            continue
        field_structure = line.lower().split('\t')
        fields_in_prod.append(field_structure[0])
        name = field_structure[0]
        if name not in expected_table_structure['structure']:
            add_to_drifts(shard, host, table_name, name, 'field-mismatch-prod-extra')
            continue
        return_result['fields'][field_structure[0]] = field_structure[1]
        if '--important-only' in sys.argv:
            continue
        expected_type = expected_table_structure['structure'][name]['type'].replace(
            'varchar', 'varbinary').replace('integer', 'int')
        if expected_type != field_structure[1].replace('varchar', 'varbinary'):
            actual_size = None
            if '(' in field_structure[1]:
                actual_size = field_structure[1].split('(')[1].split(')')[0]
            expected_size = None
            if '(' in expected_type:
                expected_size = expected_type.split('(')[1].split(')')[0]
            if actual_size and expected_size and actual_size != expected_size:
                add_to_drifts(shard, host, table_name, name, 'field-size-mismatch',
                              expected_size + ' ' + actual_size)
            if (field_structure[1] + expected_type).count(' unsigned') == 1:
                add_to_drifts(shard, host, table_name, name, 'field-unsigned-mismatch',
                              field_structure[1] + ' ' + expected_type)
            actual_type = field_structure[1].split('(')[0].split(' ')[0]
            expected_type = expected_type.split('(')[0].split(' ')[0]
            if actual_type != expected_type:
                add_to_drifts(shard, host, table_name, name, 'field-type-mismatch',
                              expected_type + ' ' + actual_type)
        expected_config = expected_table_structure['structure'][name]['config']
        if (field_structure[2] == 'no' and 'not null' not in expected_config) or (field_structure[2] == 'yes' and 'not null' in expected_config):
            add_to_drifts(shard, host, table_name, name, 'field-null-mismatch')
        # if len(field_structure[4]) < 4:
        #     default = ''
        # else:
        #     default = field_structure[4]
        # if default == 'null' and field_structure[2] == 'no':
        #     continue
        # print(default, expected_config)
        # if (default and 'default ' + default not in expected_config) or (not default and 'default ' in expected_config):
        #     print(host, table_name, name, 'field-default-mismatch')
        #     print(expected_config)
    for field in expected_table_structure['structure']:
        if field not in fields_in_prod:
            add_to_drifts(shard, host, table_name, field, 'field-mismatch-codebase-extra')
    # Compare the indexes reported by SHOW INDEX against the parsed schema
    res = subprocess.run(sql_command + ' -h %s -e "SHOW INDEX FROM %s;"' % (host, table_name),
                         stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    if res.stderr and res.stderr.decode('utf-8'):
        return return_result
    res = res.stdout.decode('utf-8')
    return_result['indexes'] = {}
    indexes = {}
    for line in res.split('\n'):
        if line.startswith('ERROR'):
            return return_result
        if not line or line.startswith('Table'):
            continue
        index_structure = line.lower().split('\t')
        if index_structure[2] not in indexes:
            indexes[index_structure[2]] = {
                'unique': index_structure[1] == '0', 'columns': [index_structure[4]]}
        else:
            indexes[index_structure[2]]['columns'].append(index_structure[4])
    return_result['indexes'] = indexes
    expected_indexes = expected_table_structure['indexes']
    for index in indexes:
        # clean up primaries later
        if index == 'primary':
            continue
        if index not in expected_indexes:
            if index == 'tmp1':
                print('wtf')
            add_to_drifts(shard, host, table_name, index, 'index-mismatch-prod-extra')
            continue
        if indexes[index]['unique'] != expected_indexes[index]['unique']:
            add_to_drifts(shard, host, table_name, index, 'index-uniqueness-mismatch')
        expected_columns = expected_indexes[index]['columns'].replace(' ', '')
        expected_columns = re.sub(r'\(.+?\)', '', expected_columns)
        if ','.join(indexes[index]['columns']) != expected_columns:
            add_to_drifts(shard, host, table_name, index, 'index-columns-mismatch')
    for index in expected_indexes:
        if index not in indexes:
            add_to_drifts(shard, host, table_name, index, 'index-mismatch-code-extra')
    return return_result
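# Drift types recorded above: field-mismatch-prod-extra, field-mismatch-codebase-extra,
# field-size-mismatch, field-unsigned-mismatch, field-type-mismatch, field-null-mismatch,
# index-mismatch-prod-extra, index-mismatch-code-extra, index-uniqueness-mismatch,
# index-columns-mismatch.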


def handle_shard(shard, sql_data, hosts):
    final_result = {}
    sql_command = sys.argv[2]
    if shard is not None:
        sql_command = sql_command.format(wiki=get_a_wiki_from_shard(shard))
    debug('Sql command for this shard', sql_command)
    for host in hosts:
        final_result[host] = {}
        for table in sql_data:
            print(host, table)
            final_result[host][table] = compare_table_with_prod(
                shard, host, table, sql_data[table], sql_command)
            time.sleep(1)
    for host in hosts:
        for table in final_result[hosts[0]]:
            if final_result[host][table] != final_result[hosts[0]][table]:
                dd(final_result[host][table],
                   final_result[hosts[0]][table], table + ':' + host)


def main():
    shard_mapping = get_shard_mapping()
    if sys.argv[1].lower() in type_to_path_mapping:
        sql = get_sql_from_gerrit(sys.argv[1].lower())
    else:
        with open(sys.argv[1], 'r') as f:
            sql = f.read()
    sql_data = parse_sql(sql)
    if '-prod' in sys.argv:
        if sys.argv[3] == 'all':
            for shard in shard_mapping:
                handle_shard(shard, sql_data, shard_mapping[shard])
        else:
            hosts = shard_mapping[sys.argv[3]]
            handle_shard(sys.argv[3], sql_data, hosts)
    else:
        handle_shard(None, sql_data, ['localhost'])


if __name__ == '__main__':
    main()