Page MenuHomePhabricator
Paste P8857

findphaburis.py
ActivePublic

Authored by thcipriani on Fri, Aug 2, 10:18 PM.
import json
import os
import requests
URIS = {}
def flatten_for_post(h, result=None, kk=None):
"""
Since phab expects x-url-encoded form post data (meaning each
individual list element is named). AND because, evidently, requests
can't do this for me, I found a solution via stackoverflow.
See also:
<https://secure.phabricator.com/T12447>
<https://stackoverflow.com/questions/26266664/requests-form-urlencoded-data/36411923>
"""
if result is None:
result = {}
if isinstance(h, str) or isinstance(h, bool) or isinstance(h, unicode):
result[kk] = h
elif isinstance(h, list) or isinstance(h, tuple):
for i, v1 in enumerate(h):
flatten_for_post(v1, result, '%s[%d]' % (kk, i))
elif isinstance(h, dict):
for (k, v) in h.items():
key = k if kk is None else "%s[%s]" % (kk, k)
if isinstance(v, dict):
for i, v1 in v.items():
flatten_for_post(v1, result, '%s[%s]' % (key, i))
else:
flatten_for_post(v, result, key)
return result
class Phab(object):
def __init__(self):
self.phab_url = 'https://phabricator.wikimedia.org/api/'
self.conduit_token = self._get_token()
def _get_token(self):
"""
Use the $CONDUIT_TOKEN envvar, fallback to whatever is in ~/.arcrc
"""
token = None
token_path = os.path.expanduser('~/.arcrc')
if os.path.exists(token_path):
with open(token_path) as f:
arcrc = json.load(f)
token = arcrc['hosts'][self.phab_url]['token']
return os.environ.get('CONDUIT_TOKEN', token)
def _query_phab(self, method, data):
"""
Helper method to query phab via requests and return json
"""
data = flatten_for_post(data)
data['api.token'] = self.conduit_token
r = requests.post(
os.path.join(self.phab_url, method),
data=data)
r.raise_for_status()
return r.json()
def find_uris(self, after=None):
"""
Get a set of uris
"""
data = {
"queryKey": "all",
"attachments": {
'uris': '1',
},
}
if after:
data['after'] = after
repos = self._query_phab(
'diffusion.repository.search',
data
)
results = repos['result']
if not results:
print(json.dumps(URIS))
return
for repo in results['data']:
for uri in repo['attachments']['uris']['uris']:
# Looking only for uris that are pointing to gerrit
# that are also of io type "observe"
phid = uri['id']
name = uri['fields']['uri']['raw']
io = uri['fields']['io']['raw']
if 'gerrit' in name and io == 'observe':
URIS[phid] = name
after = results['cursor']['after']
if after:
self.find_uris(after)
else:
print(json.dumps(URIS))
def main():
p = Phab()
p.find_uris()
if __name__ == '__main__':
main()

Event Timeline

thcipriani edited the content of this paste. (Show Details)