
Add a webrequest sampled topic and ingest into druid/turnilo
Closed, Resolved · Public

Description

This task tracks the implementation of "realtime webrequest" feature.

2022-08-10

Summary/notes of a half-hour chat between @BTullis, @CDanis, @Ottomata and myself:

  • Conceptually "realtime sampled webrequest" is similar to what we're doing with netflow (from a druid/turnilo POV)
  • Having a topic already sampled (and processed/augmented) would make things easy on the druid/turnilo side, and we could drop the current hourly webrequest sampled datasource in turnilo, while keeping the realtime datasource only.
  • Such a canonical sampled topic is generally useful for SREs and other users to peek at, and thus something we'd want

Then the question is, how to generate said sampled topic? Note that webrequest upload and text topics will need to be combined. The basic operations needed are:

  • read from kafka topics
  • sample the streams
  • for the first iteration we'd need basic augmentation for operational investigation purposes (i.e. geoip / AS lookups)
  • write the combined stream back into kafka as the sampled topic

A few options have been discussed, including:

  • Since this is relatively simple stream processing (and quite importantly, stateless) we could get away with simpler/easier solutions like https://www.benthos.dev (deployed in k8s for example)

Event Timeline

Since our meeting, I have been reading the docs around benthos and I've got to say, I find it really compelling!

This looks to me like it could be a very useful swiss-army-knife kind of tool that might be applicable more widely and appeal more broadly to other data engineers and scientists.

For me, one of the beneficial things about it would be the fact that we could begin with unit testing straight away: https://www.benthos.dev/docs/configuration/unit_testing
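
For illustration, a minimal sketch of what such a test could look like; the processor and field names here are invented for the example (not taken from a real config), and tests in this format run with benthos test <config>:

# Purely illustrative: processor and fields invented for the example.
pipeline:
  processors:
    - label: "nocookies_flag"
      bloblang: |
        root = this
        root.nocookies = this.x_analytics.contains("nocookies=1")

tests:
  - name: flags nocookies requests
    target_processors: '/pipeline/processors'
    input_batch:
      - content: '{"x_analytics":"https=1;nocookies=1"}'
    output_batches:
      - - json_equals: {"x_analytics": "https=1;nocookies=1", "nocookies": true}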

Therefore, based on this initial research, I'd certainly be in favour of exploring the use of benthos for this requirement.

I agree benthos looks really fun!

I think there is a real need for easy to use stream processors. We evaluated Knative Eventing here, and decided not to pursue it, but recognized that there is a need for something like it. Maybe benthos is that thing?

I would never block anyone from using it, but I'd like if we could build platform support for any chosen solution. We've been focusing on Flink, which allows us to use any of the existing Java tooling we have. It should also allow us to use that same tooling in Python.

Whatever we choose, there's a bunch of functionality we'd want to implement, e.g. event schema lookup and validation (when producing events), integration with Event Stream Config, etc. From a brief look at benthos docs, it's not clear to me how to plug that kind of functionality in, but I'm sure it's possible.

I wanted to get a very stupid simple example of using Flink to sample webrequest in Kafka. Here's an example using purely streaming SQL.

  • 'create' the source and sink streaming tables. These only 'create' tables in the local in memory catalog. These 'tables' do not exist anywhere outside of this job. If you restart the job, you need to run these create statements again.
-- Create the source table on the webrequest_text and webrequest_upload topic
CREATE TABLE webrequest (
    `hostname` STRING,
    `sequence` BIGINT,
    `dt` STRING,
    `time_firstbyte` DOUBLE,
    `ip` STRING,
    `cache_status` STRING,
    `http_status` STRING,
    `response_size` BIGINT,
    `http_method` STRING,
    `uri_host` STRING,
    `uri_path` STRING,
    `uri_query` STRING,
    `content_type` STRING,
    `referer` STRING,
    `user_agent` STRING,
    `accept_lanugage` STRING,
    `x_analytics` STRING,
    `range` STRING,
    `x_cache` STRING,
    `accept` STRING,
    `backend` STRING,
    `tls` STRING,
    `ch_ua` STRING,
    `ch_ua_arch` STRING,
    `ch_ua_bitness` STRING,
    `ch_ua_full_version_list` STRING,
    `ch_ua_model` STRING,
    `ch_ua_platform` STRING,
    `ch_ua_platform_version` STRING,
-- Add the kafka topic as a virtual metadata field.  This field is not in the source data, but will show up as a field on the streaming table.
-- See: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#available-metadata
-- We'll use this to create the 'webrequest_source' field in the sink table.
    `topic` STRING METADATA VIRTUAL
) WITH (
    'connector' = 'kafka',
    'topic' = 'webrequest_text;webrequest_upload',
    'format' = 'json',    
    'json.ignore-parse-errors' = 'true',
    'json.fail-on-missing-field' = 'false',
    'properties.bootstrap.servers' = 'kafka-jumbo1001.eqiad.wmnet:9092',
    'scan.startup.mode' = 'latest-offset',
    'properties.auto.offset.reset' = 'latest',
    'properties.group.id' = 'pyflink_webrequest_consumer_group'
);

-- Create the sink table. Inserting into this table will write to the defined Kafka topic.
CREATE TABLE test_webrequest_sampled_otto0 (
    `hostname` STRING,
    `sequence` BIGINT,
    `dt` STRING,
    `time_firstbyte` DOUBLE,
    `ip` STRING,
    `cache_status` STRING,
    `http_status` STRING,
    `response_size` BIGINT,
    `http_method` STRING,
    `uri_host` STRING,
    `uri_path` STRING,
    `uri_query` STRING,
    `content_type` STRING,
    `referer` STRING,
    `user_agent` STRING,
    `accept_lanugage` STRING,
    `x_analytics` STRING,
    `range` STRING,
    `x_cache` STRING,
    `accept` STRING,
    `backend` STRING,
    `tls` STRING,
    `ch_ua` STRING,
    `ch_ua_arch` STRING,
    `ch_ua_bitness` STRING,
    `ch_ua_full_version_list` STRING,
    `ch_ua_model` STRING,
    `ch_ua_platform` STRING,
    `ch_ua_platform_version` STRING,
-- Add two new computed fields
    `webrequest_source` STRING,
    `x_analytics_map` MAP<STRING, STRING>
) WITH (
    'connector' = 'kafka',
    'topic' = 'test_webrequest_sampled_otto0',
    'format' = 'json',    
    'properties.bootstrap.servers' = 'kafka-jumbo1001.eqiad.wmnet:9092'
);
  • Launch the streaming sql job. Running this will cause messages to be consumed, transformed, and then produced back to Kafka.
-- Insert into the sink table.
-- Table name here doesn't really matter.  The output Kafka topic is defined in the WITH clause.
INSERT INTO test_webrequest_sampled_otto0
    SELECT 
-- Select all of the fields from the source we want.
    `hostname`,
    `sequence`,
    `dt`,
    `time_firstbyte`,
    `ip`,
    `cache_status`,
    `http_status`,
    `response_size`,
    `http_method`,
    `uri_host`,
    `uri_path`,
    `uri_query`,
    `content_type`,
    `referer`,
    `user_agent`,
    `accept_lanugage`,
    `x_analytics`,
    `range`,
    `x_cache`,
    `accept`,
    `backend`,
    `tls`,
    `ch_ua`,
    `ch_ua_arch`,
    `ch_ua_bitness`,
    `ch_ua_full_version_list`,
    `ch_ua_model`,
    `ch_ua_platform`,
    `ch_ua_platform_version`,
-- Use built in Flink SQL functions to add the webrequest_source and x_analytics_map fields.
-- https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/
    REGEXP_EXTRACT(`topic`, '^webrequest_(.+)$', 1) AS `webrequest_source`,
    STR_TO_MAP(`x_analytics`, ';', '=') AS `x_analytics_map`
-- Select FROM the source table we created above.
    FROM webrequest
-- Naive sample using random
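-- Note: RAND_INTEGER(127) returns an integer in [0, 127), so this keeps roughly 1 in 127 messages.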
    WHERE RAND_INTEGER(127) = 0;

Now, I can see sampled webrequest data on kafka-jumbo cluster in the test_webrequest_sampled_otto0 topic:

$ kafkacat -C -u -b kafka-jumbo1001.eqiad.wmnet:9092 -t test_webrequest_sampled_otto0 -c 2 -o end | jq .

{
  "hostname": "cp3059.esams.wmnet",
  "sequence": 14534715703,
  "dt": "2022-08-16TXX:XX:XXZ",
  "time_firstbyte": 0.000489,
  "ip": "X.X.X.X",
  "cache_status": "hit-front",
  "http_status": "200",
  "response_size": 2178,
  "http_method": "GET",
  "uri_host": "upload.wikimedia.org",
  "uri_path": "/wikipedia/commons/thumb/2/2a/HUN_Order_of_Merit_of_the_HPR_1kl_BAR.png/60px-HUN_Order_of_Merit_of_the_HPR_1kl_BAR.png.webp",
  "uri_query": "",
  "content_type": "image/webp",
  "referer": "https://ru.wikipedia.org/",
  "user_agent": "XXXXXXXX",
  "accept_lanugage": null,
  "x_analytics": "https=1;client_port=xxxxx;nocookies=1",
  "range": "-",
  "x_cache": "cp3059 hit, cp3059 hit/48",
  "accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
  "backend": "ATS/8.0.8",
  "tls": "-",
  "ch_ua": "xxxxxxx",
  "ch_ua_arch": "-",
  "ch_ua_bitness": "-",
  "ch_ua_full_version_list": "-",
  "ch_ua_model": "-",
  "ch_ua_platform": "xxxxxxxx",
  "ch_ua_platform_version": "-",
  "webrequest_source": "upload",
  "x_analytics_map": {
    "client_port": "xxxxx",
    "nocookies": "1",
    "https": "1"
  }
}

...

To add geocoding, etc., you'd probably need a custom UDF. It should be possible to write UDFs that call out to Java code that we use to process webrequest in Hive too. Here's the Hive UDF we use to geocode. It is a Hive specific wrapper around maxmind libs. You probably could write the Flink stuff to call the same code.

You should also be able to write a UDF in pyflink (and run these SQL statements via pyspark too) if you prefer. It should be possible to call out to Java code in pyspark too. Here's an example of doing so. As long as the Java class you want to use is on the classpath, usually added to the flink job with the --jars opt, you should be able to call out to it from pyflink.
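
For illustration only, a rough sketch of what a pyflink scalar UDF for geocoding could look like; the mmdb path, function name and the per-call reader are assumptions for brevity (a real UDF would reuse the reader, or call the shared Java/maxmind code instead):

from pyflink.table import DataTypes
from pyflink.table.udf import udf
import geoip2.database

# Hypothetical example: return the ISO country code for an IP address.
# Opening the reader on every call is only for brevity; a real UDF would cache it.
@udf(result_type=DataTypes.STRING())
def geocode_country(ip: str) -> str:
    try:
        with geoip2.database.Reader("/usr/share/GeoIP/GeoLite2-Country.mmdb") as reader:
            return reader.country(ip).country.iso_code or "-"
    except Exception:
        return "-"

# table_env.create_temporary_function("geocode_country", geocode_country)
# ...then in the INSERT ... SELECT above: geocode_country(`ip`) AS geocoded_country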

Also, this is using the Flink SQL/Table API. It is also possible (and sometimes more direct) to use the lower level DataStream API to do this. wikimedia-event-utilities is getting a bunch of Flink integration with WMF Event Platform, which helps remove boilerplate around all of this. (We'd need T314956: [Event Platform] Declare webrequest as an Event Platform stream to make use of all that though).

Thank you @Ottomata for the example and extensive explanation! I'll take a closer look and play with it a little bit too

My clinic duty hat would like to triage this task - is "Medium" priority OK @fgiunchedi / @Ottomata ?

fgiunchedi triaged this task as Medium priority.Sep 8 2022, 8:47 AM

I have resumed work on this a little bit and produced a worked example using benthos with sampling and ASN/geoip lookup; the configuration looks like this:

http:
  enabled: true
  address: 0.0.0.0:4195
  root_path: /benthos
  debug_endpoints: true

input:
  label: "webrequest"
  kafka:
    addresses:
      - 'localhost:9092'
    topics:
      - 'webrequest_text'
      - 'webrequest_cache'
    consumer_group: 'benthos-webrequest-realtime'
#    start_from_oldest: false

pipeline:
  processors:
  - label: "drop"
    bloblang: |
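      # random_int() % 10 != 0 drops ~9 out of 10 messages, i.e. a rough 1-in-10 sample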
      root = if random_int() % 10 != 0 { deleted() }

  -  label: "geoip"
      bloblang: |
      root = this
      root.asn = this.ip.geoip_asn(path: "GeoLite2-ASN.mmdb").AutonomousSystemNumber

output:
  label: "stdout"
  stdout:
    codec: lines
#  kafka:
#    addresses:
#      - 'localhost'
#    topic: 'webrequest-realtime'

metrics:
  prometheus:
    use_histogram_timing: true
    add_process_metrics: true
    add_go_metrics: true

Cool! Is this.ip.geoip_asn built into benthos or did you provide it somehow?

Cool! Is this.ip.geoip_asn built into benthos or did you provide it somehow?

The geoip_asn method is built into benthos (though it's marked as experimental, so there could be breaking changes), whereas the DB is the GeoLite2 free version
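
For what it's worth, the same lookup presumably also exposes the AS organisation; the field name here is assumed to mirror the underlying maxmind ASN record:

root.as_org = this.ip.geoip_asn(path: "GeoLite2-ASN.mmdb").AutonomousSystemOrganization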

Cross posting: T310997#8378791

Filippo and I are working on a prototype with Benthos :)

Change 854487 had a related patch set uploaded (by Elukey; author: Elukey):

[operations/puppet@production] Add a basic puppetization for Benthos

https://gerrit.wikimedia.org/r/854487

Change 854499 had a related patch set uploaded (by Elukey; author: Elukey):

[operations/puppet@production] centrallog: add first prototype of webrequest-live with Benthos

https://gerrit.wikimedia.org/r/854499

Assumptions/notes for the first prototype:

  • Benthos reads webrequest_{text,upload} with a Kafka consumer group (so multiple nodes can share the load transparently etc..).
  • The idea is to transform/enrich the above streams and publish a new stream to Kafka jumbo called webrequest_128_live.
  • We used this Druid indexing config to decide what fields needed to be added to webrequest_128_live. So far we added:
    • all tls fields, getting the values from the tls string in the webrequest stream (basically splitting all by ; and then by = to get a meaningful dict).
    • client_port, requestctl and is_from_public_cloud, reading from the x_analytics string in the webrequest stream (same split-and-dict idea; see the bloblang sketch after this list).
    • Set is_pageview="-" as placeholder, since IIRC we cannot really derive it via Benthos (and also we don't really care about it for this use case right?).
    • geolocation fields using MaxMind.
  • webrequest_source field as well (derived from the topic names).
  • The sampling is the bit that we are not 100% sure about. The initial solution to start the work was to have something like if random_int() % 128 != 0 { deleted() }. In this way Benthos processes an event from the webrequest stream only if the random int is divisible by 128. The idea was to assume a uniform distribution of values and a fair/good random number generator to get a sort of 1/128th sampling of the traffic volume, but it doesn't seem 100% mathematically sound (good to kick off the initial tests though). An alternative could be to keep a counter in Benthos and process only elements at a certain cadence (e.g. when the counter reaches 128, process one event and reset it), or to use the Kafka topic offset to decide which events to keep (the offset is a regular integer, so we could do $offset % 128 == 0 for example). I'll discuss this with @JAllemandou to figure out what's best.
  • The Benthos config is under review here.
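
To make the "split by ; and then by =" idea above concrete, an illustrative bloblang fragment (not the reviewed config; the actual field handling there may differ, and the same pattern would apply to the tls string):

# Illustrative only: extract client_port from the ";"-separated "k=v" x_analytics string,
# falling back to "-" when the key is absent.
root.client_port = this.x_analytics.split(";").filter(kv -> kv.has_prefix("client_port=")).index(0).split("=").index(1).catch("-")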

All the above will only produce a new topic called webrequest_128_live; then we'll need something like this to instruct Druid Analytics about what to do.
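
For context, the rough shape of a Druid Kafka supervisor spec for this (the dimension list, granularities and tuning values below are placeholders, not the spec that eventually went into refinery):

{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "webrequest_128_live",
      "timestampSpec": { "column": "dt", "format": "iso" },
      "dimensionsSpec": { "dimensions": ["webrequest_source", "uri_host", "http_status", "cache_status"] },
      "granularitySpec": { "segmentGranularity": "hour", "queryGranularity": "minute" }
    },
    "ioConfig": {
      "topic": "webrequest_128_live",
      "inputFormat": { "type": "json" },
      "consumerProperties": { "bootstrap.servers": "kafka-jumbo1001.eqiad.wmnet:9092" },
      "taskCount": 1,
      "taskDuration": "PT1H"
    },
    "tuningConfig": { "type": "kafka" }
  }
}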

We decided not to go with Kubernetes for the moment, but to use the two centrallog nodes that the Observability team manages. Long term we'll want k8s, but for initial testing it shouldn't be needed.

Let us know if you like the idea and if you have comments :)

happy to talk about sampling when you wish @elukey :)
I also think you should be pointed to T319344, as it requests a new feature.

Finally, I think the approach relying on stream-only ingestion (as described in the task description) should be reviewed to add a batch component to it. This would allow rerunning ingestion easily (for issues or new features). Happy to discuss this as well.

happy to talk about sampling when you wish @elukey :)

Ack I'll set up a meeting :)

I also think you should be pointed to T319344, as it requests a new feature.

+1, I think that it should be easily added. Ok if we do it at a later stage?

Finally, I think the approach relying on stream-only ingestion (as described in the task description) should be reviewed to add a batch component to it. This would allow rerunning ingestion easily (for issues or new features). Happy to discuss this as well.

The idea that I have in mind is to keep the current batch jobs for webrequest_128, and just rely on this new stream for the "live" data (more or less like we do with netflow). So everything imported/indexed from webrequest_128_live is eventually overridden by the DE batch jobs, does it make sense or am I missing something?

Something I forgot: It'd be great to have the new stream setup as an "event" (whether to send it to eventgate or not is to be evaluated), but at least having a defined schema that matches events.

I also think you should be pointed to T319344, as it requests a new feature.

+1, I think that it should be easily added. Ok if we do it at a later stage?

For sure :)

Finally, I think the approach relying on stream-only ingestion (as described in the task description) should be reviewed to add a batch component to it. This would allow rerunning ingestion easily (for issues or new features). Happy to discuss this as well.

The idea that I have in mind is to keep the current batch jobs for webrequest_128, and just rely on this new stream for the "live" data (more or less like we do with netflow). So everything imported/indexed from webrequest_128_live is eventually overridden by the DE batch jobs, does it make sense or am I missing something?

There is a link with sampling in here - let's talk about that in the meeting :)

overridden by the DE batch jobs

QQ: do the existing DE batch jobs already produce all the same info you are trying to produce here with benthos?

I wonder if there will be differences in the output, since, IIUC, the processors you are using (sampling, parsing fields, geocoding, etc.) are not the same processor code being used by the batch jobs.

FWIW, this is one of the reasons we are preferring Flink/Spark type streaming systems. You can't run benthos in batch mode for backfilling or bootstrapping or lambda arch.

It'd be great to have the new stream setup as an "event"

+1. T314956: [Event Platform] Declare webrequest as an Event Platform stream. At minimum it must declare a schema and a stream. Ideally, the producer code you are writing would also validate the event before producing it to Kafka. Ideally it also would not produce to topics unless stream config existed and said it could be produced. See also https://wikitech.wikimedia.org/wiki/Event_Platform/Producer_Requirements

overridden by the DE batch jobs

QQ: do the existing DE batch jobs already produce all the same info you are trying to produce here with benthos?

I wonder if there will be differences in the output, since, IIUC, the processors you are using (sampling, parsing fields, geocoding, etc.) are not the same processor code being used by the batch jobs.

In theory yes, all the added fields have the same format afaics from what we add in our batch jobs. It needs to be tested but I am pretty sure it should be ok. The only different part will of course be the sampling, but in my opinion, if it is not the same it shouldn't matter a lot, since most of the time what SRE cares about is live data to debug an ongoing event, or just querying historical data.

FWIW, this is one of the reasons we are preferring Flink/Spark type streaming systems. You can't run benthos in batch mode for backfilling or bootstrapping or lambda arch.

Definitely yes, but for this use case we already have the batch/backfilling part, so it seemed a nice test to make :)

It'd be great to have the new stream setup as an "event"

+1. T314956: [Event Platform] Declare webrequest as an Event Platform stream. At minimum it must declare a schema and a stream. Ideally, the producer code you are writing would also validate the event before producing it to Kafka. Ideally it also would not produce to topics unless stream config existed and said it could be produced. See also https://wikitech.wikimedia.org/wiki/Event_Platform/Producer_Requirements

The idea is to proceed with a first iteration (namely a new kafka topic + druid indexation on a separate datasource, say webrequest_128_live) without having a proper event and schema in place, so that we can validate whether the whole workflow works and whether it is valuable for SRE. Then we can definitely add one, what do you think?
If we emit events from Benthos in my opinion we just use the Eventgate HTTP endpoint and offload the validation etc.. to it; it seems the easiest. If anybody wants to create a separate plugin to validate etc.. we'll be available to help :)
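
To sketch what producing through EventGate would look like (the endpoint, schema URI and stream name below are placeholders; the real ones would come with T314956 / Event Stream Config):

# All names are placeholders; EventGate expects a JSON array of events,
# each carrying $schema and meta.stream.
curl -X POST 'https://eventgate.example.wmnet/v1/events' \
  -H 'Content-Type: application/json' \
  -d '[{"$schema": "/webrequest/sampled/1.0.0",
        "meta": {"stream": "webrequest.sampled"},
        "uri_host": "upload.wikimedia.org",
        "http_status": "200"}]'

On the Benthos side this could presumably be done with its http_client output instead of the kafka one.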

The idea is to proceed with a first iteration (namely a new kafka topic + druid indexation on a separate datasource, say webrequest_128_live) without having a proper event and schema in place, so that we can validate whether the whole workflow works and whether it is valuable for SRE. Then we can definitely add one, what do you think?

Ya sounds good. When you are ready, the minimal requirement of adding the schema and event stream config won't be hard.

If we emit events from Benthos in my opinion we just use the Eventgate HTTP endpoint

Ya that'll work! To do this you need a schema and event stream config :p

Change 854487 merged by Elukey:

[operations/puppet@production] Add a basic puppetization for Benthos

https://gerrit.wikimedia.org/r/854487

Next steps:

  • Meeting between me, Joseph, Andrew and Filippo (and whoever wants to join!) about the sampling used in https://gerrit.wikimedia.org/r/c/operations/puppet/+/854499 (and other bits and pieces).
  • Merge of https://gerrit.wikimedia.org/r/c/operations/puppet/+/854499, which will create a new topic called webrequest_128_live in Kafka Jumbo. The centrallog nodes will start pushing enriched webrequest events to it (no new EventGate etc.. schema at this point).
  • Druid live/streaming indexation to a new datasource called webrequest_sampled_128_live (or any name that DE will prefer). The idea is not to mix this experiment with the current webrequest_sampled_128 druid datasource. The indexations will be kept for a limited amount of time, say 2-3 hours. No batch jobs will be created to override/pack/etc.. the Druid segments.
  • First milestone - People should be able to see webrequest_sampled_128_live in Turnilo.

We had the meeting today, and we agreed on a few bits:

  • We'll have a dedicated druid/turnilo datasource for this use case called webrequest_sampled_live (or similar). Retention 12h/24h maximum, but completely separated from webrequest_sampled_128 (at least for the moment).
  • The sampling that Benthos could use is a modulo on varnishkafka's sequence field (see the sketch after this list). Nice and easy, and in the future Data Engineering could more easily adapt their current sampling (based on Hive partitions and bucketing).
  • We'll keep the two sampled datasources in Druid/Turnilo for a while, to see differences and adjust as we need.
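
A sketch of what the agreed sequence-based sampling could look like in the Benthos config (mirroring the existing drop processor; the exact reviewed config may differ):

pipeline:
  processors:
    - label: "sample"
      bloblang: |
        # Deterministic ~1/128 sample: keep only events whose varnishkafka
        # sequence number is divisible by 128.
        root = if this.sequence % 128 != 0 { deleted() }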

The change in https://gerrit.wikimedia.org/r/c/operations/puppet/+/854499 has been updated accordingly, and Filippo also added some basic unit tests. After another round of review we should be good to merge and test!

Mentioned in SAL (#wikimedia-analytics) [2022-11-15T11:50:13Z] <elukey> elukey@kafka-jumbo1001:~$ kafka topics --create --topic webrequest_sampled --partitions 3 --replication-factor 3 - T314981

Change 854499 merged by Elukey:

[operations/puppet@production] centrallog: add first prototype of webrequest-live with Benthos

https://gerrit.wikimedia.org/r/854499

Change 856948 had a related patch set uploaded (by Filippo Giunchedi; author: Filippo Giunchedi):

[operations/puppet@production] webrequest_live: add output label and limit threads

https://gerrit.wikimedia.org/r/856948

Change 856948 merged by Filippo Giunchedi:

[operations/puppet@production] webrequest_live: add output label and limit threads

https://gerrit.wikimedia.org/r/856948

Change 856949 had a related patch set uploaded (by Elukey; author: Elukey):

[analytics/refinery@master] druid: add new supervisor for webrequest_sampled

https://gerrit.wikimedia.org/r/856949

Change 856949 merged by Elukey:

[analytics/refinery@master] druid: add new supervisor for webrequest_sampled

https://gerrit.wikimedia.org/r/856949

Mentioned in SAL (#wikimedia-analytics) [2022-11-15T14:24:19Z] <elukey> started webrequest_sampled supervisor on Druid Analytics - T314981

Change 856991 had a related patch set uploaded (by Elukey; author: Elukey):

[operations/puppet@production] turnilo: add webrequest_sampled_live datasource

https://gerrit.wikimedia.org/r/856991

Change 856991 merged by Elukey:

[operations/puppet@production] turnilo: add webrequest_sampled_live datasource

https://gerrit.wikimedia.org/r/856991

Change 857408 had a related patch set uploaded (by Elukey; author: Elukey):

[analytics/refinery@master] druid: add cache_status to the webrequest_sampled supervisor

https://gerrit.wikimedia.org/r/857408

Change 857408 merged by Elukey:

[analytics/refinery@master] druid: add cache_status to the webrequest_sampled supervisor

https://gerrit.wikimedia.org/r/857408

Change 857476 had a related patch set uploaded (by Elukey; author: Elukey):

[operations/puppet@production] turnilo: add cache_status to webrequest_live_sampled

https://gerrit.wikimedia.org/r/857476

Change 857476 merged by Elukey:

[operations/puppet@production] turnilo: add cache_status to webrequest_live_sampled

https://gerrit.wikimedia.org/r/857476

Status:

We deployed benthos on two centrallog nodes, and we are now evaluating its performance (more info also in T319214).

Next steps:

  • Tune the Benthos configs based on the webrequest use case.
  • Think about long term strategy - Do we want Benthos on Kubernetes? If so, on what clusters? etc.. Depending on the answer we'll have to add a Helm chart etc..
  • Think about data consumed - as stated in T319214#8401535, we may want to pull data from fewer Kafka partitions of webrequest_{upload,text}, but are we going to get a representative sample if we do so?
  • Think about reconciliation between webrequest_sampled_128 and webrequest_sampled_live - the former is currently populated by batch jobs and is more efficient for historical queries etc.., the latter is limited to the last 24h of data, with more granular segments and no batch jobs back-filling data if needed. Is it ok to keep this configuration, or do we want to have everything in one datasource?

Change 858542 had a related patch set uploaded (by Elukey; author: Elukey):

[operations/puppet@production] benthos: reduce webrequest-live kafka partitions to read

https://gerrit.wikimedia.org/r/858542

Change 858542 merged by Elukey:

[operations/puppet@production] benthos: reduce webrequest-live kafka partitions to read

https://gerrit.wikimedia.org/r/858542

Change 858551 had a related patch set uploaded (by Elukey; author: Elukey):

[operations/puppet@production] benthos: use env() in webrequest_live's bloblang config

https://gerrit.wikimedia.org/r/858551

Change 858551 merged by Elukey:

[operations/puppet@production] benthos: use env() in webrequest_live's bloblang config

https://gerrit.wikimedia.org/r/858551

Change 858552 had a related patch set uploaded (by Elukey; author: Elukey):

[operations/puppet@production] benthos: discard the msg in webrequest_live if ip is unset

https://gerrit.wikimedia.org/r/858552

Change 858552 merged by Elukey:

[operations/puppet@production] benthos: discard the msg in webrequest_live if ip is unset

https://gerrit.wikimedia.org/r/858552

After a few unsuccessful tries (due to the me+Friday combination), Filippo and I rolled out a change to pull from only 12 kafka partitions (per webrequest topic), with 1-in-64 sampling. This should be equivalent to before (24 partitions per topic and 1-in-128 sampling), but with way less data ingested from Kafka. Varnishkafka produces webrequest msgs randomly across all partitions, so it should be a no-op change.
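
For the record, the arithmetic behind the equivalence (assuming varnishkafka spreads messages uniformly across partitions): reading 12 of 24 partitions keeps 1/2 of the volume, and 1-in-64 sampling on top of that keeps 1/2 × 1/64 = 1/128, the same effective rate as the previous 24-partition, 1-in-128 setup.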

Note: due to the mess I caused today there is a big jump in requests from 10:12->10:22 UTC; it is not a real one, it's all due to me, apologies :)

Change 858561 had a related patch set uploaded (by Elukey; author: Elukey):

[analytics/refinery@master] oozie: add cache_status to webrequest's druid indexations

https://gerrit.wikimedia.org/r/858561

Change 858561 merged by Elukey:

[analytics/refinery@master] oozie: add cache_status to webrequest's druid indexations

https://gerrit.wikimedia.org/r/858561

Change 859502 had a related patch set uploaded (by Elukey; author: Elukey):

[operations/alerts@master] team-sre: add druid alerts for webrequest_sampled_live

https://gerrit.wikimedia.org/r/859502

Change 859502 merged by Elukey:

[operations/alerts@master] team-sre: add druid alerts for webrequest_sampled_live

https://gerrit.wikimedia.org/r/859502

After a chat with Filippo, we agreed that the work on this task seems done. There may be more things to do, like merging webrequest_sampled_128 and webrequest_sampled_live, but we can open a task for that later on down the road if needed. Does it seem ok to everybody?

If anything is missing we can definitely add it, otherwise let's close.

I agree.
The only thing maybe left is to check whether the segment size is the correct one. AFAIUI it currently has hourly segments, but given the not-too-large size it could maybe have daily segments. Would that improve performance?

@Volans asked me, basically, how come count(distinct ip) gives slightly inaccurate results in superset -> druid queries. I didn't know, but found out that Druid by default has: useApproximateCountDistinct: true. See more at: https://support.imply.io/hc/en-us/articles/360056362993-Getting-exact-count-distinct-results-using-druid-SQL. Here's an example, and how to go about getting exact answers without tuning that setting.

This will give you exact answers, at the cost of being more expensive to compute

  with grouped_ips as (select ip from webrequest_sampled_live where __time = TIMESTAMP '2022-11-23 10:00:00' group by ip)
select count(ip),
       count(case when ip like '%:%' then ip end),
       count(case when ip not like '%:%' then ip end)
  from grouped_ips

This will give you the same answers, but I think it's just as costly to compute. Here, Druid's optimization is not applied because, I guess, the optimizer can't read ahead and notice that all you're doing with the distinct ips is counting them.

  with distinct_ips as (select distinct ip from webrequest_sampled_live where __time = TIMESTAMP '2022-11-23 10:00:00')
select count(ip),
       count(case when ip like '%:%' then ip end),
       count(case when ip not like '%:%' then ip end)
  from distinct_ips

And, finally, this gives you the slightly inaccurate but faster results.

select count(distinct ip),
       count(distinct case when ip like '%:%' then ip end),
       count(distinct case when ip not like '%:%' then ip end)
  from webrequest_sampled_live
 where __time = TIMESTAMP '2022-11-23 10:00:00'

Thanks a lot for the deep dive and the explanation with examples @Milimetric, much appreciated!

elukey claimed this task.

Closing the task since nobody objected to my earlier proposal :)