
Collect netflow data for internal traffic
Open, MediumPublic

Description

This would be really nice for observability of internal traffic.

Some open questions here:

  • are the netflow boxes and the Analytics pipelines involved going to be okay if we send a much greater number of flows?
  • can we get hostnames and/or cluster names rather than just IP addresses?
  • are there any special worries around addressing of k8s pods? (we might need to map them from a pod IP --> a service name at flow creation time?)

Event Timeline

CDanis triaged this task as Medium priority.Sep 18 2020, 6:08 PM

are the netflow boxes and the Analytics pipelines involved going to be okay if we send a much greater number of flows?

Do we have a high level estimate of what will happen to the following traffic graph? (netflow -> kafka jumbo)

https://grafana.wikimedia.org/d/n3yYx5OGz/kafka-by-topic?orgId=1&from=now-24h&to=now&refresh=5m&var-datasource=eqiad%20prometheus%2Fops&var-kafka_cluster=jumbo-eqiad&var-kafka_broker=All&var-topic=netflow

The only things that come to mind are:

  1. The number of kafka jumbo partitions might need to be bumped to 6 before we enable the extra traffic, to spread the load more. This would also help in fetching more data in parallel from kafka to hdfs via camus (more mappers for every data fetch).
  2. More data needs to be kept in hive/hdfs, we are currently storing this for raw and refined data:
elukey@stat1004:~$ hdfs dfs -du -h /wmf/data/raw/netflow/netflow/hourly/2020
353.3 G  1.0 T    /wmf/data/raw/netflow/netflow/hourly/2020/01
300.0 G  899.9 G  /wmf/data/raw/netflow/netflow/hourly/2020/02
236.6 G  709.9 G  /wmf/data/raw/netflow/netflow/hourly/2020/03
611.8 G  1.8 T    /wmf/data/raw/netflow/netflow/hourly/2020/04
600.4 G  1.8 T    /wmf/data/raw/netflow/netflow/hourly/2020/05
535.1 G  1.6 T    /wmf/data/raw/netflow/netflow/hourly/2020/06
617.9 G  1.8 T    /wmf/data/raw/netflow/netflow/hourly/2020/07
621.1 G  1.8 T    /wmf/data/raw/netflow/netflow/hourly/2020/08
429.2 G  1.3 T    /wmf/data/raw/netflow/netflow/hourly/2020/09

elukey@stat1004:/mnt/hdfs/wmf/data/wmf/netflow/year=2020/month=9$ hdfs dfs -du -h /wmf/data/wmf/netflow/year=2020/
164.4 G  493.3 G  /wmf/data/wmf/netflow/year=2020/month=1
142.3 G  426.8 G  /wmf/data/wmf/netflow/year=2020/month=2
113.9 G  341.8 G  /wmf/data/wmf/netflow/year=2020/month=3
280.4 G  841.1 G  /wmf/data/wmf/netflow/year=2020/month=4
275.9 G  827.6 G  /wmf/data/wmf/netflow/year=2020/month=5
245.7 G  737.0 G  /wmf/data/wmf/netflow/year=2020/month=6
286.2 G  858.6 G  /wmf/data/wmf/netflow/year=2020/month=7
287.4 G  862.1 G  /wmf/data/wmf/netflow/year=2020/month=8
197.1 G  591.4 G  /wmf/data/wmf/netflow/year=2020/month=9

There are two sizes because the second column is the final replicated (3x) data usage on HDFS. There shouldn't be a lot of trouble in storing more data, but it's better to know more or less how much :)

  3. Bigger segments stored on HDFS for Druid, and also on Druid itself. We should be ok but I'd need to double check with @JAllemandou first (this also depends on how much more data we'll store etc..). Current total usage on HDFS:
elukey@stat1004:~$ hdfs dfs -du -h -s /user/druid/deep-storage/wmf_netflow
3.6 T  10.7 T  /user/druid/deep-storage/wmf_netflow
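The two columns in these `hdfs dfs -du` listings are related by the replication factor; a quick sanity check using the January raw-data row above:

```python
# Sanity-check that the second `hdfs dfs -du` column (disk usage across all
# replicas) is ~3x the first (logical size), matching HDFS replication factor 3.
logical_gb = 353.3   # /wmf/data/raw/netflow/netflow/hourly/2020/01, first column
replicated_tb = 1.0  # second column for the same path

replication_factor = (replicated_tb * 1024) / logical_gb
print(round(replication_factor, 2))  # 2.9, i.e. replication factor 3
```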

Some thoughts/idea:

  • enable IPFIX on all/most of the routers interfaces

In the current state of our setup this means double/triple accounting a flow as packets cross interfaces. Juniper's IPFIX implementation is not flexible enough to send different flows to different collectors, at different sampling rates for example. The only granularity is what to sample, based on Juniper's firewall filters.
It's very easy to turn on, but will require more work on the collector/analytics side to map the SNMP_INTERFACE_ID present in the IPFIX packet to something human readable, and ideally something we can group (eg. all the transit ports, all the peering ports).
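That interface-ID mapping could look roughly like the sketch below. The table contents (interface names, groups) are entirely made up for illustration; in practice it would be built from SNMP ifDescr polling or Netbox data.

```python
# Hypothetical sketch: map the numeric SNMP_INTERFACE_ID from IPFIX records to
# a human-readable interface name and a group (transit/peering/...).
# The values in this table are invented for illustration only.
IFINDEX_MAP = {
    531: {"name": "xe-3/0/4", "group": "transit"},
    542: {"name": "xe-3/1/0", "group": "peering"},
}

def enrich_interface(flow: dict) -> dict:
    """Annotate a flow dict with interface name/group, defaulting to unknown."""
    meta = IFINDEX_MAP.get(flow.get("iface_in"),
                           {"name": "unknown", "group": "unknown"})
    return {**flow, "iface_name": meta["name"], "iface_group": meta["group"]}

print(enrich_interface({"iface_in": 531, "bytes": 9510}))
```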

  • set a different sampling pipeline for internal flows

For example by sampling on the switches using sFlow, or on the hosts themselves using ipt-netflow.
The advantage is that it would not impact the now-critical IPFIX/Netflow setup: lower availability requirements, more flexibility (different sampling rates, etc).
The downside is that we would not be able to get an exact answer to "what is crossing that specific transport link?"

Once we decide on the best way to proceed, we can give a number to @elukey

I wonder what kind of ASN these flows would show up as (esp. with confederations!), and whether we could have a dimension to differentiate between internet traffic and backhaul traffic. We'd also need a "site" dimension to be able to filter or slice for traffic from esams to eqiad, like the parent task required, right? Also see T254332, which makes me wonder whether adding all of these different dimensions is going to start being a problem :)

The per-ASN views we have right now for front-facing traffic are priceless, and it would be a pity to make navigating these more difficult (by e.g. having to select interface IDs for all transit interfaces).

Perhaps we could use/extend the existing 'direction' dimension to keep things simple here?

Right now it's just incoming/outgoing/unknown; internal traffic should at least be 'unknown'.
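Extending that dimension with an explicit 'internal' value could be sketched like this. The single 10.0.0.0/8 prefix is a placeholder for the real internal-prefix list, not the actual logic used by the pipeline.

```python
import ipaddress

# Sketch: extend the existing direction dimension (incoming/outgoing/unknown)
# with an 'internal' value. The prefix list here is a placeholder.
INTERNAL_NETS = [ipaddress.ip_network("10.0.0.0/8")]

def is_internal(ip: str) -> bool:
    addr = ipaddress.ip_address(ip)
    return any(addr in net for net in INTERNAL_NETS)

def direction(src: str, dst: str) -> str:
    src_int, dst_int = is_internal(src), is_internal(dst)
    if src_int and dst_int:
        return "internal"
    if dst_int:
        return "incoming"
    if src_int:
        return "outgoing"
    return "unknown"

print(direction("10.64.0.5", "10.128.0.13"))  # internal
```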

BBlack added a subscriber: BBlack.

The swap of Traffic for Traffic-Icebox in this ticket's set of tags was based on a bulk action for all tickets that are neither part of our current planned work nor clearly a recent, higher-priority emergent issue. This is simply one step in a larger task cleanup effort. Further triage of these tickets (and especially, organizing future potential project ideas from them into a new medium) will occur afterwards! For more detail, have a look at the extended explanation on the main page of Traffic-Icebox. Thank you!

Change 742110 had a related patch set uploaded (by Ayounsi; author: Ayounsi):

[operations/puppet@production] Pmacct add sflow listener

https://gerrit.wikimedia.org/r/742110

I went the "set a different sampling pipeline for internal flows" way with the above POC for the reasons mentioned in T263277#6491140.

@CDanis, @cmooney, let me know what you think.

If you agree, the next steps are:

  1. Analytics to create a sflow pipeline, similar to the netflow one
    • It will be testing only at first, so very little traffic,
    • Maybe named "internal_flows" instead of "sflow" if we want to bikeshed :)
  2. Configure the drmrs switches to send sflow data and test/tune it from there
    • Example unknown to test: can sflow be sent efficiently through the mgmt interface, or does it need L3 interfaces?
  3. Write a pre_tag_label and filter to:
    • Add a site field based on the source or destination of the flow
    • Filter out Internet traffic (already sampled in Netflow)
  4. Homerify the switch sFlow config
  5. Once production ready:
    • Scale up progressively by adjusting the sampling rate and location
      • For example access ports vs. uplinks
    • Overall I'm expecting less data sent to Kafka than its Netflow counterpart, with much less cardinality (right now 6 fields, vs. 20 for netflow).
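The tagging/filtering step (adding a site field and dropping Internet traffic) could be sketched roughly as below, in Python for illustration; pmacct's actual pre_tag_map syntax differs, and the two prefixes shown are examples only (the authoritative list lives in puppet's network data.yaml).

```python
import ipaddress

# Illustrative sketch: derive a "site" field from the flow's source or
# destination prefix, and drop Internet traffic (already sampled via Netflow).
# Prefixes below are examples, not the real per-site allocations.
SITE_PREFIXES = {
    "eqiad": ipaddress.ip_network("10.64.0.0/12"),
    "codfw": ipaddress.ip_network("10.192.0.0/12"),
}

def site_of(ip: str):
    addr = ipaddress.ip_address(ip)
    for site, net in SITE_PREFIXES.items():
        if addr in net:
            return site
    return None  # not one of our internal prefixes

def tag_flow(flow: dict):
    src_site, dst_site = site_of(flow["ip_src"]), site_of(flow["ip_dst"])
    if src_site is None and dst_site is None:
        return None  # Internet traffic: filter out
    return {**flow, "src_site": src_site, "dst_site": dst_site}

print(tag_flow({"ip_src": "10.64.0.5", "ip_dst": "10.192.16.3"}))
```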

Seems like a sane proposal. The use of sflow and a different pipeline will keep a clean separation between it and data from our internet edge.

In regard to point 3 I'd wonder how we can do that efficiently? Like for instance traffic from an LVS to a back-end server will have a public IP, but this isn't "internet traffic"? Or private traffic between rows traverses the swtich -> CR interfaces. Which are the two ways that jump out to me to isolate internet flows. I'm sure there are other approaches though.

The downside is the extra workload/complexity of using two protocols and thus maintaining two data pipelines. But overall I think it would be manageable so no objection here.

Tagging Data-Engineering because we will likely be managing the Gobblin and/or Druid ingestion parts of this pipeline.
Should we be aiming for real-time ingestion into Druid, as per the wmf_netflow data source?

@BTullis thanks! Real-time, would be a nice plus, but a hard requirement (unlike netflow).

@cmooney
Here is my (untested) solution.
I'm then ignoring all traffic tagged with 1 which matches Internet traffic.
It seems like an acceptable tradeoff between not overloading Analytics with somewhat duplicated traffic, and losing a bit of internal visibility on those flows.
Keep in mind that the main goal here is to add visibility on internal flows (eg. cross DC, service to service, etc).

Adding some Data-Engineering considerations, with the assumption that the new sflow stream would be comparable to the existing netflow one:

  • It should not be problematic for kafka-jumbo: netflow currently peaks at less than 1Mbs over a total of less than 45Mbs, and machines are not suffering (https://grafana.wikimedia.org/d/000000027/kafka?orgId=1&from=now-7d&to=now). Flagging @Ottomata here for confirmation.
  • It will not be problematic for HDFS storage: netflow unsanitized weighs 1.5Tb, sanitized weighs 1.8Tb, and druid storage weighs 15Tb (will be less after T296207 has been resolved). HDFS currently has ~400Tb available. (Note: talking without replication factor, meaning all numbers should be multiplied by 3 to get actual values - for instance there is ~1.2Pb available on the cluster now.)
  • It is also feasible for Druid, but with less storage room - netflow currently weighs 2Tb useful in Druid with rep-factor 2, so 4Tb total. The druid cluster has a total capacity of 13.75Tb, and is currently 45% full. We can therefore add the sflow datasource, but it'll make the cluster busier. Something to note: the sources could be configured to keep less data if needed :)
  • In terms of pipelines, adding a new stream means adding some new pipeline, but this is cheap (tooling ready).
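A quick back-of-the-envelope check of the Druid numbers above (figures taken from that bullet; this just confirms the headroom arithmetic):

```python
# Druid capacity check using the figures quoted above.
total_tb = 13.75           # total cluster capacity
used_tb = total_tb * 0.45  # cluster is ~45% full
netflow_tb = 4.0           # 2Tb useful x rep-factor 2

free_tb = total_tb - used_tb
# Roughly 6.2Tb used, 7.6Tb free: a comparably-sized sflow datasource fits,
# but would noticeably tighten the remaining headroom.
print(used_tb, free_tb)
```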

@JAllemandou This is great, thanks! Note that we can tune sampling to adapt.

What would be the next steps?

@BTullis thanks! Real-time, would be a nice plus, but a hard requirement (unlike netflow).

Did you mean _not_ a hard requirement?

@JAllemandou, confirmed that resource consumption of this is fine.

Did you mean _not_ a hard requirement?

Yep, my bad :)

What would be the next steps?

Here is a proposal:

  1. [DE, SRE] Agree on the name of the flow :) Will it be sflow (this feels not very explicit for us non-network people)? Maybe internal_netflow? Something else?
  2. [SRE] Start sending events to kafka with low sampling (small number of events) to facilitate setting the pipeline (easier to test with some events flowing in)
  3. [DE] Setup the pipelines to 1) ingest the raw data in Hadoop and 2) refine/enhance it (I assume there would be some data enhancement you'll be after)
  4. [DE, SRE] Send data to Druid and verify it is to the level of expectation for usage
  5. [SRE] Raise sampling level to expected volume and grow the number of kafka topic-partitions to match expected volume.

Sounds good!

  1. we can use "internal_flows" (not _netflow as netflow is a protocol).
  2. can I start this anytime, or we need to create the kafka topic somewhere?

Agree on the name of the flow :

Some guidelines: https://wikitech.wikimedia.org/wiki/Event_Platform/Schemas/Guidelines#Event_Data_Modeling_and_Schema_Naming

Is the schema the same as netflow? Or different? Perhaps getting a schema together would help with the naming. Can we namespace it in a way? For the stream, maybe network.traffic.trace? Or something like that? (I do not really know what is in this data so please make other suggestions :) )

Sounds good!

  1. we can use "internal_flows" (not _netflow as netflow is a protocol).

Would internal_network_flows be ok for you? It makes it even more explicit :)

  1. can I start this anytime, or we need to create the kafka topic somewhere?

I think you can (but I'd rather have @Ottomata confirm this) if you set a very low sampling rate and the same config you're using for netflow (with a different kafka topic name, of course) so that events are sent to kafka-jumbo. I'll add a note to the list of things to do to grow the number of partitions for the topic before you grow the sample rate.

can I start this anytime, or we need to create the kafka topic somewhere?

Not really needed, unless you need to set special topic settings (like the # of partitions). The topic will be auto created the first time it is produced to.

...do you think we could make this new stream (and maybe netflow as well?) work like the rest of the Event Platform streams (you can still produce directly to Kafka). For this we'd need:

And that's basically it. Doing that would allow us to remove a special-cased refine job that we maintain just for netflow.

OHhhhh, but right, maybe we don't have control of the producer logic that sends the event? We did have to solve that for network error logging. We made a special eventgate endpoint that would allow us to put some of the fields we need in the HTTP URL we requested, so that EventGate could augment the event. Where does sflow come from? Does it have to send via Kafka or can it HTTP POST to eventgate?

internal_network_flows works, network.flows.internal too.

@Ottomata indeed we do have restriction on the producer side (it's the same tool as netflow, and can't HTTP POST) see T248865#6011043.

Looks like there were some naming suggestions over there too.

I'm a bit confused so please let me know what I should do next :)

In case it helps, I came across this abandoned change from 2020: https://gerrit.wikimedia.org/r/c/schemas/event/secondary/+/608077/ which, if I understand it correctly, was an attempt to add an Eventgate compatible schema for netflow.
The ticket to which it related was this T248865: Automate ingestion of netflow event stream

My understanding is that since we couldn't use HTTP POST from pmacct, we were required to bypass Eventgate for the netflow stream data and simply store the output from pmacct in Kafka directly.
It's read directly by the realtime Druid ingestion processes, then a scheduled refine task adds more detail (including GeoIP data) retrospectively. Is this right so far?

Given that we're still using pmacct for the sflow data, I think that it's likely that we will need to replicate the same pattern for this new pipeline. Naturally we wouldn't need any GeoIP for this refinement step, given that all IPs will either be internal or will belong to us. I'm not sure what other information is added as part of the refinement of the netflow data, but I can look it up.

Ah, right!

https://phabricator.wikimedia.org/T248865#6289287

So yeah, unless we can at least control the event format, we can't really make an event platform compatible event.

we're still using pmacct for the sflow data

Okay, we should be able to use the same refine job for the new stuff. Let's call the stream something similar, and then configure the Gobblin and Refine netflow jobs to import the new topic along with the existing netflow topic.

Since the existing topic is called netflow, then indeed, perhaps netflow_internal is okay. If we ever bring this over to Event Platform stuff, we can bikeshed the stream names then.

Change 742110 merged by Ayounsi:

[operations/puppet@production] Pmacct add sflow listener

https://gerrit.wikimedia.org/r/742110

Change 747067 had a related patch set uploaded (by Ayounsi; author: Ayounsi):

[operations/puppet@production] "Pmacct add sflow listener" try #2

https://gerrit.wikimedia.org/r/747067

Change 747067 merged by Ayounsi:

[operations/puppet@production] "Pmacct add sflow listener" try #2

https://gerrit.wikimedia.org/r/747067

Tests are successful:

I tested it by configuring sflow on the not-yet-in-production asw1-b12-drmrs switch:

[edit protocols]
+   sflow {
+       sample-rate {
+           ingress 10;
+           egress 10;
+       }
+       source-ip 185.15.58.131;
+       collector 10.128.0.13 {
+           udp-port 6343;
+       }
+       interfaces xe-0/0/45.0;
+       interfaces xe-0/0/46.0;
+       interfaces xe-0/0/47.0;
+   }

And the data showed up in Kafka as expected. For example:

netflow4002:~$ kafkacat -b kafka-jumbo1001.eqiad.wmnet -t network_flows_internal -C
{"event_type": "purge", "peer_ip_src": "10.136.128.3", "ip_src": "185.15.58.130", "ip_dst": "208.80.154.88", "port_src": 161, "port_dst": 48911, "ip_proto": "udp", "stamp_inserted": "2021-12-14 14:06:00", "stamp_updated": "2021-12-14 14:07:01", "packets": 10, "bytes": 9510, "writer_id": "default_kafka/58314"}
{"event_type": "purge", "peer_ip_src": "10.136.128.3", "ip_src": "185.15.58.130", "ip_dst": "208.80.154.5", "port_src": 514, "port_dst": 514, "ip_proto": "udp", "stamp_inserted": "2021-12-14 14:06:00", "stamp_updated": "2021-12-14 14:07:01", "packets": 20, "bytes": 3420, "writer_id": "default_kafka/58314"}

The switch config has been rolled back for now.
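For illustration, a sampled record like the first kafkacat line above can be scaled back to an estimated rate using the configured sample rate of 10 (a rough sketch, assuming the counters are not already renormalized by pmacct):

```python
import json
from datetime import datetime

# Rough sketch: estimate the real traffic rate behind one sampled pmacct record
# (values taken from the first kafkacat example above), scaling by the
# configured sFlow sample rate of 10.
record = json.loads(
    '{"event_type": "purge", "ip_src": "185.15.58.130", '
    '"ip_dst": "208.80.154.88", "bytes": 9510, "packets": 10, '
    '"stamp_inserted": "2021-12-14 14:06:00", '
    '"stamp_updated": "2021-12-14 14:07:01"}'
)

SAMPLE_RATE = 10
fmt = "%Y-%m-%d %H:%M:%S"
seconds = (datetime.strptime(record["stamp_updated"], fmt)
           - datetime.strptime(record["stamp_inserted"], fmt)).total_seconds()

# bytes -> bits, scaled by the sample rate, over the aggregation interval
est_bps = record["bytes"] * SAMPLE_RATE * 8 / seconds
print(round(est_bps))  # ~12472 bits/s
```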

Am I right in assuming that this data has the same schema as the original netflow?

Am I right in assuming that this data has the same schema as the original netflow?

This will have fewer fields, but the ones present have the same types of values as the original netflow (all fields are in the example above). Let me know if I should detail them.

It could be enriched the same way as netflow if possible, for example adding an address_family field based on the ip_src or ip_dst field, like we did in T254332 (GeoIP is not needed though).
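That address_family derivation could be as simple as the sketch below (the exact field name and value format used by the refine job may differ; "IPv4"/"IPv6" here are illustrative):

```python
import ipaddress

# Sketch of the address_family enrichment: derive the IP version from ip_src.
# The output format ("IPv4"/"IPv6") is an assumption, not the refine job's
# actual convention.
def address_family(flow: dict) -> str:
    return "IPv%d" % ipaddress.ip_address(flow["ip_src"]).version

print(address_family({"ip_src": "185.15.58.130"}))    # IPv4
print(address_family({"ip_src": "2620:0:861:1::1"}))  # IPv6
```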

No need to detail the fields and schema :)

About data augmentation, here on github is the list of fields we add to netflow, with a brief description of how we do it. It'd be great if you could flag the ones you think are not needed.
Thanks :)

Cool, only ip_version and region are useful here.

Would it hurt to keep the same augmentations? If the schema is the sameish (it sounds like it is), we can just apply the exact same pipeline on netflow internal with no code changes.

Would it hurt to keep the same augmentations? If the schema is the sameish (it sounds like it is), we can just apply the exact same pipeline on netflow internal with no code changes.

One downside is doing computation for nothing: looking into maxmind ISP DB for internal IPs.
The other is using non-existing fields: net_src, mask_src as well as their dst versions (concatenated).

I'd prefer to avoid scheduling another special job for this if we can. Can we make the NetflowTransform functions smart enough to know to do the right thing based on the input data somehow?

I have the opposite view: I'd rather have another job instead of custom logic to prevent doing something :)

Yeahh...but then we have to manage and maintain another custom ingestion job. We're trying to reduce the number of those.

The custom logic could even just be varied on the hardcoded stream / tablename. If netflow do X, if netflow_internal do Y.

The custom logic could even just be varied on the hardcoded stream / tablename. If netflow do X, if netflow_internal do Y.

Ok let's do it this way :)

Change 747561 had a related patch set uploaded (by Joal; author: Joal):

[analytics/refinery/source@master] Update refine netflow_augment transform function

https://gerrit.wikimedia.org/r/747561

Change 748097 had a related patch set uploaded (by Joal; author: Joal):

[operations/puppet@production] Add network_internal_flows to refine and druid-load

https://gerrit.wikimedia.org/r/748097

Change 748099 had a related patch set uploaded (by Joal; author: Joal):

[analytics/refinery@master] Add network_internal_flows to gobblin netflow job

https://gerrit.wikimedia.org/r/748099

Adding question here in addition to the CR: For druid ingestion we have 2 jobs, the first ingests all columns, and the second sanitizes by removing the ones with PII data after 90 days.
Do we need sanitization of the data for network_internal_flows, as it is internal data only? If so, the data as reduced in the CR seems small and possibly of little value.
Let me know what you think :)

I believe that it is not necessary to refine this data.

In theory there should not be any PII data, but it would be safer to sanitize it nonetheless.

As the data is mostly IPs, the easiest is to not keep anything older than 90 days.

Slightly more complex would be to delete (or replace with 0.0.0.0 for v4, :: for v6, or similar) any src_ip or dst_ip that doesn't match any of the prefixes from https://github.com/wikimedia/puppet/blob/production/modules/network/data/data.yaml#L2.
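A sketch of that replacement logic, with the prefix list truncated to a few examples (the authoritative list is in the linked data.yaml):

```python
import ipaddress

# Sketch of the sanitization option above: zero out any IP that doesn't fall
# inside one of our prefixes. The list here is a small illustrative subset.
WMF_PREFIXES = [ipaddress.ip_network(p) for p in
                ("185.15.56.0/22", "208.80.152.0/22", "10.0.0.0/8")]

def sanitize_ip(ip: str) -> str:
    addr = ipaddress.ip_address(ip)
    if any(addr in net for net in WMF_PREFIXES):
        return ip  # one of ours: keep as-is
    return "0.0.0.0" if addr.version == 4 else "::"

print(sanitize_ip("208.80.154.88"))  # kept: inside 208.80.152.0/22
print(sanitize_ip("8.8.8.8"))        # replaced with 0.0.0.0
```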

Change 747561 merged by jenkins-bot:

[analytics/refinery/source@master] Update refine netflow_augment transform function

https://gerrit.wikimedia.org/r/747561

Change 748099 merged by Joal:

[analytics/refinery@master] Add network_internal_flows to gobblin netflow job

https://gerrit.wikimedia.org/r/748099

Change 753563 had a related patch set uploaded (by Joal; author: Joal):

[analytics/refinery@master] Correct netflow gobblin job

https://gerrit.wikimedia.org/r/753563

Change 753563 merged by Joal:

[analytics/refinery@master] Correct netflow gobblin job

https://gerrit.wikimedia.org/r/753563

Change 748097 merged by Ottomata:

[operations/puppet@production] Add network_internal_flows to refine and druid-load

https://gerrit.wikimedia.org/r/748097

Change 753794 had a related patch set uploaded (by Joal; author: Joal):

[operations/puppet@production] Fix error in network_internal_flows druid job

https://gerrit.wikimedia.org/r/753794

Change 753794 merged by Ottomata:

[operations/puppet@production] Fix error in network_internal_flows druid job

https://gerrit.wikimedia.org/r/753794

Change 753818 had a related patch set uploaded (by Joal; author: Joal):

[operations/puppet@production] Absent network_flows_internal druid jobs

https://gerrit.wikimedia.org/r/753818

Change 753818 merged by Ottomata:

[operations/puppet@production] Absent network_flows_internal druid jobs

https://gerrit.wikimedia.org/r/753818

Change 754994 had a related patch set uploaded (by Joal; author: Joal):

[operations/puppet@production] Reset druid load jobs for network_flows_internal

https://gerrit.wikimedia.org/r/754994

Change 754994 merged by Ottomata:

[operations/puppet@production] Reset druid load jobs for network_flows_internal

https://gerrit.wikimedia.org/r/754994

Change 755470 had a related patch set uploaded (by Joal; author: Joal):

[operations/puppet@production] Fix network_flows_internal druid loading jobs

https://gerrit.wikimedia.org/r/755470

Change 755470 merged by Ottomata:

[operations/puppet@production] Fix network_flows_internal druid loading jobs

https://gerrit.wikimedia.org/r/755470

Change 755983 had a related patch set uploaded (by Joal; author: Joal):

[operations/puppet@production] Update network_flows_internal druid indexation job

https://gerrit.wikimedia.org/r/755983

Change 755983 merged by Ottomata:

[operations/puppet@production] Update network_flows_internal druid indexation job

https://gerrit.wikimedia.org/r/755983