
[MPIC] Analyse risk of potential performance issues with static approach to stream configuration
Open, High, Public, 8 Estimated Story Points

Description

Based on discussion with DE (https://docs.google.com/document/d/1S2Ij3FikNGdN8ZwZKcwpAfNmGQoNtah9AwNilxqhC3k/edit), it has been decided to move forward with the static stream config option as a preference.

However, there are unknowns in terms of performance that need to be investigated to ensure that the workflow will scale. This work is necessary irrespective of the static vs. dynamic approach, though the two would have slightly different risk profiles.

For example, how will filtering impact performance (especially for dashboarding)? Parquet / Iceberg should be good at predicate pushdown. Does Iceberg have indexes?

Acceptance Criteria

  • Documented set of usage scenarios covering scaling of storage, requests, etc.
  • Set of usage scenarios, stack-ranked by likelihood
  • Posit performance implications for each
  • Propose mitigations for the most likely scenarios

Event Timeline

WDoranWMF created this task.

I think the metric we mostly need here is: how will fewer tables affect dashboard (Superset?) latency? If all events for a given instrument(?)/schema(?) (@phuedx help me with terms please) are in one Hive/Iceberg table, will dashboard loading latency be significantly worse?

It will be hard to answer this without volume estimates. How many events per hour (or second) will these streams receive? @phuedx can you make a guess on lower and upper bounds? A guess is fine. Also a guess about volumes per 'discriminating' filter/field would be helpful too.

It also depends on what the dashboarding queries are. Do dashboards generally operate by aggregating events per hour? Per day? Are there intermediate summary tables that are created via pipelines?


All that is to say: we can make some guesses, but it will be hard to give a good answer without an understanding of what the data is and how it will be used.

All that is to say: we can make some guesses, but it will be hard to give a good answer without an understanding of what the data is and how it will be used.

This.

I think that potential performance issues are a risk that we need to be aware of and should be monitoring. Can we monitor query performance on a per table/per query basis? Can we dashboard it?

We already know plenty of techniques to mitigate performance issues (partitioning, generating intermediate summary tables as mentioned above). Are there any query performance optimisations that we couldn't implement because stream configurations would only be static?

Are there any query performance optimisations that we couldn't implement because stream configurations would only be static?

I don't think so. And, if a particular 'discriminator'(?) is too high volume, even if it was in its own table, it could still cause increased dashboard latency.

I guess the question is, will having lots of events for one discriminator in one table affect dashboard latency for other discriminators?

We can probably give a guess to this question. Maybe @JAllemandou or @xcollazo who know more about Iceberg / Parquet details could help.

Question for Joseph and Xabriel: how smart are Parquet/Iceberg with simple 'predicate pushdown' filters? If all dashboard queries use a simple WHERE filter, does having a lot of irrelevant records for that query significantly affect query performance?

Add status quo info for the usage scenarios, as well as baseline performance for Superset and improvement targets for it.

Question for Joseph and Xabriel: how smart are Parquet/Iceberg with simple 'predicate pushdown' filters? If all dashboard queries use a simple WHERE filter, does having a lot of irrelevant records for that query significantly affect query performance?

I should first qualify my answer: Iceberg is not Druid. It is not designed to build dashboarding solutions on top of. Neither is Hive.

Having said that, not all dashboards need sub-second performance. And if we shape the data carefully, you can indeed get 'couple of seconds' performance with Iceberg, even if it's terabytes of data. Iceberg can certainly do predicate pushdown, and moreover, we can order the writes so that we efficiently skip many files and many Parquet row groups. We use this trick on wmf_dumps.wikitext_raw, and it made a night-and-day difference for that particular use case (see T340863#9397991 and on). But that is for a very specific query pattern.
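
For reference, a minimal sketch of that ordering trick, assuming Spark SQL with the Iceberg extensions enabled; the table and column names below are hypothetical, not the actual wmf_dumps DDL:

-- Declare a write order so data files end up clustered by the filter column;
-- Parquet/Iceberg min/max statistics can then skip most files and row groups.
ALTER TABLE analytics.button_instrument WRITE ORDERED BY experiment_id, dt;

-- A query filtering on the ordered column only reads the few matching files:
SELECT COUNT(1) FROM analytics.button_instrument WHERE experiment_id = 'b';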

If we can share a specific table schema and some specific query examples, we could assess better. We should also specify what the latency expectations for this dashboard are, to better assess whether Iceberg is the right tool.

Thanks @xcollazo!

I think the question at hand is, how much will the query latency of these 2 situations differ?

Situation A:

  • Tables button_experiment_a and button_experiment_b have the same schema.
  • button_experiment_a is very large, but button_experiment_b is not so large.
  • A dashboard for button_experiment_b is built.
    • Let's say a query is select day(dt) as day, session_id, count(*) from button_experiment_b group by day(dt), session_id.

Situation B:

  • A single table button_instrument with a field experiment_id with values like "a" and "b".
  • There are many rows where experiment_id == "a", but not so many where experiment_id == "b".
  • A dashboard is built that only cares about rows where experiment_id == "b".
    • This would have a query like select day(dt) as day, session_id, count(*) from button_instrument where experiment_id = "b" group by day(dt), session_id

How much will the presence of many irrelevant rows where experiment_id != "b" affect the query latency in Situation B? A lot? A little? I think even a guess like this could help.

I'm hoping that predicate pushdown might mean that the latency is barely affected, but I don't really know!

...does Iceberg have indexes? :p

How much will the presence of many irrelevant rows where experiment_id != "b" affect the query latency in Situation B? A lot? A little? I think even a guess like this could help.
I'm hoping that predicate pushdown might mean that the latency is barely affected, but I don't really know!

Ah, this one is simple: We partition the table by experiment_id. Both Iceberg and Hive support this.

So in fact no predicate pushdown farther than the first layer of metadata is needed.
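
To make that concrete, a minimal sketch of such a table, assuming Spark SQL with Iceberg; the table and column names are hypothetical:

-- Partitioning by experiment_id means a query filtered on experiment_id
-- only touches that experiment's partition, regardless of other experiments' volume.
CREATE TABLE analytics.button_instrument (
  dt            TIMESTAMP,
  experiment_id STRING,
  session_id    STRING,
  action        STRING
)
USING iceberg
PARTITIONED BY (experiment_id);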

How much will the presence of many irrelevant rows where experiment_id != "b" affect the query latency in Situation B?

Zero effect.

...does Iceberg have indexes? :p

It does not, but your use case doesn't seem to need them. IIRC from Iceberg Summit, a couple of companies rolled their own indexes for Iceberg, and they were planning to spec it out for official support. But that is likely 1+ year away.

We partition the table by experiment_id. Both Iceberg and Hive support this.

Hm! This may be easier in the Iceberg world than in Hive because, IIUC, when writing, the partitioning is handled by Iceberg based on the data values, whereas in Hive we have to explicitly tell it what the partition is, right?

Custom partitioning would be a use case for Datasets Config for sure. cc @JAllemandou.


@xcollazo for curiosity's sake, what if the table were not partitioned by experiment_id? Would predicate pushdown be enough to ameliorate query latency concerns here?

whereas in Hive we have to explicitly tell it what the partition is, right?

Hive lets you do dynamic partitioning. Here is an example of us using that feature. Note I'm not advocating for Hive, I'm just saying this partitioning pattern is supported by both systems and will give you zero cost for irrelevant experiments when filtering by experiment_id.
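
For illustration, a minimal sketch of Hive dynamic partitioning, with hypothetical table names (not the linked production job):

-- Hive derives the target partition from the experiment_id value of each row at write time.
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;

INSERT OVERWRITE TABLE button_instrument PARTITION (experiment_id)
SELECT dt, session_id, action, experiment_id  -- partition column must come last
FROM staging.button_events;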

for curiosity's sake, what if the table were not partitioned by experiment_id? Would predicate pushdown be enough to ameliorate query latency concerns here?

(Assuming Iceberg now) Depends on the data landing:

  • If the INSERTs would typically only include data for a specific experiment_id, then the min/max Parquet statistics will kick in and help you a lot, because files will naturally only include one experiment and the other files will be skipped.
  • If the INSERTs would typically include data for multiple experiment_ids, then we could try WRITE ORDERED BY experiment_id to skip as much as we can.

But let's partition! It's the cheapest and most effective solution.

Also, would this system also do dt filters? We could partition / ORDER BY that as well and gain further perf.
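
If dt filters are in scope, a minimal sketch of adding a hidden daily partition to an existing Iceberg table, assuming the Spark SQL Iceberg extensions and a hypothetical table name:

-- Date-range filters on dt can then prune whole daily partitions as well.
ALTER TABLE analytics.button_instrument ADD PARTITION FIELD days(dt);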

If the INSERTs would typically include data for multiple experiment_ids

And if we don't do any special partitioning or inserting?

I ask because these tables are created by automated ingestion jobs. We are working on support for custom configuration per table, but any custom partitioning or insert logic would have to be applied manually at the moment.

If the INSERTs would typically include data for multiple experiment_ids

And if we don't do any special partitioning or inserting?

Then every SELECT will go over all the data and will have to filter it at runtime, meaning that experiment_ids with more data will definitely slow down smaller ones.

How will these dashboards be served? Via Presto?

How will these dashboards be served? Via Presto?

Not sure, likely yes. But I believe there may be some desire to have some pipelines that get this stuff into AQS somehow. @phuedx @VirginiaPoundstone

Then every SELECT will go over all the data and will have to filter it at runtime, meaning that experiment_ids with more data will definitely slow down smaller ones.

Okay, then the question to answer is: by how much? It'd be nice if we could make a guess.

@phuedx can you come up with

  • A guess for average throughput for 2 instrumentations(?), one with lots of events and one with few
  • A guess at a naive and simple query someone would run on a dashboard. (Maybe count per day? e.g. a daily button click count?)

We could then generate some artificial data and compare.
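
As a rough starting point for that comparison, a minimal sketch of generating skewed artificial data, assuming Spark SQL with Iceberg; all names and the ~99:1 volume split are made up:

-- Throwaway table with ~1M synthetic events: ~99% for experiment "a", ~1% for "b".
CREATE TABLE sandbox.button_instrument_test (
  dt            TIMESTAMP,
  experiment_id STRING,
  session_id    STRING,
  action        STRING
) USING iceberg;

INSERT INTO sandbox.button_instrument_test
SELECT
  to_timestamp('2024-01-01') + make_interval(0, 0, 0, CAST(id % 30 AS INT)) AS dt,
  IF(id % 100 = 0, 'b', 'a') AS experiment_id,
  uuid() AS session_id,
  'click' AS action
FROM range(1000000);

-- Then time the same dashboard-style aggregate for 'a' vs. 'b' and compare.
SELECT to_date(dt) AS day, COUNT(1) AS events
FROM sandbox.button_instrument_test
WHERE experiment_id = 'b'
GROUP BY 1;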

@MNeisler just caught me up on this. I just want to share some thoughts about

How will these dashboards be served? Via Presto?

I think we (not just Product Analytics but data practitioners in general) might have gotten too dependent on Presto in Superset. When Presto was introduced, it was in the Oozie / pre-Airflow times, so the ability to calculate a metric from raw event data with Presto and then easily turn the query results into a chart that can be added to a dashboard was revolutionary to our workflow. It also meant that metrics could be defined on a project-by-project basis and easily implemented & monitored.

We're trying to move away from that (teams coming up with project-specific metrics for each project), and instead pursue our strategy/vision of everyone using a shared set of essential metrics with governance and trusted datasets of their measurements. Presto was originally a boon for our workflow but is also, I think, enabling what I would call a bad habit in the long term.

Where Presto (via Superset's SQL Lab) shines is very quick analyses/answers to small questions and prototyping/iterating on metrics and dashboards. For creating trusted datasets and stable, performant dashboards that feature teams use to monitor usage of their features (including results of their experiments), we should use pre-computed essential metrics with all calculations offloaded to Airflow pipelines.
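
As one concrete shape for that, a hypothetical sketch of a daily metric an Airflow-scheduled Spark SQL job could materialize; the table and column names are made up, and '{{ ds }}' is Airflow's templated run date:

-- Dashboards read this small summary table instead of scanning raw events.
INSERT INTO metrics.button_daily_click_through
SELECT
  to_date(dt) AS day,
  experiment.name AS experiment_name,
  experiment.variant_name AS variant_name,
  SUM(IF(action = 'click', 1, 0)) / COUNT(1) AS click_through_rate
FROM event.button_instrument
WHERE to_date(dt) = '{{ ds }}'
  AND action IN ('init', 'click')
GROUP BY 1, 2, 3;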

@phuedx can you come up with

  • A guess for average throughput for 2 instrumentations(?), one with lots of events and one with few

A lot? SessionTick. A few? EditAttemptStep (I'm sure there are instruments that submit fewer events, if we need a better example).

| Instrument Name | Average Daily Event Rate (events/day) |
| --- | --- |
| SessionTick | 117,504,000 |
| EditAttemptStep | 7,361,280 |
  • A guess at a naive and simple query someone would run on a dashboard. (Maybe count per day? e.g. a daily button click count?)
[0]
SELECT COUNT(1) AS count FROM …;
[1]
SELECT
  action,
  COUNT(1) as count
FROM
  …
WHERE
  instrument_name = …
GROUP BY
  1
;
[2]
SELECT
  experiment.name,
  COUNT(DISTINCT(enrollment_token)) AS n_enrollments
FROM
  …
WHERE
  experiment.name = …
  AND action = 'enroll'
GROUP BY
  1
;
[3]
SELECT
  experiment.name,
  experiment.variant_name,
  SUM(IF(action = 'click', 1, 0)) / COUNT(1) AS click_through_rate
FROM
  …
WHERE
  experiment.name = …
  AND action IN ( 'init', 'click' )
GROUP BY
  1, 2
;

All credit to @mpopov for above example queries.

we should use pre-computed essential metrics with all calculations offloaded to Airflow pipelines

wow that sounds amazing!

So, if we have pipelines anyway, then that could mitigate these concerns by splitting the experiments/instruments/discriminators (whatever we call them) off into their own standardized metric tables?

As currently envisioned, a future datasets config / management system where every dataset/table is explicitly declared may make automation (e.g. a pipeline auto-splitting into per-experiment tables here) more difficult, but I hope we can work with the datasets config design to compromise with defaults for automation. TBD.

By that time, ideally we could just use custom Iceberg partitions as Xabriel suggests.