Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Paste
P10559
phaburis.py
Active
Public
Actions
Authored by
thcipriani
on Feb 28 2020, 5:57 PM.
Edit Paste
Archive Paste
View Raw File
Subscribe
Mute Notifications
Award Token
Flag For Later
Tags
None
Referenced Files
F31648829: raw.txt
Feb 28 2020, 5:57 PM
2020-02-28 17:57:52 (UTC+0)
Subscribers
None
import
json
import
os
import
re
import
time
import
sys
import
requests
# Module-level accumulator: Phab.find_uris() stores matching repository URIs
# here, keyed by the URI's id with the raw URI string as the value; main()
# serialises it to phaburis.json.
URIS = {}
def flatten_for_post(h, result=None, kk=None):
    """
    Flatten a nested structure into a single-level dict of form fields.

    Since phab expects x-url-encoded form post data (meaning each
    individual list element is named), nested containers are expanded
    into bracketed keys, e.g. ``{'a': [{'b': 'c'}]}`` becomes
    ``{'a[0][b]': 'c'}``.

    See also:
    <https://secure.phabricator.com/T12447>
    <https://stackoverflow.com/questions/26266664/requests-form-urlencoded-data/36411923>

    :param h: value to flatten (dict, list, tuple, or a scalar)
    :param result: dict that receives the flattened fields; a new dict is
        created when omitted (used by the recursive calls)
    :param kk: key prefix accumulated so far (None at the top level)
    :returns: ``result``, mapping flattened key -> scalar value

    Values that are neither text, bool, number, list, tuple nor dict
    (e.g. ``None``) are skipped.
    """
    if result is None:
        result = {}

    # type(u'') is `unicode` on Python 2 and `str` on Python 3, so text is
    # recognised on both interpreters without touching the Python-2-only
    # name `unicode` (which raised NameError on Python 3).  Numbers are
    # included so they are no longer silently dropped from the payload.
    scalar_types = (str, type(u''), bool, int, float)

    if isinstance(h, scalar_types):
        result[kk] = h
    elif isinstance(h, (list, tuple)):
        for i, item in enumerate(h):
            flatten_for_post(item, result, '%s[%d]' % (kk, i))
    elif isinstance(h, dict):
        for k, v in h.items():
            key = k if kk is None else '%s[%s]' % (kk, k)
            # Recursing handles nested dicts, lists and scalars uniformly.
            flatten_for_post(v, result, key)
    return result
def replace_domain(uri):
    """
    Point a gerrit-slave URI at gerrit-replica instead.

    Only a URI that starts with the exact prefix
    ``https://gerrit-slave.wikimedia.org/r/`` is rewritten; anything else is
    returned unchanged.  The prefix is compared literally rather than as a
    regular expression, so the dots in the host name match only dots.

    :param uri: repository URI string
    :returns: the URI with the host prefix swapped, or ``uri`` as given
    """
    old_prefix = 'https://gerrit-slave.wikimedia.org/r/'
    new_prefix = 'https://gerrit-replica.wikimedia.org/r/'
    if uri.startswith(old_prefix):
        return new_prefix + uri[len(old_prefix):]
    return uri
