
Consistent MediaWiki state change events | MediaWiki events as source of truth
Open, Medium, Public

Description

T116786 introduced MediaWiki Event-Platform production via an extension utilizing hooks. While adequate for the EventBus MVP, this is only an interim solution. Ultimately, we need a mechanism that guarantees event delivery (eventual consistency is OK).

The Event Platform program extended the work started in T116786 to provide standardized event-producing APIs, unified for both production and analytics purposes.

However, in order to build truly reliable new production services with events based on MediaWiki data, we need a single source of truth for MediaWiki data. That source of truth is the MediaWiki MySQL database, which is only consistently accessible by MediaWiki itself. There is currently no way to consistently expose (real-time) MediaWiki state changes to non-MediaWiki applications.

We do have events produced by MediaWiki, but these events are decoupled from the MySQL writes, and there is no guarantee that e.g. every revision table save results in a mediawiki.revision-create event. This means that as of today, MediaWiki events cannot be relied on as a 'source of truth' for MediaWiki data. They are not much more than a best-effort (albeit really good!) notification.

Background reading: Turning the database inside out

Why do we need this?

I asked a few stakeholders to explain why this is important to them, and they gave me permission to quote them here. These are a few examples of why consistent events are important.

Wikidata Query Service Updater - T244590

@Zbyszko:

... missed events are probably the biggest issue in the system. We have visibility into late and out of order events (and probably mostly buggy events, but there's no way of knowing for sure). Not only that, there are sensible ways of dealing with them, both in general and in our specific situation.

Missed events are, by their nature, invisible to us via standard means and hard to observe in general. Since we also don't really understand the situation when those are dropped, it's hard to assess the impact on WDQS updater. We decided we're ok with it for now, because it's simply still better than the previous solution.

To reiterate - we can deal with lateness and out-of-orderliness - dealing with missed events is an order of magnitude harder challenge.

Image Recommendations project - T254768

@gmodena:

Throughout the month, the state of an article can change. We'll need to track a "revisions events topic" to establish a feedback loop with the model re the following state changes (among others):

  1. Previously unillustrated articles that are now illustrated
  2. Articles illustrated algorithmically, that have been reverted
  3. Orthogonal (technically not a MW state change): track which recommendations have been rejected by a client.

Being late in capturing state changes would result in a degraded UX that will fix itself with time.
Missing events would be an order of magnitude harder problem to solve.

HTML wiki content dumps and other public datasets - T182351

@fkaelin:

Another category of tools that depend on the correctness of the events are derived datasets that the foundation could publish. This includes the equivalent of the wikidumps on which the analytics wiki history datasets are based, which could be replaced with a snapshot-less and continuous log of revisions. Another example is the html dumps discussed in T182351: Make HTML dumps available, which the OKAPI team can also relate to, and any number of other datasets that one can think of.

Wikimedia Enterprise AKA Okapi

@Protsack.stephan:

if you don't have consistent events, how else would you get the data you need for your use case? - We heavily rely on events to maintain our dataset. Basically we do CDC from event streams to maintain our dataset. Not having consistent events means that our dataset gets out of sync and we need to engineer something on top of events to make sure that it is consistent. Just FYI we are just acknowledging that events may be not consistent and putting that problem into a box for now, but that's probably going to be our next bridge to cross.

Potential solutions

Event Sourcing is an approach that event driven architectures use to ensure they have a single consistent source of truth that can be used to build many downstream applications. If we were building an application from scratch, this might be a great way to start. However, MediaWiki + MySQL already exist as our source of truth, and migrating it to an Event Sourced architecture all at once is intractable.

In lieu of completely re-architecting MediaWiki's data source, there are a few possible approaches to solving this problem in a more incremental way.


Change Data Capture (CDC)

CDC uses the MySQL replication binlog to produce state change events. This is the same source of data used to keep the read MySQL replicas up to date.

Description
A binlog reader such as Debezium would produce database change events to Kafka. This reader may be able to transform the database change events into a more useful data model (e.g. mediawiki/revision/create), or the transformation may be done later by a Stream Processing framework such as Flink or Kafka Streams.
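
To illustrate the latter option, here is a rough sketch of what a small downstream transformation step might look like (the CDC topic name and event field layout below are assumptions for illustration only, not our actual Debezium configuration or schemas):

  # Sketch only: the CDC topic name and the shape of the change event below are
  # assumptions for illustration, not our actual Debezium setup or schemas.
  import json
  from kafka import KafkaConsumer, KafkaProducer

  consumer = KafkaConsumer(
      'cdc.enwiki.revision',  # hypothetical Debezium output topic
      bootstrap_servers='localhost:9092',
      value_deserializer=lambda v: json.loads(v.decode('utf-8')),
  )
  producer = KafkaProducer(
      bootstrap_servers='localhost:9092',
      value_serializer=lambda v: json.dumps(v).encode('utf-8'),
  )

  for message in consumer:
      change = message.value
      # Only row inserts into the revision table become revision-create events.
      if change.get('op') != 'c':
          continue
      row = change['after']
      # Re-model the low level row change as a higher level domain event.
      producer.send('mediawiki.revision-create', value={
          'rev_id': row['rev_id'],
          'page_id': row['rev_page'],
          'rev_timestamp': row['rev_timestamp'],
      })

In practice this transformation would more likely be a Flink or Kafka Streams job that also joins other tables' change streams to reconstruct the domain model.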

Pros

  • No MediaWiki code changes needed
  • Events are guaranteed to be produced for every database state change
  • May be possible to guarantee each event is produced exactly once
  • Would allow us to incrementally Event Source MediaWiki (if we wanted to)

Cons

  • Events are emitted (by default?) in a low level database change model, instead of a higher level domain model, and need to be joined together and transformed by something, most likely a stateful stream processing application.
  • WMF's MariaDB replication configuration may not support this (we may need GTIDs).
  • Data Persistence is not excited about maintaining more 'unicorn' replication setups.

Transactional Outbox

This makes use of database transactions and a separate poller process to produce events.

See also: https://microservices.io/patterns/data/transactional-outbox.html

Description
Here's how this might work with the revision table:

When a revision is to be inserted into the MySQL revision table, a MySQL transaction is started. A record is inserted into both the revision table and the revision_event_log table. The MySQL transaction is committed. Since this is done in a transaction, we can be sure that both of the table writes happen atomically. The revision event is produced to Kafka. When the Kafka produce request succeeds, the revision_event_log's produced_at timestamp (or boolean) field is set.

A separate process polls the revision_event_log table for records where produced_at is NULL, produces them to Kafka, and sets produced_at when the produce request succeeds.

If needed, revision_event_log records may be removed after they are successfully produced.

NOTE: This example is just one of various ways a Transactional Outbox might be implemented. The core idea is the use of MySQL transactions and a separate poller to ensure that all events are produced.
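
As a rough sketch of what the poller half could look like (the table and column names are the hypothetical ones from the description above, and the client libraries are just one possible choice):

  # Sketch of the outbox poller; revision_event_log / produced_at / payload are
  # the hypothetical names used in the description above.
  import json
  import time
  import pymysql
  from kafka import KafkaProducer

  conn = pymysql.connect(host='db-host', user='mw', password='...', database='enwiki')
  producer = KafkaProducer(
      bootstrap_servers='localhost:9092',
      value_serializer=lambda v: json.dumps(v).encode('utf-8'),
  )

  while True:
      with conn.cursor() as cur:
          # Find events that were committed with the MW transaction but never produced.
          cur.execute(
              "SELECT id, payload FROM revision_event_log "
              "WHERE produced_at IS NULL ORDER BY id LIMIT 100"
          )
          for row_id, payload in cur.fetchall():
              # Wait for the Kafka ack before marking the row as produced.
              producer.send('mediawiki.revision-create', value=json.loads(payload)).get(timeout=10)
              cur.execute(
                  "UPDATE revision_event_log SET produced_at = NOW() WHERE id = %s",
                  (row_id,),
              )
      conn.commit()
      time.sleep(5)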

Pros

  • Events can be emitted modeled as we choose
  • Since MW generally wraps all DB writes in a transaction, no MW core change needed. This could be done in an extension.

Cons

  • At-least-once delivery guarantee for events, but this should be fine. There may be ways to easily detect duplicate events.
  • Separate polling process to run and manage.

Hybrid: Change Data Capture via Transactional Outbox

This is a hybrid of the above two approaches. The main difference is instead of using CDC to emit change events on all MySQL tables, we only emit change events for event outbox tables.

This idea is from Debezium: https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/

Description
MediaWiki would be configured to write all changes in a transaction together with the outbox tables. When a revision is to be inserted into the revision table, a MySQL transaction is started. A record is inserted into the revision table as well as the revision_event_outbox table. The revision_event_outbox row includes a field containing a JSON string representing the payload of the change event. The transaction is then committed.

A binlog reader such as Debezium would then filter for changes to the revision_event_outbox table (likely extracting only the JSON event payload) and emit only those to Kafka.
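
A sketch of the write side (hypothetical table/column names, heavily simplified; in reality this logic would live inside MediaWiki's own transaction handling, not a standalone script):

  # Sketch of the transactional write for the hybrid approach. Column names are
  # heavily simplified; in MediaWiki this happens inside the existing transaction.
  import json
  import pymysql

  conn = pymysql.connect(host='db-host', user='mw', password='...',
                         database='enwiki', autocommit=False)

  def save_revision_with_outbox(page_id, rev_timestamp):
      with conn.cursor() as cur:
          # Both inserts are part of the same transaction, so they commit (or
          # roll back) atomically.
          cur.execute(
              "INSERT INTO revision (rev_page, rev_timestamp) VALUES (%s, %s)",
              (page_id, rev_timestamp),
          )
          rev_id = cur.lastrowid
          payload = {
              'meta': {'stream': 'mediawiki.revision-create'},
              'rev_id': rev_id,
              'page_id': page_id,
              'rev_timestamp': rev_timestamp,
          }
          cur.execute(
              "INSERT INTO revision_event_outbox (payload) VALUES (%s)",
              (json.dumps(payload),),
          )
      conn.commit()
      return rev_id

Debezium's outbox event router (described in the blog post linked above) would then extract the payload column and produce just that to Kafka.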

Pros

  • Events can be emitted modeled as we choose
  • Events are guaranteed to be produced for every database state change
  • May be possible to guarantee each event is produced exactly once
  • No need to transform from low level database changes to high level domain models.
  • Since MW generally wraps all DB writes in a transaction, no MW core change needed. This could be done in an extension.
  • Would allow us to incrementally Event Source MediaWiki (if we wanted to)

Cons

  • WMF's MariaDB replication configuration may not support this (we may need GTIDs).
  • Data Persistence is not excited about maintaining more 'unicorn' replication setups.

2 Phase Commit with Kafka Transactions

This may or may not be possible and requires more research if we want to consider it. Implementing it would likely be difficult and error prone, and could have an adverse effect on MediaWiki performance. If we do need Kafka Transactions, this might be impossible anyway, unless a good PHP Kafka Client is written.

Event Timeline


We should focus on reconciliation strategies instead of chasing for panaceas for problems we know cannot be solved reliably.

Can you explain why the binlog solution is not solving the problem reliably?

Where is debezium supposed to run?

In k8s.

We should keep primary masters as clean as possible

Agree, I'd prefer to consume the binlog of a replica.

That's also something not easy for several reasons:

  • It would need to run on all replicas, otherwise we'd have a SPOF and a snowflake. We already have too many snowflakes with some of the replicas (candidate masters, sanitarium masters, vslow...)
  • Some of the replicas already have quite a bit of load, and adding more read latency isn't great (e.g. enwiki)

What happens with the PII that gets written to binlogs?
So really, we don't even need the full binlog, just the logs for our outbox table(s). (I think MySQL has a binlog table filter setting, right?)

Not sure I am following - you mean for reading the binlog or for only replicating specific tables?
For reading, as far as I remember you only have a database filter.
For replicating, that means changing replication filters in production, which is, unfortunately, a no-go as we cannot only replicate certain tables to our replicas. Everything needs to be replicated.

Why not using this on clouddb1021

This could work. However, this would be in the critical path for production services like jobqueue and resource_purge.

But on the other hand, it is a separate environment, so if there are any performance issues, they won't hit production.

We should focus on reconciliation strategies instead of chasing for panaceas for problems we know cannot be solved reliably.

Can you explain why the binlog solution is not solving the problem reliably?

Because you will still have software failures, network partitions, error handling: this is basically the CAP theorem in action.

Because you will still have software failures, network partitions, error handling: this is basically the CAP theorem in action.

This is then true of the MySQL replicas then too, right?

IIUC, the binlog has offsets. Debezium acts like a regular MySQL replica reading the binlog, keeping track of where it is in the offsets. If it fails, it should work just like a MySQL replica would: start from wherever it left off.

I'm not claiming this would achieve full consistency always, just that we should be able to know when something is going wrong. At the moment we have no way of knowing if e.g. every revision save results in a revision-create event.

Because you will still have software failures, network partitions, error handling: this is basically the CAP theorem in action.

This is then true of the MySQL replicas then too, right?

IIUC, the binlog has offsets. Debezium acts like a regular MySQL replica reading the binlog, keeping track of where it is in the offsets. If it fails, it should work just like a MySQL replica would: start from wherever it left off.

Assuming we have discarded the idea of replicate/touch the master:
How would it handle if the replica goes down for a long time or even forever (ie HW crash)?

Right now the way we handle that is...we just rebuilt it from either the backups/another replica which are in 100% of the cases in a different binlog/position. This can be mitigated if Debezium uses GTID, but in any case, this would be a problem.
And as I mentioned above, it can be such an overhead to have another replica that is a snowflake, because it would have another replica hanging below it.

Agree, I'd prefer to consume the binlog of a replica.

Why not using this on clouddb1021

Aye, a clouddb1021 type setup makes a lot of sense for this. Perhaps we could start with clouddb1021, and if we want something that is more 'production' we could set up another MySQL host with the same type of multi-instance replica setup.

It would need to run on all replicas, otherwise we'd have a SPOF and a snowflake.
...we just rebuilt it from either the backups/another replica which are in 100% of the cases in a different binlog/position

Ah I see good questions. Needs more research I think, but there is GTID support in Debezium. https://debezium.io/documentation/reference/connectors/mysql.html But I think I need to understand more about how we usually do replication setups. I'm not sure I understand the 'snowflake' problem.

I had a quick look: mariadb & mysql's GTID implementations are different and incompatible. Their roadmap says that they won't look at what's required to support mariadb until 1.6: https://debezium.io/roadmap/. (The current stable version of Debezium is 1.4; 1.5 is the current development branch).

Their roadmap says that they won't look at what's required to support mariadb until 1.6

Ah interesting ok.

Even if it were available, how would this work? Are GTIDs something we'd have to specifically enable on all MariaDB hosts? If we did that, does that just make it possible for any replica to swap out which master it is replicating from (assuming that 'master' has all the relevant binlogs)? (/me is learning more about GTID replication...)

I don't think so; our mariadb clusters would count as a "Primary and replica" setup, not "High available clusters" (based on supported-mysql-topologies).

Agree, I'd prefer to consume the binlog of a replica.

Why not using this on clouddb1021

Aye, a clouddb1021 type setup makes a lot of sense for this. Perhaps we could start with clouddb1021, and if we want something that is more 'production' we could set up another MySQL host with the same type of multi-instance replica setup.

Let's keep that in mind for budgeting for next fiscal.
However, having that host connected to the production master is operationally an overhead, as we'd need to include it under our failover strategies; otherwise, if we fail over the master, that host will keep replicating from the old one.
If it replicates from clouddb1021, that's already covered by our upstream master (the sanitarium host).

It would need to run on all replicas, otherwise we'd have a SPOF and a snowflake.
...we just rebuilt it from either the backups/another replica which are in 100% of the cases in a different binlog/position

Ah I see good questions. Needs more research I think, but there is GTID support in Debezium. https://debezium.io/documentation/reference/connectors/mysql.html But I think I need to understand more about how we usually do replication setups. I'm not sure I understand the 'snowflake' problem.

The snowflake problem is essentially that right now we have different types of hosts in production, they all run as replicas but have specific roles:

  • Candidate master (host that will take over in case of a master failure)
  • Sanitarium master (host that has the sanitarium host as a replica, and then all the clouddb1021 hosts, and runs ROW-based replication)
  • vslow/dump hosts: hosts that receive dump traffic, which sometimes causes load/issues.

All these create lots of operational burden on us, as we cannot simply exchange hosts if needed. We are in the process of simplifying things to ease our operational load.

If we can have this specific connector hanging from clouddb1021, that already helps us, as we wouldn't need to worry about it in terms of master failovers, as there's likely nothing going to replace clouddb1021 :-)

Not super relevant as the problem stands still, if we need to include this host in production then we do need to deal with possible master failures or swap.
None of our tools would be able to deal with it, and hence we'd need to manually change replication whenever there's a swap or a failure.
If it is connected to clouddb1021, then that's automatically handled by the upstream master, so any movement would be transparent to this host.
Obviously, if clouddb1021 crashes and it is not recoverable, then we do need to rebuild or think about how to handle it. A normal crash of clouddb1021 shouldn't be a big deal as replication will restart once it is back and will catch up once clouddb1021 has caught up with its upstream master.

Interesting, makes sense.

A LooONng time ago when I did MySQL DBA work, to restore a replica, I would use LVM snapshots via mylvmbackup. I'd have a dedicated replica on which I could lock tables, take a snapshot, and then unlock tables. A background process would then mount the LVM snapshot and rsync the MySQL data directory elsewhere for backup or restore. With this process, I believe the binlog offsets would be consistent between the original host and the restored replica. I know this is different than how we do things, but in theory, would this work? Could we restore a crashed replica and then point Debezium (or any other MySQL replication consumer) at it, starting from the same binlog positions from the original?

Ah, if we needed to change the binlog position information used by Debezium, this is how: https://debezium.io/documentation/faq/#how_to_change_the_offsets_of_the_source_database

We are in the process of simplifying things to ease our operational load.

I'm interested in hearing more about this. It sounds like the problems with using Debezium are similar to the problems of running MariaDB replicas! :) Is part of this simplification effort to make it easier to change a replica's master?

From reading through their docs a bit:

  • Debezium requires binlog_format=ROW, which means it cannot connect directly to any of our masters (as we use STATEMENT).
  • It also grabs a global read lock on the server it connects to when making an initial snapshot which would also be a huge no-no against any pooled db instance. It's unspecified if it repeats this when making non-initial snapshots.

It also grabs a global read lock on the server it connects to when making an initial snapshot which would also be a huge no-no against any pooled db instance. It's unspecified if it repeats this when making non-initial snapshots.

I don't know if this is configurable, but if it is, I don't think we need it for the Transactional Outbox idea. In any case, I wouldn't want to run Debezium against a production db for sure.

Debezium requires binlog_format=ROW, which means it cannot connect directly to any of our masters (as we use STATEMENT).

Could a dedicated replica for debezium (or on clouddb1021) turn on binlog using ROW format?

@Ottomata I am going to interject here, as backups owner.

I have 2 needs regarding mediawiki databases and backups that I would like to solve, too. One is incremental backups for mediawiki dbs ("binlog backups"/streaming backups), the other is incremental media backups (which requires a reliable way to get image updates).

I would like, @Ottomata, if we could collaborate together towards a solution that works beyond analytics needs, so it could be useful also for backups and other teams/needs.

Many changes happen on the databases that do not touch the binlogs (schema changes, data consistency fixes, etc.), as well as constant master switches, so I am not sure that is the right approach. I also need (for backups) to do some consistency checks that may impact analytics data, but that is out of scope here.

+1 @jcrespo this ticket is about solving the MW event production consistency problem so we can use them as a 'source of truth' for things outside of MW/MySQL. But the backups and dumps problem is similar, as are other issues we will encounter while building event driven applications, e.g. how to bootstrap them with current state? We should think of this as an org wide data integration problem and work on a common solution where we can :)

Ok I'll try to re-summarize my argument:
the problem we're trying to solve is having transactional consistency between mediawiki and kafka. And we want to do it not at the application layer, but at the data layer, which I think is wrong for a few reasons. But before we go back to discussing solutions, I'd like to see a better explanation of the problem.

The problem statement I read here is:

"We do have events produced by MediaWiki, but these events are decoupled from the MySQL writes, and there is no guarantee that e.g. every revision table save results in a mediawiki.revision-create event."

which is a common reliability and logical consistency problem, but no analysis of the causes of such inconsistencies is provided. Once we have those spelled out, we can think of different approaches which do not involve using the database (and its binlog) as a message queue, and as an integration layer.

no analysis of the causes of such inconsistencies is provided.

Hm, I guess none is provided mostly because no consistency is expected with the current async logic (An HTTP POST from a MW deferred update after the MySQL transaction is finished).

I'd like to see a better explanation of the problem.

MySQL is the source of truth for MediaWiki data. We'd like to be able to rely on events as a source of truth instead of MW/MySQL, so that we can make doing things like T244590: [Epic] Rework the WDQS updater as an event driven application easier (and possible). We want realtime-ish MW state changes without having to poll MySQL or the MW API for every new use case. By putting those state changes as events in Kafka, we can build new services that consume the changes without being runtime coupled to MW. However, if the MW events are not consistent, then we can't rely on them as a source of truth.

If the binlog reader is the problem here, we could of course just implement some custom MySQL poller process. This is the simple 'Transactional Outbox' solution. Ultimately this is how Kafka Connect JDBC works too, but I don't think we want to (or can?) use that for the same reasons we don't really want the entire MediaWiki binlog as events (too low level).

I'd like to see a better explanation of the problem.

MySQL is the source of truth for MediaWiki data.

That's a bit of an oversimplification. MediaWiki does use, for example, s1 for enwiki, but it's way more complex than that. For example, global user data comes from centralauth in s7, the URL shortener and some other MediaWiki functionality come from x1 (completely different hardware), the text of pages comes from ES (yet another beast), and I assume we are not taking into account the caching MediaWiki has, like ParserCache, ...

Having MediaWiki as an abstraction between analytics and the databases is a good idea; ditching it would cause all sorts of nastiness and edge cases that you get handled for free in MW, and the database configuration and topology change for reasons. It's like bypassing the transport layer in OSI: "if TCP is too slow, use UDP". Build something in MediaWiki that's fast and quick instead of bypassing MediaWiki altogether. You can even take advantage of some caching (depending on your use case).

Debezium requires binlog_format=ROW, which means it cannot connect directly to any of our masters (as we use STATEMENT).

Could a dedicated replica for debezium (or on clouddb1021) turn on binlog using ROW format?

I would still go for clouddb1021; the concept of a "dedicated" replica for this introduces yet another specific replica for a specific role, and we already have many of those (T120242#6914105). clouddb1021 is owned by Analytics so we can set up ROW there if that's needed.

It also grabs a global read lock on the server it connects to when making an initial snapshot which would also be a huge no-no against any pooled db instance. It's unspecified if it repeats this when making non-initial snapshots.

I don't know if this is configurable, but if it is, I don't think we need it for the Transactional Outbox idea. In any case, I wouldn't want to run Debezium against a production db for sure.

If, in the end, debezium is the solution you want to go for, I would prefer not to place another replica just for this, for the reasons given above (keep in mind it would need to be a replica per section or a big host with multi-instance). If we can get it to hang from clouddb1021, which already has all the sections, that's better.

Hm, @Ladsgroup I'm certainly not suggesting that we should ever bypass MediaWiki and get at MySQL at runtime. What I mean is that if MediaWiki was totally shut down, and all 'caches' (including elasticsearch) were wiped, the 'source of truth' to turn everything back on would come from MySQL (+ media / wikitext content storage).

Other services need consistent copies, just like caches do. Having slightly inconsistent caches isn't such a big deal, but having e.g. inconsistent WDQS or incrementally updated MediaWiki history is a bit more of an issue. This task is about making it possible to use events consistently and in realtime to carry state transfer from MediaWiki elsewhere.

If MediaWiki could somehow offer transactional guarantees for events it emits, this wouldn't be an issue. Until MW's MySQL transaction is closed, we don't consider MW state real. By sourcing the events out of MySQL, we can be more confident about their consistency with MediaWiki's reality.

clouddb1021 is owned by Analytics so we can set up ROW there if that's

Cool sounds good, especially at first. However, whatever the solution of this task is will likely be co-maintained by data engineering and platform engineering. Many services (WDQS updater, job queue, change prop, analytics event ingestion, etc.) will rely on it, just like they rely on EventBus generated events right now. In the future, I expect more and more event driven services to be built, all of which will mostly rely on events for state transfer, and many of them will want MediaWiki state.

clouddb1021 is owned by Analytics so we can set up ROW there if that's

Many services (WDQS updater, job queue, change prop, analytics event ingestion, etc.) will rely on it, just like they rely on EventBus generated events right now. In the future, I expect more and more event driven services to be built, all of which will mostly rely on events for state transfer, and many of them will want MediaWiki state.

The solutions proposed here are not ok for me, and others have expressed disagreement. I think they will need further discussion (in the tech forum? with the interested/impacted teams?) before implementation. I think this is a very dangerous path, and going against all best practices I can think of.

With this, I am NOT discarding the original problem statement ("we have several dependent datastores that can be derived from MediaWiki, let's find a reasonable way to jumpstart and update them reliably"). I would suggest a task describing the actual problem we want to solve is opened, instead of using this quite stale task which lists solutions first.

I think they will need further discussion (in the tech forum? with the interested/impacted teams?) before implementation

Agree. I've asked Kate if she thinks this is tech forum worthy and she said that it probably was.

I would suggest a task describing the actual problem we want to solve is opened, instead of using this quite stale task which lists solutions first.

Am confused about this. I rewrote this task description within the past year to refocus the problem. I think the task states the problem pretty clearly first, with possible solutions following. Any other ideas would be very welcomed! I think it's ok to discuss possible technical solutions before officially proposing them for implementation.

If by 'describing the actual problem' you mean more concrete use cases, I get your point. I'll try to get together a concrete list of services that currently do rely on MediaWiki events, as well as ones in the near future that are likely to.

going against all best practices I can think of

@Joe I'd be interested in hearing more about what best practices you are thinking of. As for the possible solutions this task is exploring, I'm not making them up, but am gathering them from readings of 'best practices' out there. In particular, I'd like to hear what is bad about these ideas. Since we're talking about best practices, perhaps instead of focusing on these possible solutions, you could help me understand what is wrong with the ideas I am reading? I linked the Turning the database inside-out Strange Loop talk and blog post in the task description, and I think it's relevant here (minus the specifics about Samza). I know I'm very steeped in event-driven world hype, but I think I can see event driven architectures as a way out of reimplementing the same one-off data pipelines over and over again. But of course my real world experience with building them is very limited, and I'm very open to being wrong.

The best practices I am talking about are, basically:

  • Don't use the database as an integration layer. As @Ladsgroup clearly pointed out, it turns ugly very fast once you consider real-life situations (like - an event likely needs to source from multiple MediaWiki datastores at once), and the advantage of "consistency" evaporates quickly
  • Don't use the database as a queue. The proposal here is to insert events in the database (so making them part of the master db transaction) instead of producing them to a queue.

What might not be clear from my opposition is that I am in favour of having an event-driven propagation of changes - we do so already, and I think doing it better is a plus. I just think this specific proposal is flawed.

IMHO, it would be better to take a different approach, for instance (forgive the brevity, but I'm just trying to figure this out at 9 pm :)):

  • Improve the reliability and consistency of how events are produced in MediaWiki
  • Improve the reliability of the eventgate/kafka conglomerate we use to ingest events from MediaWiki
  • Devise a way to be able to link an event to the database transaction/revid/whatever that generated it, and have a system that allows reconciliation. For instance: one revision is missing from the event log but is present in the database? then generate the event that we're missing.

Also, given one of our goals is to allow fast-loading data in new instances of dependent services:

  • Generate a "daily events snapshot" from MediaWiki that allows fast-loading all of our data in a new dependent datastore by consuming that topic, then be able to consume the main events topic from the offset at which the snapshot was created

The reason for this last point is that consuming our whole events history to recreate state in a dependent datastore will not be fast nor efficient by any measure.

Thanks for responses, I want to respond more in full too, but here's a quick thought:

Devise a way to be able to link an event to the database transaction/revid/whatever that generated it, and have a system that allows reconciliation. For instance: one revision is missing from the event log but is present in the database? then generate the event that we're missing.

I think this is similar to the transactional outbox idea. There are many variations on this idea, but in the description I wrote:

The revision event is produced to Kafka. When the Kafka produce request succeeds, the revision_event_log's produced_at timestamp (or boolean) field is set.
A separate process polls the revision_event_log table for records where produced_at is NULL, produces them to Kafka, and sets produced_at when the produce request succeeds.

I think your suggestion here is similar, but without the revision_event_log outbox table. You are suggesting that we make it possible for a process to poll MW tables and determine what events were not emitted, and then emit them. I think that could work; it might be difficult to recreate some of the context we have at original event time (e.g. user_edit_count, page_is_redirect), but it should be possible!
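
For example, a reconciliation pass might look something like this (a sketch only: it assumes the set of already-produced rev_ids for a time window can be loaded from wherever the events end up, e.g. Hadoop, and it can only rebuild fields still available from the revision row):

  # Sketch of a reconciliation pass: find revisions in MariaDB that have no
  # corresponding revision-create event in a time window, and emit the missing ones.
  import json
  import pymysql
  from kafka import KafkaProducer

  def load_produced_rev_ids(start_ts, end_ts):
      # Assumption: the rev_ids of already-produced events for this window can be
      # fetched from wherever the events land (Hive/Hadoop, a Kafka dump, etc.).
      raise NotImplementedError

  conn = pymysql.connect(host='db-host', user='mw', password='...', database='enwiki')
  producer = KafkaProducer(bootstrap_servers='localhost:9092',
                           value_serializer=lambda v: json.dumps(v).encode('utf-8'))

  def reconcile(start_ts, end_ts):
      produced = load_produced_rev_ids(start_ts, end_ts)
      with conn.cursor() as cur:
          cur.execute(
              "SELECT rev_id, rev_page FROM revision "
              "WHERE rev_timestamp BETWEEN %s AND %s",
              (start_ts, end_ts),
          )
          for rev_id, rev_page in cur.fetchall():
              if rev_id in produced:
                  continue
              # Context from the original save (user_edit_count, page_is_redirect,
              # ...) cannot be recreated here; only what the revision row still knows.
              producer.send('mediawiki.revision-create', value={
                  'rev_id': rev_id,
                  'page_id': rev_page,
              })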

Generate a "daily events snapshot"

Agree that this will be needed for sure! It isn't yet clear if we can do this 100% from Kafka, or will need a separate place (Hadoop? Swift?) to store this. Having everything in Kafka would make bootstrapping applications simpler, but it might just be too big / inefficient as you say. Maybe Kafka compacted topics will help us?

Don't use the database as a queue

I'd argue that neither the CDC (consuming the binlog) nor the Outbox ideas are doing this. Perhaps this is just terminology, but what they are doing is using the database as a temporary log.

  • CDC just relies on the existing replication binlog (which...I wouldn't call a queue?).
  • I'd also call the Outbox table a log (similar to the MW logging table), but I guess it depends on how it is used.
    • In the Hybrid CDC + Outbox idea, nothing ever reads the Outbox table. Its content is consumed from the replication binlog and produced into Kafka as soon as possible.
    • In the normal Outbox (no CDC), there is a separate process that polls the Outbox and produces the events. If, at request time, we write to the Outbox, produce the event, and then mark the Outbox record as produced (outside of the transaction), the Outbox becomes a simple temporary buffer for which the poller is only looking for records that have yet to be produced.

Don't use the database as an integration layer. As @Ladsgroup clearly pointed out, it turns ugly very fast once you consider real-life situations (like - an event likely needs to source from multiple MediaWiki datastores at once), and the advantage of "consistency" evaporates quickly

I don't fully understand this one, could you explain more? How is using the database to get consistency between database state and events going to affect the way events are used? We already produce events in a deferred update over HTTP outside of a transaction, so there is no consistency between multiple databases. MediaWiki itself has to deal with multiple database consistency, right? In the cases where the event needs to source from multiple MediaWiki datastores, the events will have the same consistency problem as MediaWiki does. If we utilize database transactions for this, we can at least be guaranteed that events will be consistent with at least ONE MediaWiki data store, which is really the best that MediaWiki can do anyway (right?).


@Joe, given the difficulties in using binlog CDC in our current MariaDB setup (I'm going to try to start saying MariaDB instead of MySQL) outlined by Manuel and Stevie, I'm starting to prefer a polling based option. I think using an Outbox is still a good idea though, mainly because it gives us full control over how the events are modeled, and would be much easier to code and integrate with EventBus now. What about this?

  • During the DB transaction, EventBus writes the event into an outbox table with a payload and a produced_at DEFAULT NULL timestamp (this will mostly be used as a boolean).
  • In the deferred update, EventBus POSTs the event to EventGate, and if successful, sets the produced_at field.
  • A separate poller process selects records from the outbox table where produced_at IS NULL, and POSTs the payload to EventGate, and if successful, sets the produced_at field.
    • This or another process deletes records where produced_at IS NOT NULL.

Or some variation. This is pretty similar to your reconciliation idea, but allows EventBus to continue working as is, with a little eventual-consistency-reconciliation added in.
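
For what it's worth, the poller in that variation might look roughly like this (the EventGate URL and the table/column names are hypothetical placeholders):

  # Sketch of the poller in this variation: POST unproduced outbox rows to
  # EventGate over HTTP, then prune rows that made it out. URL and table/column
  # names are hypothetical placeholders.
  import json
  import pymysql
  import requests

  EVENTGATE_URL = 'https://eventgate-main.example.org/v1/events'  # placeholder
  conn = pymysql.connect(host='db-host', user='mw', password='...', database='enwiki')

  def poll_once():
      with conn.cursor() as cur:
          cur.execute(
              "SELECT id, payload FROM event_outbox WHERE produced_at IS NULL LIMIT 100"
          )
          for row_id, payload in cur.fetchall():
              resp = requests.post(EVENTGATE_URL, json=[json.loads(payload)])
              if resp.ok:
                  cur.execute(
                      "UPDATE event_outbox SET produced_at = NOW() WHERE id = %s",
                      (row_id,),
                  )
          # Periodically prune rows that have already been produced.
          cur.execute(
              "DELETE FROM event_outbox WHERE produced_at IS NOT NULL "
              "AND produced_at < NOW() - INTERVAL 1 DAY"
          )
      conn.commit()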

How would it handle if the replica goes down for a long time or even forever (ie HW crash)?
Right now the way we handle that is...we just rebuilt it from either the backups/another replica which are in 100% of the cases in a different binlog/position.
...
Not super relevant as the problem stands still, if we need to include this host in production then we do need to deal with possible master failures or swap.

@Marostegui Am trying to understand this a little more. How do you do master swaps now? From these comments I understood that the binlog position was not in sync in all the different binlogs out there. Is that correct? I.e.

master A -> replica B -> replica C

insert into revision ... (rev_id 10) might be at position 456 for master A, but at position 123 in the binlog that replicates from B -> C.

Did I get that right?

Ottomata renamed this task from Reliable (atomic) MediaWiki event production / MediaWiki events as source of truth to Consistent MediaWiki state change events | MediaWiki events as source of truth. Mar 24 2021, 2:38 PM
Ottomata updated the task description.

I'll try to get together a concrete list of services that currently do rely on MediaWiki events, as well as ones in the near future that are likely to.

I started this process by asking some folks I know want to rely on MediaWiki state change events to describe what they needed it for, and I edited the description of the task to include their comments.

Another idea that may not be feasible: Would it be possible to move the event produce call out of the deferred update to before MediaWiki closes the MariaDB transaction? I.e.

  1. open MariaDB transaction
  2. insert into revision, etc.
  3. produce event
  4. close MariaDB transaction

In this way, we might produce spurious events which are not actually persisted in the database, but that should be easier to reconcile than missing events. E.g. Most consumers would probably have to ask MW API later for the revision content anyway, and if MW doesn't have the event's rev_id, MW API request will let the consumer know.

I'm not sure what happens with the rev_ids in our MariaDB transactions though. Are they created and available to MediaWiki before the transaction is closed?

Keep in mind that, strictly speaking, some of these problems are not even solved in MediaWiki for "core" DB shards. These include the "main" s[1-8] shards and the "extension" x1 shard. For example, a web request might update an S1 (enwiki) and S7 (centralauth) in one "transaction round", which just means that each of the relevant DB connections are checked for connectivity (pinged if there was no activity < 1 sec ago), and, after that passes, then the COMMITs are made in rapid succession. It is still possible, though very unlikely, that a proper subset of the transactions fail. Also, some events might be triggered from onTransaction() callbacks or PRESEND deferred updates.

Luckily, the non-core "external store" DB servers are not an issue, since we use AUTOCOMMIT mode for the blob insertions, failure to find somewhere to store a blob aborts the main transaction round (of core DB shards), and orphaned external store blobs don't matter. Also, we can atomically commit to multiple core databases on the same shard/master (e.g. metawiki and centralauth), so that is not an issue either.

So...back to MediaWiki events... Some events might be "higher level" than "something happened in this DB", e.g. they may or may not be coupled to writes to other core DBs. Also, those writes will mostly be in the transaction round, but some might happen shortly after it. If only one of the DB's COMMITs succeeded (rare, but possible), do you say that the whole event failed or not?

Another complexity is around the metadata used in kafka events to figure out what was committed or not. DBAs can change the masters for a shard and even move DBs to different shards (e.g. moving wikidata from s5 to s8). This is tracked in "sectionsByDB" in wmf-config. The "extension" shards are more complicated. Suppose you have 5 extensions participating in a transaction that involves kafka events, and the DBA makes x1 read-only and moves 1 extension off to x3. Ideally, we'd use a "sectionsByModule" LBFactory option like "sectionsByDB", but no such thing exists (just various per-extension config vars). Even with that, any "queue/log in a DB" approach would be impractical.

Anyway, suppose that we *are* just talking about events that only involve writes to a single core DB server. In theory, we could do something like:

  1. BEGIN
  2. ...issue regular writes queires...
  3. generate a monotonic "global commit id" UUID
  4. INSERT a row with the UUID into gcids_committed (each MW core mysql server would have *one* such table, not one per database)
  5. synchronously push JSON events to kafka, each including the UUID and DB shard; if the kafka response is not definitive success (including "non-answers"), then ROLLBACK and abort
  6. COMMIT

Kafka might have phantom events (e.g. not committed in the DB), but it would never be missing events. Kafka consumers would have to distinguish two types of phantom events: events from failed transactions, which can be discarded, and events that belong to pending transactions, which cannot be treated as actionable nor discarded. If the transaction is pending, the consumer should wait until the transaction ends before processing the event or any newer events.

A kafka consumer could distinguish pending and failed transactions by using READ_UNCOMMITTED and LOCK_IN_SHARE_MODE. You can test this locally using a sql client, even 'maintenance/sql.php' (assuming MediaWiki uses mysql):

  1. Terminal A: CREATE TABLE IF NOT EXISTS gcids_committed (gcid DECIMAL(29));
  2. Terminal B: SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
  3. Terminal C: SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
  4. Terminal A: BEGIN;
  5. Terminal A: INSERT INTO gcids_committed (gcid) VALUES (1);
  6. Terminal B: SELECT * FROM gcids_committed WHERE gcid=1 LOCK IN SHARE MODE;
  7. Terminal B: (now blocked waiting terminal A transaction)
  8. Terminal C: SELECT * FROM gcids_committed WHERE gcid=1 LOCK IN SHARE MODE;
  9. Terminal C: (now blocked waiting terminal A transaction)
  10. Terminal A: COMMIT;
  11. Terminal B: (now unblocked)
  12. Terminal C: (now unblocked)
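
In consumer code, that check might look roughly like this (a sketch; the helper assumes a plain DB connection and the gcids_committed table from the scheme above):

  # Sketch: how a kafka consumer could classify an event's transaction using the
  # READ UNCOMMITTED + LOCK IN SHARE MODE trick demonstrated above.
  import pymysql

  def classify_gcid(conn, gcid):
      """Return 'committed' or 'failed' for the transaction that wrote gcid.

      The locking read blocks while the writing transaction is still open; by the
      time it returns, that transaction has either committed (row visible) or
      rolled back (row gone)."""
      with conn.cursor() as cur:
          cur.execute("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED")
          cur.execute(
              "SELECT gcid FROM gcids_committed WHERE gcid = %s LOCK IN SHARE MODE",
              (gcid,),
          )
          row = cur.fetchone()
      conn.commit()
      return 'committed' if row else 'failed'

  # A consumer would discard events whose gcid classifies as 'failed' (phantom
  # events from rolled back transactions) and act on the rest.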

For postgres, using READ UNCOMMITTED would not work; one could instead use pg_advisory_lock/pg_advisory_lock_shared. A similar approach for mysql would be using GET_LOCK(), though the kafka consumers would block each other a bit. E.g.:

  1. Terminal A: CREATE TABLE IF NOT EXISTS gcids_committed (gcid DECIMAL(29));
  2. Terminal A: BEGIN;
  3. Terminal A: SELECT GET_LOCK('gcid-id-1',0) -- ROLLBACK if this yields 0 -- ;
  4. Terminal A: INSERT INTO gcids_committed (gcid) VALUES (1);
  5. Terminal B: SELECT GET_LOCK('gcid-id-1',4294967295);
  6. Terminal B: (now blocked waiting terminal A transaction)
  7. Terminal C: SELECT GET_LOCK('gcid-id-1',4294967295);
  8. Terminal C: (now blocked waiting terminal A transaction)
  9. Terminal A: COMMIT;
  10. Terminal A: SELECT RELEASE_LOCK('gcid-id-1');
  11. Terminal B: (now unblocked)
  12. Terminal B: SELECT * FROM gcids_committed WHERE gcid=1;
  13. Terminal B: SELECT RELEASE_LOCK('gcid-id-1');
  14. Terminal C: (now unblocked)
  15. Terminal C: SELECT * FROM gcids_committed WHERE gcid=1;
  16. Terminal C: SELECT RELEASE_LOCK('gcid-id-1');

In terms of sudden master crashes and replica promotion, we do use semi-sync replication to increase the odds that no data is lost. On the enwiki master, mysql has rpl_semi_sync_master_wait_point = AFTER_COMMIT, so the GET_LOCK approach would be safer, since kafka consumers would not see gcids_committed UUIDs only known to the master (in semi-sync replication mode). In any case, it is possible for semi-sync replication to automatically stop waiting on replicas for a while due to rpl_semi_sync_master_timeout, so there is no hard safety guarantee here.

Note that the above scheme does not involve pushing bulky JSON blobs to mysql. It would add some read traffic to the shard masters though (1 query per event per subscribed consumer). It would also require synchronous kafka writes, though one could batch write them during the pre-COMMIT stage. The gcids_committed tables would be one-per-server and append-only. Shards would not be removed unless consumers are all caught up (we've only removed es* shards before AFAIK, and those don't matter, so this should be a minor point). If a wiki moves from main shard A to B, then the DBA would leave gcids_committed alone on shard A (even though they would eventually prune the old copy of the wiki DB). This makes it easy on consumers trying to validate kafka events. If an extension is moved from extension shard 1 to 3, the same approach would be used.

Since we *do* have transaction rounds spanning several core/extension DB servers, you'd either have to pick the most "meaningful" transaction (usually the one from the connection to the wiki's main shard server) or store them all and check all of them (though it is not obvious what to do if only some failed).

As @Joe said, some kind of reconciliation approach would be good. Regardless of what "probabilistic" atomicity improvements are made, it will probably be necessary as a fallback when something goes wrong.

How would it handle if the replica goes down for a long time or even forever (ie HW crash)?
Right now the way we handle that is...we just rebuilt it from either the backups/another replica which are in 100% of the cases in a different binlog/position.
...
Not super relevant as the problem stands still, if we need to include this host in production then we do need to deal with possible master failures or swap.

@Marostegui Am trying to understand this a little more. How do you do master swaps now? From these comments I understood that the binlog position was not in sync in all the different binlogs out there. Is that correct? I.e.

master A -> replica B -> replica C

The process is as follows:

  • We move all the slaves under the future master (which is still a replica of the current master). The process to do that is basically to stop replication at the same time on both hosts for a few seconds; once they are stopped (on the same binlog position), we move one of the hosts under the future master. That is repeated with all the slaves in the topology.
  • Once we have master > future master > all the replicas we set read-only on the master so the binlog stops moving.
  • Once that is done, we move the current master under the future master and disconnect replication there, so the future master now has no master on top, becoming the master of the topology, and the old master becomes a replica.
  • Finally, we change MW to point to the new master
  • We remove read-only

Interesting thanks! So brainstorming how that would work for Debezium, since Debezium is just a slave process consuming a binlog, would it be possible to just stop it, change configs so it points at a new master, and start it? As long as the same binlog position exists on the old and new master, would that work?

Interesting thanks! So brainstorming how that would work for Debezium, since Debezium is just a slave process consuming a binlog, would it be possible to just stop it, change configs so it points at a new master, and start it? As long as the same binlog position exists on the old and new master, would that work?

I am not sure how this would work with our scripts to automate all this movement (and with orchestrator, which is most likely the tool we'll use in the future to handle replicas movement).

Right, I understand that the extra maintenance this would cause could be too onerous for Debezium to be a good solution to this problem; I'm mostly just trying to understand. Assuming the master swap was done manually, would the procedure I suggested work technically?

Another idea that may not be feasible: Would it be possible to move the event produce call out of the deferred update to before MediaWiki closes the MariaDB transaction? I.e.

  1. open MariaDB transaction
  2. insert into revision, etc.
  3. produce event
  4. close MariaDB transaction

In this way, we might produce spurious events which are not actually persisted in the database, but that should be easier to reconcile than missing events. E.g. Most consumers would probably have to ask MW API later for the revision content anyway, and if MW doesn't have the event's rev_id, MW API request will let the consumer know.

I'm not sure what happens with the rev_ids in our MariaDB transactions though. Are they created and available to MediaWiki before the transaction is closed?

In addition to what @aaron said, let me give you a reliability perspective:

This is a really bad idea, it would make the database availability depend on the availability of eventgate, and make the two systems tied to each other, and/or to leave db transactions open for a long time anyways.

Would it make sense to e.g. add to eventgate a local queue on disk for events that have failed to submit? That should almost remove errors, down to tolerable levels.

it would make the database availability depend on the availability of eventgate, and make the two systems tied to each other, and/or to leave db transactions open for a long time anyways.

Yeah makes sense.

eventgate a local queue on disk for events that have failed to submit?

Could help, but it seems simpler and more complete to write to an outbox table in a transaction, no?

Drive by comments by yours truly:

  • Do we have estimations (or even better hard data) as to the number of missed events?
  • The few event driven blogs and articles I've read, regardless of how the implementation is done, push for the idea of a commit log that contains the entire history of all events (allowing e.g. for very easy transformations, aggregations as well as other operations). I gather from the task that this is NOT what we want to do here, right? We just want to increase the reliability of having some very specific events produced and delivered, is that right?

Do we have estimations (or even better hard data) as to the number of missed events?

See T215001: Revisions missing from mediawiki_revision_create. A patch by Clara and Petr is going out with the wmf.5 train this week that may mitigate the majority of missing events.

the idea of a commit log that contains the entire history of all events [...] I gather from the task that this is NOT what we want to do here, right?

We do want this, but it is not certain (or likely?) that we will keep this entire history sourceable in Kafka. We keep this history in analytics Hadoop now, but one day imagine having a Shared Data Platform from which historical (and other) data can be bootstrapped/sourced from some 'cold storage', and then if desired, continuable from Kafka.

Anyway, this task is not about keeping the entire history in Kafka, but it is about making the events we emit as consistent as possible. Depending on the use case, downstream apps/datastores that have copies of the data may need some consistency reconciliation (i.e. lambda arch) to ensure the data is fully consistent over time, but the stronger we get the event streams to be consistent the better.

I think we'll some day (in a quarter or two?) file a Technical Decision Statement Overview that more broadly describes the problem as you state: making sure MW event data is consistent (as defined by some SLOs?) and (mostly) complete, meaning all relevant MW state change data is captured as events (we're missing things currently that could be very useful, like revision content etc.)

Do we have estimations (or even better hard data) as to the number of missed events?

See T215001: Revisions missing from mediawiki_revision_create. A patch by Clara and Petr is going out with the wmf.5 train this week that may mitigate the majority of missing events.

Ah, so ~1.5% of revision-create events are missing. Assuming this can be generalized to all events, that sounds like quite a bit to be honest; we should indeed find ways to increase the reliability. Let's see if the patch above and similar changes in the past have had success at that.

the idea of a commit log that contains the entire history of all events [...] I gather from the task that this is NOT what we want to do here, right?

We do want this.

Let me just say that it sounds very implausible that it can happen. @aaron and @Ladsgroup, as well as @Joe, have pointed out in this task why that is; I will add that having 2 different distributed systems perfectly synced (to the point where they both contain the entire history of all events) is nigh impossible.

but it is not certain (or likely?) that we will keep this entire history sourceable in Kafka.
We keep this history in analytics Hadoop now, but one day imagine having a Shared Data Platform from which historical (and other) data can be bootstrapped/sourced from some 'cold storage', and then if desired, continuable from Kafka.

Sure, that sounds fine. As long as the expectation is some thing might be missing from there, it sounds pretty reasonable to me.

Anyway, this task is not about keeping the entire history in Kafka, but it is about making the events we emit as consistent as possible. Depending on the use case, downstream apps/datastores that have copies of the data may need some consistency reconciliation (i.e. lambda arch) to ensure the data is fully consistent over time, but the stronger we get the event streams to be consistent the better.

+1 to both. Increasing the reliability of event producing is definitely the way to go here.

I think we'll some day (in a quarter or two?) file a Technical Decision Statement Overview that more broadly describes the problem as you state: making sure MW event data is consistent (as defined by some SLOs?) and (mostly) complete, meaning all relevant MW state change data is captured as events (we're missing things currently that could be very useful, like revision content etc.)

+1 again and +1 to the SLOs.

the idea of a commit log that contains the entire history of all events [...] I gather from the task that this is NOT what we want to do here, right?

We do want this.

Let me just say that it sounds very implausible that it can happen. @aaron and @Ladsgroup, as well as @Joe, have pointed out in this task why that is; I will add that having 2 different distributed systems perfectly synced (to the point where they both contain the entire history of all events) is nigh impossible.

Makes sense. I think what I would like to go for is events that are as (or almost as) consistent as MariaDB replication for a single MW database. Events that need cross-DB transactions are likely not worth the effort to improve their consistency.

New developments in this area are of interest: a watermark-based change data capture framework from Netflix that aims to do what this task is about, streaming data from source A to source B while taking into account an initial snapshot: https://arxiv.org/pdf/2010.12597v1.pdf

Debezium 1.7 incorporates some of the ideas of the Netflix paper.

Huh, very interesting paper! @Nuria did you read? I mostly understand, but had some questions as I worked through the algorithm. In Figure 4., k2 and k4 are part of the final chunk written to the output buffer, but they are also earlier in the binlog (and output buffer). Are those then duplicates in the output? Should be fine to have duplicates, as the state should be idempotent, but am just curious since they don't mention it.
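
For reference, here is roughly how I understand the watermark algorithm from the paper (a heavily simplified sketch; chunk selection, watermark writes, and the change-log interface are all hand-waved):

  # Heavily simplified sketch of the DBLog-style watermark algorithm: interleave
  # chunked table reads with change log consumption so a snapshot can be taken
  # without pausing the log for long. db/changelog/emit are hypothetical helpers.
  def snapshot_table_in_chunks(db, changelog, table, chunk_size, emit):
      last_key = None
      while True:
          db.write_low_watermark()        # shows up in the change log
          chunk = db.select_chunk(table, after=last_key, limit=chunk_size)
          db.write_high_watermark()       # also shows up in the change log
          if not chunk:
              break
          # Process log events until the high watermark appears. Any chunk row
          # whose key also appears as a log event in this window is stale (the
          # log event is newer), so drop it from the chunk.
          window_keys = set()
          for event in changelog.read_until_high_watermark():
              emit(event)
              window_keys.add(event.key)
          for row in chunk:
              if row.key not in window_keys:
                  emit(row.as_event())
          last_key = chunk[-1].key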

In https://phabricator.wikimedia.org/T215001#7523796 @Milimetric did some analysis on missing revision create events and determined that in the month of October mediawiki.revision-create was:

99.9986% reliable, or 0.0014% missing events.

There were only 67 missing events. Ultimately we shouldn't miss these events (especially if MariaDB replicas don't), so we still need to find a solution, but I think the priority of this is probably less than before Petr fixed the envoy timeouts. Great stuff!

This is this kind of thing we need to have a way to reconcile: https://wikitech.wikimedia.org/wiki/Incident_documentation/2021-11-25_eventgate-main_outage

In the case of outages like this, state changes written to MediaWiki MariaDB still need to eventually make their way into the event streams if we are to use the events to carry state to other datastores and services.