Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F30117657
raw.txt
No One
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Authored By
thcipriani
Aug 26 2019, 5:21 PM
2019-08-26 17:21:36 (UTC+0)
Size
4 KB
Referenced Files
None
Subscribers
None
raw.txt
View Options
import json
import os
import re
import time
import sys
import requests
URIS = {}
def flatten_for_post(h, result=None, kk=None):
"""
Since phab expects x-url-encoded form post data (meaning each
individual list element is named). AND because, evidently, requests
can't do this for me, I found a solution via stackoverflow.
See also:
<https://secure.phabricator.com/T12447>
<https://stackoverflow.com/questions/26266664/requests-form-urlencoded-data/36411923>
"""
if result is None:
result = {}
if isinstance(h, str) or isinstance(h, bool) or isinstance(h, unicode):
result[kk] = h
elif isinstance(h, list) or isinstance(h, tuple):
for i, v1 in enumerate(h):
flatten_for_post(v1, result, '%s[%d]' % (kk, i))
elif isinstance(h, dict):
for (k, v) in h.items():
key = k if kk is None else "%s[%s]" % (kk, k)
if isinstance(v, dict):
for i, v1 in v.items():
flatten_for_post(v1, result, '%s[%s]' % (key, i))
else:
flatten_for_post(v, result, key)
return result
def replace_domain(uri):
return re.sub(
'^https://gerrit.wikimedia.org/r/?p?',
'https://gerrit-replica.wikimedia.org/r/',
uri
)
class Phab(object):
def __init__(self):
self.phab_url = 'https://phabricator.wikimedia.org/api/'
self.conduit_token = self._get_token()
def _get_token(self):
"""
Use the $CONDUIT_TOKEN envvar, fallback to whatever is in ~/.arcrc
"""
token = None
token_path = os.path.expanduser('~/.arcrc')
if os.path.exists(token_path):
with open(token_path) as f:
arcrc = json.load(f)
token = arcrc['hosts'][self.phab_url]['token']
return os.environ.get('CONDUIT_TOKEN', token)
def _query_phab(self, method, data):
"""
Helper method to query phab via requests and return json
"""
data = flatten_for_post(data)
data['api.token'] = self.conduit_token
r = requests.post(
os.path.join(self.phab_url, method),
data=data)
r.raise_for_status()
return r.json()
def find_uris(self, after=None):
"""
Get a set of authors from a list of commit sha1s
"""
data = {
# "queryKey": "all",
"queryKey": "active",
"attachments": {
'uris': '1',
},
}
if after:
data['after'] = after
repos = self._query_phab(
'diffusion.repository.search',
data
)
results = repos['result']
if not results:
return
for repo in results['data']:
for uri in repo['attachments']['uris']['uris']:
# Looking only for uris that are pointing to gerrit
# that are also of io type "observe"
phid = uri['id']
name = uri['fields']['uri']['raw']
io = uri['fields']['io']['raw']
if 'gerrit-slave' in name and io == 'observe':
URIS[phid] = name
after = results['cursor']['after']
if after:
self.find_uris(after)
else:
return
def update_uri(self, phid, uri):
method = 'diffusion.uri.edit'
data = {
'objectIdentifier': str(phid),
'transactions': [{
'type': 'uri',
'value': str(uri)
}]
}
print(flatten_for_post(data))
# return {'error_code': None}
return self._query_phab(method, data)
def main():
p = Phab()
phab_uri_path = 'phaburis.json'
if not os.path.exists(phab_uri_path):
p.find_uris()
with open(phab_uri_path, 'w') as f:
f.write(json.dumps(URIS))
with open(phab_uri_path) as f:
uris = json.loads(f.read())
count = 0
for phid, uri in uris.items():
result = p.update_uri(phid, replace_domain(uri))
if result['error_code']:
print('OOOOHHHH NOOOO')
print(result)
sys.exit(1)
count += 1
if count >= 50:
count = 0
time.sleep(5)
if __name__ == '__main__':
main()
File Metadata
Details
Attached
Mime Type
text/plain; charset=utf-8
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
7885306
Default Alt Text
raw.txt (4 KB)
Attached To
Mode
P8857 findphaburis.py
Attached
Detach File
Event Timeline
Log In to Comment