Page MenuHomePhabricator
Paste P8274

kafkapurge.go
ActivePublic

Authored by ema on Mar 26 2019, 5:27 PM.
Tags
None
Referenced Files
F28473070: raw.txt
Mar 26 2019, 5:28 PM
F28473060: raw.txt
Mar 26 2019, 5:27 PM
Subscribers
None
package main
import (
"encoding/json"
"flag"
"log"
"net/http"
"net/url"
"os"
"strings"
"github.com/Shopify/sarama"
)
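/*
Sample mediawiki.job.cdnPurge message. Only "database" and "params.urls" are
decoded by this program (see Purge and Params below).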
{
"database": "ruwiki",
"delay_until": "1553618514",
"mediawiki_signature": "d272bee9c63233dbc695673c0e0974dfcfb694748c65124f912000f2c3cf9808",
"meta": {
"domain": "ru.wikipedia.org",
"dt": "2019-03-26T16:41:44+00:00",
"id": "09faa33f-4fe6-11e9-80c2-141877613814",
"request_id": "XJpWRwpAADwAADioN7IAAAAM",
"schema_uri": "mediawiki/job/2",
"topic": "mediawiki.job.cdnPurge",
"uri": "https://ru.wikipedia.org/wiki/%D0%A1%D0%BB%D1%83%D0%B6%D0%B5%D0%B1%D0%BD%D0%B0%D1%8F:Badtitle/CdnCacheUpdate"
},
"page_namespace": -1,
"page_title": "Служебная:Badtitle/CdnCacheUpdate",
"params": {
"jobReleaseTimestamp": 1553618514,
"requestId": "XJpWRwpAADwAADioN7IAAAAM",
"urls": [
"https://ru.wikipedia.org/wiki/%D0%AD%D0%BC%D0%B8%D0%BD%D0%B5%D0%BC",
"https://ru.wikipedia.org/w/index.php?title=%D0%AD%D0%BC%D0%B8%D0%BD%D0%B5%D0%BC&action=history",
"https://ru.m.wikipedia.org/wiki/%D0%AD%D0%BC%D0%B8%D0%BD%D0%B5%D0%BC",
"https://ru.m.wikipedia.org/w/index.php?title=%D0%AD%D0%BC%D0%B8%D0%BD%D0%B5%D0%BC&action=history"
]
},
"type": "cdnPurge"
}
*/
// Purge is the subset of a cdnPurge job message that this program decodes.
// Keys of the message that have no matching field here are ignored by
// encoding/json. The tags spell out the exact wire keys instead of relying on
// the decoder's case-insensitive matching of Go field names.
type Purge struct {
	Database string `json:"database"` // value of the "database" key, e.g. "ruwiki"
	Params   Params `json:"params"`   // the "params" object, which carries the URLs to purge
}
type Params struct {
Urls []string
}
var (
brokerList = flag.String("brokers", "", "REQUIRED: comma separated list of brokers in the Kafka cluster")
topic = flag.String("topic", "", "REQUIRED: topic to consume")
frontendURL = flag.String("frontend_url", "http://127.0.0.1:80", "Cache frontend URL")
backendURL = flag.String("backend_url", "http://127.0.0.1:3128", "Cache backend URL")
logger = log.New(os.Stderr, "", log.LstdFlags)
client = &http.Client{}
)
// sendPurge sends an HTTP PURGE request to the cache at baseUrl.
//
// The request URL is the plain concatenation baseUrl + path, so path must
// start with "/" and may carry a query string. host is sent as the request's
// Host header in place of the host part of baseUrl.
//
// Failures are logged here as well as returned, because the callers in this
// file start sendPurge with "go" and never see the return value.
func sendPurge(baseUrl, path, host string) error {
	rawurl := baseUrl + path
	req, err := http.NewRequest("PURGE", rawurl, nil)
	if err != nil {
		log.Printf("Failed creating request for url %s\n", rawurl)
		return err
	}
	// For client requests net/http does not send a "Host" entry placed in
	// req.Header; the Host header on the wire is taken from req.Host, which
	// NewRequest initialises to the URL's host. Override it explicitly.
	req.Host = host
	resp, err := client.Do(req)
	if err != nil {
		log.Printf("Failed sending request to Host: %s -> %s: %s\n", host, rawurl, err)
		return err
	}
	// The response content is not needed. Close the body so the transport
	// can release the connection instead of leaking it.
	resp.Body.Close()
	return nil
}
// process decodes one cdnPurge job message and starts a PURGE against both
// the frontend and the backend cache for every URL in params.urls.
//
// b is the raw JSON message. Each PURGE runs in its own goroutine and is not
// waited for, so process returns before the requests complete and does not
// report their outcome. A URL that fails to parse is logged and skipped.
//
// It returns the decoding error if b is not valid JSON for Purge, and nil
// otherwise.
func process(b []byte) error {
	var p Purge
	if err := json.Unmarshal(b, &p); err != nil {
		log.Printf("Failed unmarshalling %s: %s\n", b, err)
		return err
	}
	for _, rawurl := range p.Params.Urls {
		parsedurl, err := url.Parse(rawurl)
		if err != nil {
			// Log an error message and continue with other urls
			log.Printf("Failed parsing url %s\n", rawurl)
			continue
		}
		// RequestURI is the encoded path plus "?query". Using Path alone
		// would drop the query string, so a URL such as
		// /w/index.php?title=X&action=history would be purged as
		// /w/index.php.
		target := parsedurl.RequestURI()
		go sendPurge(*frontendURL, target, parsedurl.Host)
		go sendPurge(*backendURL, target, parsedurl.Host)
	}
	return nil
}
// main parses the command-line flags, connects to the Kafka brokers given by
// -brokers (TLS disabled) and hands the value of every message received on
// -topic to process, each in its own goroutine.
//
// Every partition of the topic is consumed, starting at the newest offset.
// Each partition gets its own goroutine: ranging over one partition's
// Messages channel blocks until that channel is closed, so consuming the
// partitions one after another would never get past the first one.
func main() {
	flag.Parse()
	if *brokerList == "" {
		log.Fatal("You have to provide -brokers as a comma-separated list")
	}
	if *topic == "" {
		log.Fatal("You have to provide -topic")
	}

	initialOffset := sarama.OffsetNewest
	config := sarama.NewConfig()
	config.Net.TLS.Enable = false

	c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), config)
	if err != nil {
		log.Fatalf("Failed to start consumer with brokers %s: %s", *brokerList, err)
	}
	defer func() {
		log.Println("Closing consumer")
		err := c.Close()
		if err != nil {
			log.Fatalf("Failed to close consumer: %s", err)
		}
	}()

	p, err := c.Partitions(*topic)
	if err != nil {
		log.Fatalf("Failed to get partitions for topic %s:%s", *topic, err)
	}

	// done receives one value per partition goroutine when its Messages
	// channel has been closed.
	done := make(chan struct{})
	for _, partition := range p {
		pc, err := c.ConsumePartition(*topic, partition, initialOffset)
		if err != nil {
			log.Fatalf("Failed to start consumer for partition %d: %s", partition, err)
		}
		go func(pc sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				go process(msg.Value)
			}
			done <- struct{}{}
		}(pc)
	}

	// Keep main alive until every partition goroutine has finished.
	for range p {
		<-done
	}
}

Event Timeline

ema edited the content of this paste. (Show Details)