Adopt conventions for server receive and client/event timestamps in non-analytics event schemas
Open, Needs Triage, Public

Description

In T240460#6614767 we decided the following:

  • dt is always the client AKA event timestamp.
  • meta.dt is always the server-side receive timestamp.

To accomplish this:

  • All schemas should be updated to have both a meta.dt and a dt field; dt should be required (see the sketch after this list).
  • EventBus should be modified to set dt to the event time, but not set meta.dt (allowing EventGate to fill it in).
  • All eventgates should use meta.dt as the Kafka timestamp.
  • All Gobblin ingestion jobs should use meta.dt as the partitioning timestamp.
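As a rough illustration of the convention (a sketch only; the field names match the convention, but the descriptions and required list here are illustrative, not the exact wording in schemas/event/primary), the two timestamp fields might look like this, written as a JSONSchema-style fragment in a Python dict:

```python
# Illustrative sketch of the dt / meta.dt convention, not the actual
# schemas/event/primary definitions.
timestamp_fields = {
    # dt is required: every event must carry its event (client) time.
    "required": ["dt"],
    "properties": {
        "dt": {
            "type": "string",
            "format": "date-time",
            "description": "Event time: when the thing the event describes happened. Set by the client/producer.",
        },
        "meta": {
            "type": "object",
            "properties": {
                "dt": {
                    "type": "string",
                    "format": "date-time",
                    "description": "Receive time: set server side by EventGate, not by the producer.",
                },
            },
        },
    },
}
```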

Ideally, any clients that produce directly to Kafka (not via EventGate) should use a maintained Event Platform producer library where these conventions are automatically handled (like wikimedia-event-utilities).

Downstream Kafka topics and/or Hive tables can use whatever timestamp field is appropriate. E.g. compacted Kafka topics will likely want to use the event-time dt as the Kafka timestamp. Downstream (updatable Iceberg?) Hive tables that select from event tables will likely want to use the event-time dt as their own partition timestamp.
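For the direct-to-Kafka case, a minimal sketch of what a producer following the convention might do, assuming kafka-python, a placeholder broker, and a hypothetical schema URI and topic name (in practice a maintained Event Platform producer library such as wikimedia-event-utilities would handle this):

```python
import json
from datetime import datetime

from kafka import KafkaProducer  # assumption: kafka-python is available

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",  # placeholder broker
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

event = {
    "$schema": "/example/thing/change/1.0.0",    # hypothetical schema URI
    "meta": {"stream": "example.thing.change"},  # meta.dt left unset for the ingestion side
    "dt": "2023-06-15T12:00:00Z",                # event time, set by the producer
}

# For a compacted topic, the event-time dt is the natural Kafka message timestamp.
timestamp_ms = int(
    datetime.fromisoformat(event["dt"].replace("Z", "+00:00")).timestamp() * 1000
)
producer.send("example.thing.change", value=event, timestamp_ms=timestamp_ms)
producer.flush()
```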

Event Timeline

fdans triaged this task as Medium priority. Nov 16 2020, 4:40 PM
fdans moved this task from Incoming to Event Platform on the Analytics board.
JArguello-WMF raised the priority of this task from Medium to Needs Triage. Jan 11 2023, 3:18 PM

Change 929733 had a related patch set uploaded (by DCausse; author: DCausse):

[schemas/event/primary@master] mediawiki/revision/score: add the dt field

https://gerrit.wikimedia.org/r/929733

Change 929735 had a related patch set uploaded (by DCausse; author: DCausse):

[machinelearning/liftwing/inference-services@main] events: propagate the event time with the dt field

https://gerrit.wikimedia.org/r/929735

Change 930665 had a related patch set uploaded (by DCausse; author: DCausse):

[machinelearning/liftwing/inference-services@main] events: drop support for /mediawiki/revision/create#1.x events

https://gerrit.wikimedia.org/r/930665

Change 930666 had a related patch set uploaded (by DCausse; author: DCausse):

[schemas/event/primary@master] mediawiki/revision/create: add mandatory dt field

https://gerrit.wikimedia.org/r/930666

Change 930668 had a related patch set uploaded (by DCausse; author: DCausse):

[mediawiki/extensions/EventBus@master] produce events compliant to /mediawiki/revision/create/2.0.0

https://gerrit.wikimedia.org/r/930668

@dcausse The description here says:

All schemas should be updated to have both a meta.dt and a dt field, with neither field required

Perhaps we should make a new common schema that does not have dt required in it, before we do this?

cc also @xcollazo

Meh, never mind, I think dt should be required. Not sure about meta.dt.

eventgate-* internal analytics hadoop ingestion (gobblin) jobs should be configured to use dt for hourly HDFS partitioning, falling back to meta.dt

If we do go for dt partitioning, falling back to meta.dt would be problematic for consumers. How do I know we fell back? Also, see the drift discussed at T335860#8950708.

I think you are right. We should go for one or the other.

Change 929733 merged by jenkins-bot:

[schemas/event/primary@master] mediawiki/revision/score: add the dt field

https://gerrit.wikimedia.org/r/929733

Change 930666 merged by jenkins-bot:

[schemas/event/primary@master] mediawiki/revision/create: add mandatory dt field

https://gerrit.wikimedia.org/r/930666

Heya - sorry for chiming in late - I completely missed that this task was still alive.
I want to push toward adopting meta.dt as the default partitioning timestamp, and making it mandatory for every event. For schemas published directly to Kafka, meta.dt could equal dt, and that would make it a lot easier as a convention for downstream data usage: we use meta.dt, the platform timestamp, as the partition. If someone's events have event timestamps that are not the platform timestamp, it's on them to deal with data-lateness.
My 2 cents :)

If someone's events have event timestamps that are not the platform timestamp, it's on them to deal with data-lateness.

Curious what you mean by this? I'm thinking of clients with clocks way in the past or way in the future. Is this point about making it explicit that consumers of the data (in the data analyst sense) need to be aware that meta.dt may not match dt – sometimes by hours or days – so it's up to them to figure out appropriate strategies for how they handle timestamps (and which ones) in the event data they work with?

Oh sorry, this is for non-analytics event schemas. Sorry about that! Nevermind :)

If someone's events have event timestamps that are not the platform timestamp, it's on them to deal with data-lateness.

Curious what you mean by this? I'm thinking of clients with clocks way in the past or way in the future. Is this point about making it explicit that consumers of the data (in the data analyst sense) need to be aware that meta.dt may not match dt – sometimes by hours or days – so it's up to them to figure out appropriate strategies for how they handle timestamps (and which ones) in the event data they work with?

That's what I imply, yes: the schema semantics need to be known when querying the data if you wish not to make mistakes :)
One real use case where meta.dt is meaningfully different from dt is when event generation and event posting are (very) decoupled. If events are generated and queued while the device is offline, and then sent when the device is back online, there is a real difference between dt and meta.dt.
Building our timestamp management policy to NOT cover this type of situation makes a real difference in terms of data/pipeline management and operations.
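A toy sketch of that decoupling (hypothetical stream and schema names; the actual send is only hinted at): dt is captured when the event is generated, while meta.dt will only be set when the queued event is finally received, so the two can legitimately differ by hours or days.

```python
from collections import deque
from datetime import datetime, timezone

def now_iso() -> str:
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

offline_queue = deque()

# Device offline: the event is generated and queued; dt is its event time.
offline_queue.append({
    "$schema": "/example/thing/change/1.0.0",    # hypothetical
    "meta": {"stream": "example.thing.change"},  # no meta.dt yet
    "dt": now_iso(),
})

# ...hours or days later, the device comes back online...

# Device online: the queued events are posted now. EventGate sets meta.dt to
# its receive time, which can be far later than each event's dt.
while offline_queue:
    event = offline_queue.popleft()
    # post_to_eventgate(event)  # hypothetical helper; see the sketch further down
```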

Oh sorry, this is for non-analytics event schemas. Sorry about that! Nevermind :)

This is an interesting topic, we should talk about it (here or somewhere else :)!

For schemas published directly to Kafka, meta.dt could equal dt, and that would make it a lot easier as a convention for downstream data usage: we use meta.dt

The client is responsible for setting dt (event time); the system (EventGate, event producer library, etc.) is responsible for setting ingestion time (meta.dt).

I don't think we want to change the semantics of these, but enforce these semantics everywhere.
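To make that split concrete, here is a minimal sketch (hypothetical intake URL, stream name, and schema URI): the client sets dt and leaves meta.dt unset, and EventGate fills in meta.dt with its own receive time after validating the event.

```python
from datetime import datetime, timezone

import requests  # assumption: plain HTTP POST to an EventGate intake endpoint

event = {
    "$schema": "/example/thing/change/1.0.0",    # hypothetical schema URI
    "meta": {"stream": "example.thing.change"},  # note: no meta.dt here
    # Event time, set by the client at the moment the thing happened.
    "dt": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
}

# EventGate validates the event and sets meta.dt (receive time) server side.
requests.post("https://eventgate.example.org/v1/events", json=[event])
```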

Which field is used for Hive partitioning depends on the use case I think.

Which field is used for Hive partitioning depends on the use case I think.

I would recommend sticking to meta.dt for all partitioning needs, Hive or otherwise. dt is just too unreliable in the wild for the reasons Joseph mentioned. It's not a problem at the source (MediaWiki-sent events, unless our servers are overloaded?), slightly more of a problem on the web (just by the nature of the client – the browser & device – not being something we operate), more of a problem with the sites on mobile devices (since the user could be on a page that's producing events when their device goes offline), and especially a problem with the mobile apps, which have features for engaging with content completely offline by downloading copies of articles.

@mpopov for analytics stuff, using meta.dt makes sense; we can't trust the producers.

However, if we were to backfill data into mediawiki_page_change_v1 for example, dt == rev_dt, and we'd be inserting many records for revisions that were created years ago (pages that have not been edited recently). I think in those cases, we'd want the edit events to be in the hourly partition of when the edit actually happened.

Change 930668 merged by jenkins-bot:

[mediawiki/extensions/EventBus@master] produce events compliant to /mediawiki/revision/create/2.0.0

https://gerrit.wikimedia.org/r/930668

Revision create events using the new /mediawiki/revision/create/2.0.0 schema appear to flow properly on the beta cluster; I checked the eqiad.eventgate-main.error.validation topic and it has remained empty so far.

Change 932238 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/deployment-charts@master] Bump the version of eventgate-wikimedia deployed to eventgate-main

https://gerrit.wikimedia.org/r/932238

Change 932238 merged by jenkins-bot:

[operations/deployment-charts@master] Bump the version of eventgate-wikimedia deployed to eventgate-main

https://gerrit.wikimedia.org/r/932238

@mpopov for analytics stuff, using meta.dt makes sense; we can't trust the producers.

However, if we were to backfill data into mediawiki_page_change_v1 for example, dt == rev_dt, and we'd be inserting many records for revisions that were created years ago (pages that have not been edited recently). I think in those cases, we'd want the edit events to be in the hourly partition of when the edit actually happened.

For this one specifically: would we make the event generator set meta.dt == dt in this specific use case? It would then indeed be late-event reconciliation, but we'd know about it and take appropriate action, as well as not break the convention of using meta.dt for partitioning.

Side note: When I say using meta.dt for partitioning everywhere, I mean everywhere for refined events - for post-processed datasets another timestamp (or even another field!) could be used to partition, as long as the dataset owner understands the problems associated with such partitioning :)

would we make the event generator set meta.dt == dt

I don't think so? That seems a bit disingenuous; I think a purpose of meta.dt is to help understand when the event record was created/ingested, vs dt, which is the time at which the event happened.

for post-processed datasets another timestamp (or even another field!) could be used to partition,

+1

I'm still not convinced we shouldn't use dt for refined event partitioning when we can, though. I suppose in the Iceberg world it won't matter so much?

I'm still not convinced we shouldn't use dt for refined event partitioning when we can, though. I suppose in the Iceberg world it won't matter so much?

I've had fun figuring out how to efficiently ingest from a meta.dt-partitioned Hive table into a dt-partitioned Iceberg table (see T335860#8950708 and our Slack thread on consuming page content change events), but I think I now have a reasonable solution that will be performant. The TL;DR is that a MERGE INTO needs partition pushdown predicates for the target table to perform well, and if partitioning columns differ between source and target, then you have to calculate the target partition bounds before your MERGE INTO. I'll explain this better over at T335860 once my tests pass.
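A rough sketch of that approach (hypothetical table names, merge key, and partition values; assumes an active SparkSession named spark with Iceberg's Spark SQL extensions enabled): the dt bounds of the source hour are computed first, so the MERGE INTO carries a pushdown predicate on the target's dt partition column instead of scanning the whole Iceberg table.

```python
# Hypothetical source (meta.dt/hour-partitioned Hive) and target (dt-partitioned Iceberg) tables.
source = "event.mediawiki_page_content_change"
target = "iceberg_db.page_content_change"
hour_filter = "year = 2023 AND month = 6 AND day = 15 AND hour = 12"

# 1. Compute the event-time bounds of the source hour being ingested.
bounds = spark.sql(f"""
    SELECT min(dt) AS lo, max(dt) AS hi
    FROM {source}
    WHERE {hour_filter}
""").collect()[0]

# 2. MERGE INTO with a predicate on the target's partition column (dt), so
#    Iceberg can prune target partitions instead of scanning everything.
#    This assumes that for a given key, dt is the same in source and target
#    (e.g. dt == rev_dt), so restricting matches to these bounds is safe.
spark.sql(f"""
    MERGE INTO {target} t
    USING (SELECT * FROM {source} WHERE {hour_filter}) s
    ON t.page_id = s.page_id
       AND t.dt BETWEEN '{bounds.lo}' AND '{bounds.hi}'
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
```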

I'm still not convinced we shouldn't use dt for refined event partitioning when we can though. I suppose in Iceberg world it won't matter so much though?

The main reason for me is to keep it simple for users querying the data: instead of having to know which timestamp is used for partitioning depending on the dataset, it's the same everywhere.

The ideal thing to do would be to use dt event time everywhere. We just can't for producers of analytics data because we can't trust their event time.

Perhaps the event database needs to be split into different databases too? Might be a little easier to do this after Refine is on Airflow.

The ideal thing to do would be to use dt event time everywhere.

For data partitioning it would have drawbacks, which we discussed a bit on Slack, but I copy here for completeness:

From @Milimetric:

that wouldn't work in the refine / event / event_sanitized flow because anything older than 90 days would be deleted immediately. I suspect these old events are page restores, and we definitely want them to stick around. It's ok that they're partitioned by ingestion time instead of event time, but to be clear, dt / revision.rev_dt is the event time, right?

From me:

you’d expect an event from 2017 to be in the 2017 partitioned dir

given that this is a changelog, I would expect the event to be on the partition that it was received.
otherwise we risk consumers of the changelog having to walk it from the beginning to be able to capture late events
in other words: I want to be able to set an Airflow sensor on rc1_mediawiki_page_content_change and consume the latest partition, and that partition may include late events, and that is ok because I will make sure that my consumption logic knows that dt is the event time

From @Ottomata :

hmm interesting. that may indeed make sense… maybe the event tables are more analogous to the stream, and even though the events do have a correct event time, their proper ordering is in receive time (just like they are in the Kafka log)?

From me:

even though the events do have a correct event time, their proper ordering is in receive time

That is what makes sense to me because: let’s assume that we instead partitioned rc1_mediawiki_page_content_change by dt; then, in my Airflow sensor example above, how would I consume this table? I would have to walk it from the beginning every time because a previously consumed partition may have changed.
(Side note: this could be avoided if we made rc1_mediawiki_page_content_change an Iceberg table and instead of time partitions, we consumed snapshots, but then we get into the business of teaching Gobblin how to write Iceberg…)

Thank you!

(Side note: this could be avoided if we made rc1_mediawiki_page_content_change an Iceberg table and instead of time partitions, we consumed snapshots, but then we get into the business of teaching Gobblin how to write Iceberg…)

Gobblin doesn't necessarily need to know this, just Refine... if we made Refine repartition based on dt.

Perhaps... the raw Gobblin directories are partitioned by meta.dt, and Refine discovers work to do based on this partitioning, but in the magical Iceberg dream world, Refine will write into the event table 'partition' (does this even make sense? not really?) based on dt?
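If we did go that way, a rough sketch of what the Refine side could look like (hypothetical paths, table, and columns; assumes an active SparkSession named spark with an Iceberg catalog): work is still discovered per receive-time hour, but Iceberg's hidden partitioning on dt decides where each row lands, late events included.

```python
from pyspark.sql import functions as F

# Hypothetical raw directory laid out by meta.dt (receive time) by Gobblin.
raw_path = "/wmf/data/raw/event/example_stream/year=2023/month=06/day=15/hour=12"

# Hypothetical Iceberg target with hidden partitioning on event time (dt).
spark.sql("""
    CREATE TABLE IF NOT EXISTS iceberg_db.example_events (
        dt      timestamp,
        meta_dt timestamp,
        page_id bigint
    )
    USING iceberg
    PARTITIONED BY (days(dt))
""")

refined = (
    spark.read.json(raw_path)
    .select(
        F.col("dt").cast("timestamp").alias("dt"),           # event time
        F.col("meta.dt").cast("timestamp").alias("meta_dt"),  # receive time, kept for reference
        F.col("page_id").cast("bigint").alias("page_id"),
    )
)

# Iceberg routes each row to its dt-based partition, including late events.
refined.writeTo("iceberg_db.example_events").append()
```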

The ideal thing to do would be to use dt event time everywhere.

While I agree it's the "natural" timestamp for some (many?) events, I fear unforeseen time-semantic problems. Would we make the event database be partitioned by meta.dt and have another dataset layer, where time issues are explicitly handled, use dt?

Change 929735 merged by jenkins-bot:

[machinelearning/liftwing/inference-services@main] events: propagate the event time with the dt field

https://gerrit.wikimedia.org/r/929735

In a meeting today, we discussed / decided the following:

  • We do want to eventually use Iceberg Hive event tables.
  • meta.dt should most likely be used for 'ingestion' partitioning and Kafka timestamps on 'raw'-ish 'log' tables. This means we should use meta.dt for Kafka timestamps and Hive table partitioning, unless:
    • The stream or table is meant for event-time-based querying and it is expected that late data will be accounted for, in which case we should use dt. E.g. compacted Kafka topics, or downstream / computed event tables.

We should still make all event streams respect the dt/meta.dt convention. We need to do this for older MediaWiki EventBus-based streams. Unfortunately, we will not be able to do this for 'legacy' EventLogging events: meta.dt will be correct, but dt will always be processing time for these (meta.dt == dt in this case).

I will adjust the task description to account for the decision to use meta.dt for the Kafka timestamp and partitioning. The good news is that this decision is the status quo! meta.dt is used for the Kafka timestamp by eventgate-wikimedia, and meta.dt is used everywhere for Gobblin partitioning (which is what is used by Refine too).

We also discussed request tracing between events. @JAllemandou noted that the page_content_change stream should likely have a newly generated meta.id field. Filed T341277: mediawiki page_content_change should generate new meta.id field for that.