
mw-page-content-change-enrich should partition by and process by wiki_id,page_id
Closed, Resolved · Public

Description

The mediawiki.page_change.v1 stream is produced by eventgate to Kafka with a JSON blob key that includes page_id and wiki_id.

The mediawiki.page_content_change enrich job currently partitions the Flink DataStream by wiki_id only. We should change this partitioning to use wiki_id and page_id (via a hash), to ensure events are processed in per-page order.

Done is
  • mw-page-content-change-enrich partitions by and processes events in wiki_id,page_id order
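For illustration, a minimal PyFlink sketch of keying the enrichment stream by (wiki_id, page_id); the toy source, field layout, and variable names below are assumptions, not the actual job code:

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Toy stand-in for the mediawiki.page_change.v1 source, trimmed down to
# (wiki_id, page_id, payload); the real job consumes full page_change events.
page_change_stream = env.from_collection(
    [('enwiki', 123, 'rev A'), ('enwiki', 123, 'rev B'), ('dewiki', 456, 'rev C')],
    type_info=Types.TUPLE([Types.STRING(), Types.LONG(), Types.STRING()]),
)

# Key by (wiki_id, page_id) instead of wiki_id alone, so all events for a
# given page land on the same subtask and are processed in per-page order.
keyed = page_change_stream.key_by(
    lambda e: (e[0], e[1]),
    key_type=Types.TUPLE([Types.STRING(), Types.LONG()]),
)
```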

Event Timeline

Restricted Application added a subscriber: Aklapper.
Ottomata renamed this task from mw-page-content-change-enrich should ensure ordering by wiki_id,page_id, and (re)produce kafka keys to mw-page-content-change-enrich should partition by and process by wiki_id,page_id. Jun 6 2023, 1:26 PM
Ottomata updated the task description.
Ottomata triaged this task as Medium priority. Jun 28 2023, 3:21 PM
Ottomata moved this task from Backlog to Estimated/ Discussed on the Event-Platform board.

We should change this partitioning to use wiki_id and page_id (via a hash)

Not sure if we need hashing here, at least as long as we run the job with parallelism=1.

Thinking out loud: with the current windowing mechanism, which combines a count trigger with a time-based one, I don't think we can simply hash as (wiki_id, page_id) % window_size, because the window size is not constant.

According to DESIGN.html you use a custom KeyedProcessFunction, so the key must comply with the batching requirements and keep the buffers big enough; anything that has page_id in the key must be reduced to keep the batch size effective.

(wiki_id, page_id) alone is probably not good. To tune a modulo value in the key function, like (wiki_id, page_id) % modulo, you would have to stick to a constant value that is small enough to give big enough batches but high enough to account for any parallelism increase you might want to apply in the future. This modulo also becomes the de facto parameter limiting concurrency, since it is the maximum parallelism you could set; any parallelism above it would be meaningless.
So (wiki_id, page_id) % 10 would probably be better in terms of batching than (wiki_id) alone, but it imposes a hard limit of parallelism <= 10.

In the end the main drawback is that this modulo value must never change unless you drain the pipeline before changing it (the keyed state cannot be redistributed by flink automatically if the key function changes).

The way around it might be to use a custom ProcessFunction that uses its operator state to buffer everything it sees regardless of the key, but apparently operator state cannot be used from the Python API, so it is probably not an option for you.

But in the end, looking at the done criteria: keying by wiki_id already provides a stream of events that is well ordered by wiki_id,page_id, so unless there are other reasons (perf tuning?) perhaps this is not required?
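Purely to make the trade-off above concrete (this is not something the job implements), a fixed-modulo key selector could look like the sketch below; MODULO and the event field names are assumptions:

```python
import zlib

# Made-up constant: as noted above it caps useful parallelism at its value
# and must never change while there is keyed state to carry over.
MODULO = 10

def bucketed_key(event):
    # Use a stable hash (not Python's per-process salted hash()) so a given
    # page always maps to the same bucket across subtasks and restarts.
    return zlib.crc32(f"{event['wiki_id']}:{event['page_id']}".encode()) % MODULO

# keyed = stream.key_by(bucketed_key)  # `stream` is assumed to exist
```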

Thanks for helping validate direction @dcausse! My plan of attack for now is to drop the hashing requirement (see below), and measure perf of a (wiki_id, page_id) partitioning scheme on the rc1 stream.

According to DESIGN.html you use a custom KeyedProcessFunction, so the key must comply with the batching requirements and keep the buffers big enough; anything that has page_id in the key must be reduced to keep the batch size effective.

(wiki_id, page_id) alone is probably not good. To tune a modulo value in the key function, like (wiki_id, page_id) % modulo, you would have to stick to a constant value that is small enough to give big enough batches but high enough to account for any parallelism increase you might want to apply in the future.

Ack. I think our upper bound would be the count window threshold (20 events), but the lower bound is ill-defined and prone to skew in the data. I'd rather avoid going down this route for no obvious perf gains.

In the end the main drawback is that this modulo value must never change unless you drain the pipeline before changing it (the keyed state cannot be redistributed by flink automatically if the key function changes).

Good point!

The way around it might be to use a custom ProcessFunction that uses its operator state to buffer everything it sees regardless of the key, but apparently operator state cannot be used from the Python API, so it is probably not an option for you.

IIRC I looked into operator state a while ago, and it was not supported by the Python API. It still isn't as of 1.17. From the link you shared: [...] Notes: Operator state is still not supported in Python DataStream API. [...].

But in the end, looking at the done criteria: keying by wiki_id already provides a stream of events that is well ordered by wiki_id,page_id, so unless there are other reasons (perf tuning?) perhaps this is not required?

The only reason would be to have a partitioning scheme that matches what eventgate produces to Kafka. Performance (throughput/latency/system load) is not an issue right now, but I'd need to test for regressions when partitioning by (wiki_id, page_id). I _think_ we should be fine (based on experiments on YARN), but I still need to run on k8s and monitor Grafana.

Change 935153 had a related patch set uploaded (by Gmodena; author: Gmodena):

[operations/deployment-charts@master] mw-page-content-change-enrichment stream partition WIP

https://gerrit.wikimedia.org/r/935153

In the end the main drawback is that this modulo value must never change unless you drain the pipeline before changing it (the keyed state cannot be redistributed by flink automatically if the key function changes).

Hm, I hadn't considered this to be a problem in our case, so maybe I'm missing something. To make a change in parallelism, we'd likely restart the job, and likely do so from a savepoint? IIUC, the only state kept in our checkpoints and savepoints is the Kafka offset. Would we not be able to change the modulo in config and restart the job with a different parallelism?

In the end the main drawback is that this modulo value must never change unless you drain the pipeline before changing it (the keyed state cannot be redistributed by flink automatically if the key function changes).

Hm, I hadn't considered this to be a problem in our case, so maybe I'm missing something. To make a change in parallelism, we'd likely restart the job, and likely do so from a savepoint? IIUC, the only state kept in our checkpoints and savepoints is the Kafka offset. Would we not be able to change the modulo in config and restart the job with a different parallelism?

You should have the pending batches in your state that are attached to this key. But now that I think about it, it might not hurt: worst case, on resuming you won't append to the same batch and may start emitting out-of-order events, but the way Flink works, IIRC, is that it should trigger the processing-time timers, so it's very likely that existing batches are going to be fired early.
So yes, changing the modulo of the key selector might not be as terrible as I initially thought, as it might drain itself when resuming.

Regarding not using a modulo, I guess you'll have to be cautious about not creating too many windows and ending up with 99.99% of your windows holding a single event.

In the end the main drawback is that this modulo value must never change unless you drain the pipeline before changing it (the keyed state cannot be redistributed by flink automatically if the key function changes).

Hm, I hadn't considered this to be a problem in our case, so maybe I'm missing something. To make a change in parallelism, we'd likely restart the job, and likely do so from a savepoint? IIUC, the only state kept in our checkpoints and savepoints is the Kafka offset. Would we not be able to change the modulo in config and restart the job with a different parallelism?

You should have the pending batches in your state that are attached to this key

Sorry about that, this is apparently not true: the current batch is attached to the operator itself as a class member and is not seen by Flink. Timers should be stored on a per-key basis though, but I'm still unsure how this plays together: a non-persisted/non-keyed buffer held in batch_futures plus keyed timers (probably the timer does not get cleaned up as expected when the batch is completed, and you'll get a few spurious timers being fired?).
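To make the distinction above concrete, here is a hedged PyFlink sketch (not the actual enrichment operator; the class name, state names, and timeout are invented) contrasting a plain class-member buffer, which Flink never sees, with keyed ListState and keyed processing-time timers, which are checkpointed per key:

```python
from pyflink.common import Types
from pyflink.datastream import KeyedProcessFunction
from pyflink.datastream.state import ListStateDescriptor


class BatchingSketch(KeyedProcessFunction):

    def __init__(self):
        # Plain class member: invisible to Flink, not checkpointed, and not
        # redistributed if the key function or parallelism changes.
        self.pending = []

    def open(self, runtime_context):
        # Keyed ListState: checkpointed, scoped to the current key, and
        # redistributed along with its key group when rescaling.
        self.buffer = runtime_context.get_list_state(
            ListStateDescriptor('buffer', Types.STRING()))

    def process_element(self, value, ctx):
        self.buffer.add(value)
        # Timers are keyed state too; timers already due when the job is
        # restored fire immediately, flushing old batches early.
        ctx.timer_service().register_processing_time_timer(
            ctx.timer_service().current_processing_time() + 30_000)

    def on_timer(self, timestamp, ctx):
        for element in self.buffer.get():
            yield element
        self.buffer.clear()
```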

Change 935153 merged by jenkins-bot:

[operations/deployment-charts@master] mw-page-content-change-enrichment stream partition WIP

https://gerrit.wikimedia.org/r/935153

Change 936718 had a related patch set uploaded (by Gmodena; author: Gmodena):

[operations/deployment-charts@master] mw-page-content-change-enrichment partition by (wiki_id, page_id)

https://gerrit.wikimedia.org/r/936718

Change 936718 merged by jenkins-bot:

[operations/deployment-charts@master] mw-page-content-change-enrichment partition by (wiki_id, page_id)

https://gerrit.wikimedia.org/r/936718

Change 937145 had a related patch set uploaded (by TChin; author: TChin):

[operations/deployment-charts@master] mw-page-content-change-enrich bump docker version

https://gerrit.wikimedia.org/r/937145

Change 937145 merged by jenkins-bot:

[operations/deployment-charts@master] mw-page-content-change-enrich bump docker version

https://gerrit.wikimedia.org/r/937145

@gmodena I see we went without a hash modulo here. Are we at risk of what David said:

Regarding not using a modulo, I guess you'll have to be cautious about not creating too many windows and ending up with 99.99% of your windows holding a single event.

?