
Pull netflow data in realtime from Kafka via Tranquillity/Spark
Closed, Resolved · Public

Description

pmacct is pushing netflow data to the related Kafka topic on the Kafka Jumbo cluster (running Kafka 0.11). To complete the work, the Kafka events need to be pulled via Tranquillity/Spark and inserted into Druid in "real time".

After https://gerrit.wikimedia.org/r/#/c/373030/ the job should be relatively easy!

Event Timeline

Restricted Application added a subscriber: Aklapper.

@faidon whenever you have time, would you mind explaining a bit what data is currently pushed to the netflow topic in Kafka Jumbo and how to read it? I am planning to work on this task soon (under Joseph's supervision) but I am very ignorant about the subject :)

Had an interesting chat with @ayounsi and for the moment it seems that the only format expected in the netflow topic will be: tag,dst_as,as_path,peer_dst_as

18:38 <XioNoX> tag is an arbitrary value, I'm not even sure we use it
18:40 <XioNoX> Traffic does:  Our AS <-> One of our ISP/peering AS <-> some middleman AS <-> some more middleman AS <-> destination AS
18:40 <XioNoX> the AS# are just integers
18:41 <XioNoX> DST AS is the far end of the chain, the AS# of the network/IP talking to us
18:42 <XioNoX> as_path is the list of AS# from our ISP/peering to the destination AS. It can be as short as 1 AS# or very long
18:43 <XioNoX> peer_dst_as, is the ISP/peering AS, the next hop traffic is going through to reach its final destination
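To make the chat above concrete, a single event with those fields can be decoded in a few lines. This is a minimal sketch; the field names and values come from the sample messages seen later in this task, and the exact schema may change.

```python
import json

# One pmacct event as it appears on the netflow topic (values taken from a
# sample later in this task; the exact set of fields may grow over time)
raw = ('{"tag": 0, "as_dst": 199835, "as_path": "31477_199835", '
       '"peer_as_dst": 31477, "packets": 1000, "bytes": 1500000}')

event = json.loads(raw)

# as_path is an underscore-separated chain of AS numbers, from our
# ISP/peering AS down to the destination AS (it can also be empty)
as_path = [int(asn) for asn in event["as_path"].split("_") if asn]
```

Note how the chain lines up with the explanation above: the first hop in as_path is peer_as_dst (our ISP/peering AS) and the last hop is as_dst (the far end of the chain).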

I tried to tail netflow events via kafkacat -C -b kafka-jumbo1006.eqiad.wmnet -t netflow -p0 on stat1004 but I keep getting stuff like:

{"tag": 0, "as_dst": 0, "as_path": "", "peer_as_dst": 0, "stamp_inserted": "2018-01-18 18:43:00", "stamp_updated": "2018-01-18 18:45:02", "packets": 56805000, "bytes": 43872019000}

That doesn't really seem correct, so maybe pmacct is not working as expected yet?

Thanks for working on this task, very much appreciated!

My idea was for "tag" to be used for the different segments of our network (eqsin/ulsfo/esams/eqiad+codfw), and to have that as an extra dimension in Druid. This would allow us to respond to queries like "what are the top-N ASNs we send traffic to from ulsfo", which is a common query. This isn't set up yet though (tag is always 0), so I'm not yet sure if it would work.

As for those empty-looking flows... I had a look at a tcpdump output for port 2100 (the NetFlow port) and passed it through Wireshark (decoding port 2100 as "CFLOW"), and the flows don't seem to be decoded properly. This means that either Wireshark's dissector is incompatible (or, less likely, broken), or that the flow output from the routers is incomplete.

The flows seem to be coming from esams, not ulsfo, which I don't remember setting up (maybe Arzhel did?), so I'm not sure if it ever worked from esams. Perhaps the esams routers are misconfigured and send the wrong NetFlow version, or are just buggy? I don't have time to debug further, maybe @ayounsi can? :)

Thanks for the explanation!

I have another question: does the netflow data ending up in the netflow Kafka topic always have this format: tag,dst_as,as_path,peer_dst_as?
If not, it might be good to think about it sooner rather than later; ideally it would be great to have one topic for each "format" of netflow data, so importing into Hadoop and Druid would be easier.

Anyway, the overall plan is the following:

  1. Add a Camus cron job on analytics1003 to periodically import JSON data from the netflow Kafka topic into HDFS (as we do with webrequest, etc.)
  2. Create a Hive table on top of that data, possibly also aggregating by day and month (as we do with pageviews)
  3. Create a periodic indexing job in Oozie to push data to Druid

In this way we'd have hourly (and possibly daily/monthly) aggregated netflow data, available in Druid and Hive. On top of that, if needed, we could also add a Spark streaming job to push real-time data to Druid via Tranquillity (hence available in Pivot too).

We are currently supporting only one streaming use case (Banner Impressions) and up to now everything works fine (we have alarming, auto-restart in case of failures, etc.), but we found out that Druid (at least 0.9.2) is not that tolerant when certain daemons like the overlord and zookeeper are shut down for maintenance (say, a reboot of one host). I am saying this because we can definitely support streaming data, but we warn people that the last hour of data may not be fully reliable (this is why we'd run periodic batch jobs to fill in the gaps in case something fails).
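As a rough illustration of the kind of hourly roll-up the batch pipeline above would produce, here is a pure-Python stand-in (not the actual Hive/Oozie implementation; the events and values are illustrative):

```python
from collections import defaultdict

# Toy stand-in for the hourly roll-up the batch jobs would compute:
# total bytes per destination AS over a batch of events
# (values illustrative, not real traffic data)
events = [
    {"as_dst": 199835, "bytes": 1500000},
    {"as_dst": 199835, "bytes": 500000},
    {"as_dst": 31477, "bytes": 100000},
]

bytes_per_as = defaultdict(int)
for e in events:
    bytes_per_as[e["as_dst"]] += e["bytes"]

# Top-N destination ASes by traffic volume, the shape of query
# that Druid would then answer interactively
top = sorted(bytes_per_as.items(), key=lambda kv: kv[1], reverse=True)
```

The same aggregation done hourly in Hive, and indexed into Druid, is what makes the "top-N ASNs" queries mentioned earlier cheap to answer.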

HMMM. If this is JSON data, and the schema is consistent, we could use JSONRefine to build the table, rather than doing all those Hive table/oozie job steps.


+1! Even better :)

The empty logs were due to BGP being disabled between pmacct and the router.

It's now back:

{"tag": 0, "as_dst": 199835, "as_path": "31477_199835", "peer_as_dst": 31477, "stamp_inserted": "2018-01-19 19:29:00", "stamp_updated": "2018-01-19 19:30:03", "packets": 1000, "bytes": 1500000}

Change 406951 had a related patch set uploaded (by Elukey; owner: Elukey):
[operations/puppet@production] profile::analytics::refinery::job::camus: add netflow hourly job

https://gerrit.wikimedia.org/r/406951

Change 406951 merged by Elukey:
[operations/puppet@production] profile::analytics::refinery::job::camus: add netflow hourly job

https://gerrit.wikimedia.org/r/406951

Change 408535 had a related patch set uploaded (by Elukey; owner: Elukey):
[operations/puppet@production] profile::analytics::refinery::job::json_refine: add netflow job

https://gerrit.wikimedia.org/r/408535

Change 408539 had a related patch set uploaded (by Elukey; owner: Elukey):
[operations/puppet@production] camus::netflow: move the base path directory to /wmf/raw

https://gerrit.wikimedia.org/r/408539

Change 408539 merged by Elukey:
[operations/puppet@production] camus::netflow: move the base path directory to /wmf/raw

https://gerrit.wikimedia.org/r/408539

Change 408535 merged by Elukey:
[operations/puppet@production] profile::analytics::refinery::job::json_refine: add netflow job

https://gerrit.wikimedia.org/r/408535

Change 408790 had a related patch set uploaded (by Elukey; owner: Elukey):
[analytics/refinery/source@master] RefineTarget: filter out unneeded inputDatasetPath's directories

https://gerrit.wikimedia.org/r/408790

Change 408790 merged by Elukey:
[analytics/refinery/source@master] Update RefineTarget inputBasePath matches

https://gerrit.wikimedia.org/r/408790

Change 408836 had a related patch set uploaded (by Elukey; owner: Elukey):
[operations/puppet@production] profile::analytics::refinery::job::json_refine: standardize netflow conf

https://gerrit.wikimedia.org/r/408836

Change 408836 merged by Elukey:
[operations/puppet@production] profile::analytics::refinery::job::json_refine: standardize netflow conf

https://gerrit.wikimedia.org/r/408836

Change 408981 had a related patch set uploaded (by Elukey; owner: Elukey):
[operations/puppet@production] profile::analytics::refinery::job::json_refine: change netflow's db

https://gerrit.wikimedia.org/r/408981

Change 408981 merged by Elukey:
[operations/puppet@production] profile::analytics::refinery::job::json_refine: change netflow's db

https://gerrit.wikimedia.org/r/408981

Finally we have something working! Example from stat1004:

elukey@stat1004:~$ hive

[.. some output ..]

hive (default)> use wmf;
OK
hive (wmf)> select as_dst, sum(bytes) as total_bytes from netflow where year=2018 group by as_dst sort by total_bytes desc limit 5;

[.. some output ..]

as_dst	total_bytes

[REDACTED]

Once every hour the JSON refine Spark job reads the new data imported by Camus and adds it to Hive. I'll let you guys explore the current data and then we'll decide the next steps on Druid.

Important: when querying with Hive, please limit the scope of the query as much as possible to avoid overwhelming the Hadoop cluster. For example, I used "year=2018" only rather than "year=2018 and month=01 and day=01" because the data volume is still low, but please keep this in mind when writing queries :)
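The reason partition predicates matter is that Hive only scans the partition directories matching the WHERE clause. A toy illustration of that pruning (partition counts are made up for the example):

```python
# Toy illustration of Hive partition pruning: predicates on the partition
# columns (year/month/day/hour) select which partition directories are
# scanned at all, so tighter predicates mean less work for the cluster
partitions = [
    {"year": 2018, "month": 1, "day": d, "hour": h}
    for d in (1, 2)
    for h in range(24)
]

# "year=2018 and month=1 and day=1" touches 24 hourly partitions, while
# "year=2018" alone would scan all 48 in this toy dataset
pruned = [p for p in partitions
          if p["year"] == 2018 and p["month"] == 1 and p["day"] == 1]
```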

This is the format of the Hive table:

hive (wmf)> describe netflow;

col_name	        data_type

as_dst              	bigint
as_path             	string
bytes               	bigint
packets             	bigint
peer_as_dst         	bigint
stamp_inserted      	string
stamp_updated       	string
tag                 	bigint
year                	bigint
month               	bigint
day                 	bigint
hour                	bigint

# Partition Information
# col_name            	data_type 

year                	bigint
month               	bigint
day                 	bigint
hour                	bigint

The year/month/day/hour fields reflect how the data is partitioned in Hive, following how it is imported and aggregated by our ETL process (Camus).
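Concretely, each Hive partition corresponds to an HDFS directory named after the partition columns. A small sketch of that convention (the base path below is illustrative, not necessarily the real HDFS location):

```python
# Hive maps each partition to an HDFS directory named after the partition
# columns; the base path here is an assumption for illustration only
BASE = "/wmf/data/wmf/netflow"

def partition_path(year, month, day, hour):
    """Return the Hive-style partition directory for one hour of data."""
    return f"{BASE}/year={year}/month={month}/day={day}/hour={hour}"
```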

Change 411204 had a related patch set uploaded (by Elukey; owner: Elukey):
[operations/puppet@production] pmacct: set kafka_patition to -1 on nfacctd.conf

https://gerrit.wikimedia.org/r/411204

Change 411204 merged by Elukey:
[operations/puppet@production] pmacct: set kafka_patition to -1 on nfacctd.conf

https://gerrit.wikimedia.org/r/411204

Change 411207 had a related patch set uploaded (by Elukey; owner: Elukey):
[operations/puppet@production] camus: set mapreduce jobs to 3

https://gerrit.wikimedia.org/r/411207

Change 411207 merged by Elukey:
[operations/puppet@production] camus: set netflow's mapreduce jobs to 3

https://gerrit.wikimedia.org/r/411207

After the last round of patches nfacctd/pmacct are sending events to Kafka using three topic partitions rather than one, and Camus is pulling data from Kafka with 3 map-reduce jobs rather than one. This should allow better throughput when the number of events increases.
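For context on the kafka_partition = -1 change: in librdkafka that value means "let the default partitioner choose", so unkeyed events get spread across all three partitions rather than pinned to one. A toy round-robin stand-in for that spreading (the real partitioner's choice per message may differ):

```python
# With kafka_partition = -1, librdkafka's default partitioner picks the
# partition; for unkeyed messages the net effect is to spread events
# across all partitions. Toy round-robin stand-in over 3 partitions:
NUM_PARTITIONS = 3

def assign(msg_index):
    return msg_index % NUM_PARTITIONS

assignments = [assign(i) for i in range(6)]
```

Each Camus map task can then consume one partition, which is why bumping the map-reduce job count to 3 pairs with the partition change.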

Created a dashboard to see the new Kafka Topics Jumbo (Prometheus) metrics:

https://grafana.wikimedia.org/dashboard/db/kafka-by-topic-prometheus?refresh=5m&orgId=1&from=now-1h&to=now&var-datasource=eqiad%20prometheus%2Fops&var-cluster=kafka_jumbo&var-kafka_broker=All&var-topic=netflow

Are we planning to use Tranquillity to move the data into Druid, or rather just Kafka -> Camus -> Hive?

elukey changed the task status from Open to Stalled. Feb 27 2018, 1:01 PM

Are we planning to use Tranquillity to move the data into Druid, or rather just Kafka -> Camus -> Hive?

Longer term, both! Tranquillity + Druid would be useful for real-time use cases like responding to DDoS, but it is not that pressing. I had a chat with Arzhel and netflow data is currently stopped due to privacy concerns, so I am going to set this task as stalled and remove it from the Kanban. I'll start working on it again when Arzhel is ready to re-enable the traffic :)

Going to close this task since the groundwork has been done; it will restart when netflow data starts flowing to Kafka again. After that, Arzhel and I will study/find a solution for real-time data :)