Page MenuHomePhabricator
Paste P6650

kafkapurge.rs
ActivePublic

Authored by ema on Feb 1 2018, 3:45 PM.
Tags
None
Referenced Files
F12940174: kafkapurge.rs
Feb 1 2018, 3:45 PM
Subscribers
None
extern crate kafka;
use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};
use std::str;
extern crate serde_json;
use serde_json::{Value};
extern crate url;
use url::{Url};
extern crate reqwest;
use reqwest::header::{Host, UserAgent, Headers};
// Kafka broker to consume purge jobs from.
static KAFKA_HOST: &str = "kafka1001.eqiad.wmnet:9092";
// Topic carrying MediaWiki cdnPurge jobs.
static KAFKA_TOPIC: &str = "eqiad.mediawiki.job.cdnPurge";
// Local Varnish frontend endpoint (port 80).
static FE_BASE_URL: &str = "http://127.0.0.1:80";
// Local Varnish backend endpoint (port 3128).
static BE_BASE_URL: &str = "http://127.0.0.1:3128";
// User-Agent header value sent with every PURGE request.
static USER_AGENT: &str = "kafkapurge";
// Send a PURGE request to the given *url*, setting the Host header to the value specified in the
// *host* parameter (empty Host value when *host* is None). Only produce output if something went
// wrong: a send error, or a status other than the 204 No Content this code expects.
//
// NOTE(review): a fresh reqwest::Client (and its connection pool) is built on every call; if this
// path becomes hot, hoist the client out to the caller or a lazily-initialized static.
fn send_purge(host: std::option::Option<&str>, url: &str) {
    let client = reqwest::Client::new();
    // PURGE is not a standard HTTP method, so build it as an extension method.
    let purge = reqwest::Method::Extension("PURGE".to_owned());
    let mut headers = Headers::new();
    headers.set(Host::new(host.unwrap_or_default().to_string(), None));
    headers.set(UserAgent::new(USER_AGENT));
    // Both the method and the headers are consumed exactly once here,
    // so no clones are needed (the original cloned both).
    let resp = client.request(purge, url)
        .headers(headers).send();
    match resp {
        Ok(r) => {
            if r.status() != reqwest::StatusCode::NoContent {
                println!("Unexpected Status {:?}", r.status());
            }
        },
        Err(e) => println!("Error sending purge, url={}, err={:?}", url, e),
    }
}
// Given a URL to be purged, graft the URL's path and query onto the Varnish Frontend/Backend base
// URLs, extract the Host header value, and call send_purge for the backend first, then the
// frontend (matching the original ordering: local backend is purged before the frontend).
fn purge(url: &str) {
    // The base URLs are static, known-good strings; a parse failure here is a
    // programming error, so unwrap() is acceptable.
    let mut be_url = Url::parse(BE_BASE_URL).unwrap();
    let mut fe_url = Url::parse(FE_BASE_URL).unwrap();
    // The URL to purge comes from an external Kafka message: one malformed URL
    // must not panic the whole daemon. Log and skip instead.
    let parsed_url = match Url::parse(url) {
        Ok(u) => u,
        Err(e) => {
            println!("Error parsing purge URL {}: {:?}. Skipping.", url, e);
            return;
        }
    };
    // Host header value to send with the PURGE (None for host-less URLs).
    let host = parsed_url.host_str();
    // Add path and query to the Backend URL
    be_url.set_path(parsed_url.path());
    be_url.set_query(parsed_url.query());
    // Add path and query to the Frontend URL
    fe_url.set_path(parsed_url.path());
    fe_url.set_query(parsed_url.query());
    // Send PURGE request to the local Varnish backend first
    send_purge(host, be_url.as_str());
    // Then send it to the local Varnish frontend
    send_purge(host, fe_url.as_str());
}
// Parse a Kafka JSON message and call purge() for each URL in params.urls.
//
// Malformed input (invalid JSON, missing/non-array params.urls, non-string
// entries) is logged and skipped rather than panicking: the input comes from
// an external queue and one bad message must not take the daemon down.
//
// Example:
// {
//   "database": "enwiki",
//   [...]
//   "params": {
//     "jobReleaseTimestamp": 1517499287,
//     "requestId": "WnMziwpAEDEAAD5PGYcAAAAA",
//     "urls": ["https://en.wikipedia.org/wiki/Luis_Alberto_(footballer,_born_1992)",
//              "https://en.wikipedia.org/w/index.php?title=Luis_Alberto_(footballer,_born_1992)&action=history"]
//   },
//   "type": "cdnPurge"
// }
fn process(json_string: &str) {
    // Invalid JSON: log and skip (the original unwrap() would panic here).
    let v: Value = match serde_json::from_str(json_string) {
        Ok(v) => v,
        Err(e) => {
            println!("Error parsing JSON message: {:?}. Skipping.", e);
            return;
        }
    };
    // Value's index operator returns Null for missing keys, so this single
    // as_array() check covers both "no params.urls" and "not an array".
    match v["params"]["urls"].as_array() {
        Some(urls) => {
            for url in urls {
                match url.as_str() {
                    Some(s) => purge(s),
                    None => println!("Non-string URL entry {:?}. Skipping.", url),
                }
            }
        },
        None => println!("Message without a params.urls array. Skipping."),
    }
}
// Call process() for each message read from Kafka.
fn main() {
    // Build a group consumer for the purge topic: fall back to the latest
    // offset when no committed offset exists, and store group offsets in
    // Kafka itself. A failure to create the consumer is fatal (unwrap) —
    // the daemon cannot do anything without it.
    let mut consumer =
        Consumer::from_hosts(vec!(KAFKA_HOST.to_owned()))
            .with_topic(KAFKA_TOPIC.to_owned())
            .with_fallback_offset(FetchOffset::Latest)
            .with_offset_storage(GroupOffsetStorage::Kafka)
            .create()
            .unwrap();
    // Poll forever. NOTE(review): poll().unwrap() and commit_consumed().unwrap()
    // abort the process on any broker error, including transient ones — confirm
    // this fail-fast behavior (and restart by a supervisor) is intended.
    loop {
        for ms in consumer.poll().unwrap().iter() {
            for m in ms.messages() {
                // Payloads are expected to be UTF-8 JSON; log and skip
                // anything that is not valid UTF-8.
                match str::from_utf8(m.value) {
                    Ok(s) => process(s),
                    Err(e) => println!("Error converting string {} from UTF8. Continuing.", e),
                }
            }
            // Mark the whole message set as consumed; errors are logged but
            // do not stop the loop.
            let m = consumer.consume_messageset(ms);
            if m.is_err() {
                println!("Error consuming message: {:?}", m);
            }
        }
        // Persist the consumed offsets back to Kafka after each poll round.
        consumer.commit_consumed().unwrap();
    }
}