class Phab(object):
    """
    Minimal client for the Phabricator Conduit API at
    phabricator.wikimedia.org, used to find and rewrite repository URIs.
    """

    def __init__(self):
        # Base URL of the Conduit API; also the key looked up under
        # "hosts" in ~/.arcrc.
        self.phab_url = 'https://phabricator.wikimedia.org/api/'
        self.conduit_token = self._get_token()

    def _get_token(self):
        """
        Use the $CONDUIT_TOKEN envvar, fallback to whatever is in ~/.arcrc

        :returns: the token string, or None when neither source has one
        """
        token = os.environ.get('CONDUIT_TOKEN')
        if token is not None:
            # The envvar wins, so a missing or incomplete ~/.arcrc must
            # not get in the way.
            return token

        token_path = os.path.expanduser('~/.arcrc')
        if not os.path.exists(token_path):
            return None

        with open(token_path) as f:
            arcrc = json.load(f)

        # An .arcrc without an entry for this host yields None rather than
        # a KeyError.
        return arcrc.get('hosts', {}).get(self.phab_url, {}).get('token')

    def _query_phab(self, method, data):
        """
        Helper method to query phab via requests and return json

        :param method: Conduit method name, e.g. 'diffusion.uri.edit'
        :param data: (possibly nested) dict of parameters; it is flattened
            into form fields and the API token is added before posting
        :returns: the decoded JSON response
        :raises requests.HTTPError: on a non-2xx HTTP status
        """
        data = flatten_for_post(data)
        data['api.token'] = self.conduit_token
        # Build the URL with plain string handling; os.path.join is meant
        # for filesystem paths.
        url = self.phab_url.rstrip('/') + '/' + method
        r = requests.post(url, data=data)
        r.raise_for_status()
        return r.json()

    def find_uris(self, after=None):
        """
        Collect gerrit-slave "observe" URIs of active repositories.

        Pages through 'diffusion.repository.search' (queryKey "active",
        with the "uris" attachment) and records every URI whose raw value
        contains 'gerrit-slave' and whose io type is 'observe' in the
        module-level URIS dict, keyed by the URI's id.

        :param after: pagination cursor to resume from (None starts at the
            first page)
        """
        while True:
            data = {
                # "queryKey": "all",
                "queryKey": "active",
                "attachments": {
                    'uris': '1',
                },
            }
            if after:
                data['after'] = after

            repos = self._query_phab('diffusion.repository.search', data)
            results = repos['result']
            if not results:
                # NOTE(review): an empty result may also indicate a Conduit
                # error; the response's error fields are not inspected here.
                return

            for repo in results['data']:
                for uri in repo['attachments']['uris']['uris']:
                    uri_id = uri['id']
                    name = uri['fields']['uri']['raw']
                    io = uri['fields']['io']['raw']
                    if 'gerrit-slave' in name and io == 'observe':
                        URIS[uri_id] = name

            # Keep fetching until the cursor reports no further page.
            after = results['cursor']['after']
            if not after:
                return

    def update_uri(self, phid, uri):
        """
        Set a repository URI object's URI via 'diffusion.uri.edit'.

        The flattened request payload is printed before it is sent.

        :param phid: identifier of the URI object to edit
        :param uri: new URI value
        :returns: the decoded JSON response from Conduit
        """
        method = 'diffusion.uri.edit'
        data = {
            'objectIdentifier': str(phid),
            'transactions': [{
                'type': 'uri',
                'value': str(uri),
            }],
        }
        print(flatten_for_post(data))
        return self._query_phab(method, data)
def main():
    """
    Rewrite every cached gerrit-slave URI to its gerrit-replica form.

    The list of URIs is read from phaburis.json in the current directory;
    when that file does not exist yet it is first built by querying
    Phabricator and dumping the collected URIS.  Each URI is then updated
    in turn.  The first response carrying an error code is printed and the
    process exits with status 1.  After every 50 successful updates the
    script sleeps for 5 seconds.
    """
    client = Phab()
    cache_file = 'phaburis.json'

    # Build the on-disk cache only once; later runs reuse it.
    if not os.path.exists(cache_file):
        client.find_uris()
        with open(cache_file, 'w') as out:
            json.dump(URIS, out)

    with open(cache_file) as src:
        cached = json.load(src)

    for done, (phid, uri) in enumerate(cached.items(), 1):
        response = client.update_uri(phid, replace_domain(uri))
        if response['error_code']:
            print('OOOOHHHH NOOOO')
            print(response)
            sys.exit(1)

        # Pause after each batch of 50 updates.
        if done % 50 == 0:
            time.sleep(5)
# Run main() only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Event Timeline
thcipriani
created this paste.
Feb 28 2020, 5:57 PM
2020-02-28 17:57:52 (UTC+0)
Log In to Comment