Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Paste
P6650
kafkapurge.rs
Active
Public
Actions
Authored by
•
ema
on Feb 1 2018, 3:45 PM.
Edit Paste
Archive Paste
View Raw File
Subscribe
Mute Notifications
Award Token
Flag For Later
Tags
None
Referenced Files
F12940174: kafkapurge.rs
Feb 1 2018, 3:45 PM
2018-02-01 15:45:49 (UTC+0)
Subscribers
None
extern
crate
kafka
;
use
kafka
::
consumer
::
{
Consumer
,
FetchOffset
,
GroupOffsetStorage
};
use
std
::
str
;
extern
crate
serde_json
;
use
serde_json
::
{
Value
};
extern
crate
url
;
use
url
::
{
Url
};
extern
crate
reqwest
;
use
reqwest
::
header
::
{
Host
,
UserAgent
,
Headers
};
/// Kafka broker used to bootstrap the consumer.
static KAFKA_HOST: &str = "kafka1001.eqiad.wmnet:9092";

/// Topic from which the cdnPurge job messages are read.
static KAFKA_TOPIC: &str = "eqiad.mediawiki.job.cdnPurge";

/// Base URL of the local Varnish frontend (purged second).
static FE_BASE_URL: &str = "http://127.0.0.1:80";

/// Base URL of the local Varnish backend (purged first).
static BE_BASE_URL: &str = "http://127.0.0.1:3128";

/// Value of the User-Agent header sent with every PURGE request.
static USER_AGENT: &str = "kafkapurge";
// Send a PURGE request to the given *url*, setting the Host header to the value specified in the
// *host* parameter. Only produce output if something went wrong.
//
// Any response status other than 204 No Content is reported, as are transport errors. If `host`
// is `None`, no request is sent and an error is printed instead. The previous behaviour sent an
// empty Host header, which presumably cannot match any cached object.
fn send_purge(host: Option<&str>, url: &str) {
    // One HTTP client per thread, reused across calls. Building a `reqwest::Client` creates a
    // fresh connection pool, so creating one per request threw away keep-alive connections to
    // the local Varnish instances on every purge.
    thread_local!(static CLIENT: reqwest::Client = reqwest::Client::new());

    let host = match host {
        Some(h) => h,
        None => {
            println!("Error sending purge, url={}, err=missing Host", url);
            return;
        }
    };

    let mut headers = Headers::new();
    headers.set(Host::new(host.to_string(), None));
    headers.set(UserAgent::new(USER_AGENT));

    // PURGE is not a standard HTTP method, so it is sent as an extension method.
    let purge = reqwest::Method::Extension("PURGE".to_owned());

    // `purge` and `headers` are not needed afterwards, so they are moved into the request
    // rather than cloned.
    let resp = CLIENT.with(|client| client.request(purge, url).headers(headers).send());

    match resp {
        Ok(r) => {
            if r.status() != reqwest::StatusCode::NoContent {
                println!("Unexpected Status {:?}, url={}", r.status(), url);
            }
        }
        Err(e) => println!("Error sending purge, url={}, err={:?}", url, e),
    }
}
// Given a URL to be purged, add the URL path and query to the Varnish Backend/Frontend base URLs,
// extract the Host header, and call send_purge (backend first, then frontend).
//
// `url` comes from an external Kafka message. If it cannot be parsed, the error is printed and
// the URL is skipped, instead of panicking and stopping the whole consumer.
fn purge(url: &str) {
    // Parse the URL to be PURGEd.
    let parsed_url = match Url::parse(url) {
        Ok(u) => u,
        Err(e) => {
            println!("Error parsing url={}, err={:?}", url, e);
            return;
        }
    };

    // The host part of the URL becomes the Host header of each PURGE request.
    let host = parsed_url.host_str();

    // Order matters: purge the local Varnish backend first, then the frontend.
    for base in &[BE_BASE_URL, FE_BASE_URL] {
        // The base URLs are compile-time constants, so failing to parse them is a bug.
        let mut target = Url::parse(base).expect("Varnish base URL constants must be valid URLs");
        target.set_path(parsed_url.path());
        target.set_query(parsed_url.query());
        send_purge(host, target.as_str());
    }
}
// Parse Kafka JSON message and call purge for each URL.
//
// Malformed input does not panic the consumer. Invalid JSON, a missing or non-array
// `params.urls`, and non-string URL entries are each reported and skipped.
//
// Example:
// {
// "database": "enwiki",
// [...]
// "params": {
// "jobReleaseTimestamp": 1517499287,
// "requestId": "WnMziwpAEDEAAD5PGYcAAAAA",
// "urls": ["https://en.wikipedia.org/wiki/Luis_Alberto_(footballer,_born_1992)",
// "https://en.wikipedia.org/w/index.php?title=Luis_Alberto_(footballer,_born_1992)&action=history",
// "https://en.m.wikipedia.org/wiki/Luis_Alberto_(footballer,_born_1992)",
// "https://en.m.wikipedia.org/w/index.php?title=Luis_Alberto_(footballer,_born_1992)&action=history"]
// },
// "type": "cdnPurge"
// }
fn process(json_string: &str) {
    let v: Value = match serde_json::from_str(json_string) {
        Ok(v) => v,
        Err(e) => {
            println!("Error parsing JSON message, err={:?}", e);
            return;
        }
    };

    // Indexing a serde_json Value with a missing key yields Null, so `as_array` returns None
    // both when the key is absent and when it is not an array.
    let urls = match v["params"]["urls"].as_array() {
        Some(urls) => urls,
        None => {
            println!("Message has no params.urls array: {}", json_string);
            return;
        }
    };

    for url in urls {
        match url.as_str() {
            Some(u) => purge(u),
            None => println!("Skipping non-string url entry: {}", url),
        }
    }
}
// Call process() for each message read from Kafka.
//
// Each polled message set is marked as consumed after its messages have been processed. Consumed
// offsets are committed once per poll iteration. Broker errors from poll/commit are logged rather
// than aborting the daemon. After a failed poll the loop waits one second before retrying, so it
// does not spin.
//
// NOTE(review): no consumer group is configured (no `with_group`), although offsets are stored
// in Kafka. Confirm that `commit_consumed` actually persists offsets in this setup.
fn main() {
    let mut consumer = Consumer::from_hosts(vec![KAFKA_HOST.to_owned()])
        .with_topic(KAFKA_TOPIC.to_owned())
        .with_fallback_offset(FetchOffset::Latest)
        .with_offset_storage(GroupOffsetStorage::Kafka)
        .create()
        .expect("failed to create Kafka consumer");

    loop {
        let message_sets = match consumer.poll() {
            Ok(mss) => mss,
            Err(e) => {
                println!("Error polling Kafka, err={:?}. Retrying.", e);
                std::thread::sleep(std::time::Duration::from_secs(1));
                continue;
            }
        };

        for ms in message_sets.iter() {
            for m in ms.messages() {
                match str::from_utf8(m.value) {
                    Ok(s) => process(s),
                    Err(e) => {
                        println!("Error converting message from UTF8, err={}. Continuing.", e)
                    }
                }
            }
            if let Err(e) = consumer.consume_messageset(ms) {
                println!("Error consuming message: {:?}", e);
            }
        }

        if let Err(e) = consumer.commit_consumed() {
            println!("Error committing consumed offsets: {:?}", e);
        }
    }
}
Event Timeline
•
ema
created this paste.
Feb 1 2018, 3:45 PM
2018-02-01 15:45:49 (UTC+0)
Log In to Comment