
[Epic] Rework the WDQS updater as an event driven application
Open, Needs Triage, Public

Description

The current merging strategy for applying updates requires sending all the entity data on every update. The goal of this task is to design a new updater able to send the minimal number of triples to the RDF store to synchronize the graph with the state of Wikibase.
Note: this is a very rough plan and many details will probably change as implementation-specific requirements pop up.

The proposed approach relies on a system able to do stateful computation over data streams (Flink).

Based on a set of source event streams populated by MediaWiki and change propagation, the steps are:

  1. filter: keep only the events related to Wikibase and its entities
  2. event time reordering: reorder the events and assemble them into a single partitioned stream
  3. rev state evaluation: determine what command needs to be applied to mutate the graph
    • this step requires holding a state of previously seen revisions and other actions (e.g. visibility changes)
    • the output of this is a simple event without any data saying: do a diff between rev X and Y, fully delete entity QXYZ, ...
    • the initial state will be populated using the revisions present in the RDF dump
    • seen revisions (after a fresh import) will be easy to discard
  4. rdf diff generation: materialize the command, fetch the data from Wikibase and send it over an RDF stream
    • it's probable that in some cases (suppressed deletes) the exact set of triples to be deleted will be unknown and will thus require a special delete command to be applied to the backend
  5. rdf import: the component reading this stream will be very similar to the current updater: a process running locally on the wdqs nodes pushing data to Blazegraph
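Steps 2 and 3 above can be sketched as follows. This is only an illustrative Python sketch, not the actual updater code: event names, fields, and the in-memory dicts are hypothetical stand-ins for what Flink would keep in keyed managed state.

```python
# Hypothetical sketch of steps 2-3: reorder events by event time, then use
# per-entity state (the last seen revision) to turn raw change events into
# minimal mutation commands carrying no entity data.
from dataclasses import dataclass

@dataclass
class Event:
    entity_id: str
    revision: int
    event_time: int          # epoch millis
    change_type: str         # "edit", "delete", ...

def evaluate(events):
    """Yield mutation commands for a stream, reordered by event time."""
    last_seen = {}           # entity_id -> last revision applied to the graph
    for ev in sorted(events, key=lambda e: e.event_time):   # step 2: reorder
        if ev.change_type == "delete":
            last_seen.pop(ev.entity_id, None)
            yield ("delete-entity", ev.entity_id)
        elif ev.revision > last_seen.get(ev.entity_id, -1): # step 3: rev state
            prev = last_seen.get(ev.entity_id)
            last_seen[ev.entity_id] = ev.revision
            if prev is None:
                yield ("full-import", ev.entity_id, ev.revision)
            else:
                yield ("diff", ev.entity_id, prev, ev.revision)
        # late or duplicate revisions are simply discarded

commands = list(evaluate([
    Event("Q42", 2, 1100, "edit"),
    Event("Q42", 1, 1000, "edit"),   # out of order: reordered before rev 2
    Event("Q1", 5, 1200, "delete"),
]))
# commands: full-import Q42 rev 1, diff Q42 rev 1 -> 2, delete-entity Q1
```

Note that a real implementation would reorder within bounded event-time windows (watermarks) rather than sorting a finite list.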

For the first iteration no cleanups will be performed: orphaned values & references will remain in the RDF store. This will be mitigated by more frequent reloads of the dump.
Since such a system is prone to deviation, frequent reloads will be important. It is also important to note that the state of step 3 is tightly coupled with its dump, so we will have to instantiate a new stream per imported dump. In other words, a wdqs system imported using dump Y will have to consume the RDF stream generated from an initial state based on that same dump. This means that the RDF stream will be named after a particular dump instance.

Note on event time reordering:

Note on state management:

Note on initial state:


Event Timeline


COOL! :)

it's important to note that the state of step 3 is tightly coupled with its dump and thus we will have to instantiate a new stream per imported dump. In other words a wdqs system imported using dump Y will have to consume the RDF stream generated from an initial state based on this same dump. This means that the RDF stream will be named against a particular dump instance.

Hm. Would it be possible instead to lambda-architecture this part? Instead of having to reload from a full dump and then recreate a new stream, could we accomplish the same cleanups by backfilling from a batch job in Hadoop? I'm not sure I fully understand the 'cleanups' here. Are they not doable with the stream because events representing some of the state changes don't exist (yet)?

I hope that in the future, once the stream has stabilized, reloading the system will become less necessary and that a fresh, consistent dump can be reconstructed (daily?) using the stream itself.
Reloading from the dump generated by MW is something we need anyway in order to bootstrap the system, and at the beginning it will be needed to circumvent:

  • bug fixes (bug where the data is simply lost)
  • lost events (undetected failures or bugs in MW)
  • cleanup

The cleanup operation mentioned here is a sort of "garbage collection": to simplify, we need to detect unused resources (subgraphs) in the graph, and the stream itself does not know about these unless we keep another large state doing reference counting.
The solution proposed here is to simply spawn a new system from time to time (the dump generated by MW is clean) so that we do cleanup and fix lost events at the same time. But I agree with you that this is not ideal, and leveraging more batch jobs and/or more state in the stream will help minimize the need for a full reload.
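To make the reference-counting alternative concrete, here is a hypothetical sketch (not part of the actual design) of the extra state the stream would have to carry: each shared node (a value or reference subgraph) counts how many entities still use it, and a count dropping to zero marks it orphaned.

```python
# Illustrative reference-counting state for detecting orphaned shared
# values/references from the stream alone. Node identifiers are made up.
from collections import Counter, defaultdict

class RefCounter:
    def __init__(self):
        self.counts = Counter()         # shared node -> usage count
        self.uses = defaultdict(set)    # entity -> shared nodes it uses

    def update_entity(self, entity_id, shared_nodes):
        """Apply a new revision of an entity; return newly orphaned nodes."""
        new = set(shared_nodes)
        old = self.uses[entity_id]
        for node in new - old:
            self.counts[node] += 1
        orphaned = []
        for node in old - new:
            self.counts[node] -= 1
            if self.counts[node] == 0:
                del self.counts[node]
                orphaned.append(node)   # safe to delete from the RDF store
        self.uses[entity_id] = new
        return orphaned

rc = RefCounter()
rc.update_entity("Q1", ["ref:abc", "val:t1"])
rc.update_entity("Q2", ["ref:abc"])
orphans = rc.update_entity("Q1", ["val:t2"])
# ref:abc is still used by Q2, so only val:t1 becomes orphaned
```

The cost is exactly what the comment above points out: this state grows with the number of shared nodes in the whole graph, which is why periodic reloads from a clean dump were preferred for the first iteration.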

revi added a subscriber: revi. (Apr 17 2020, 6:46 PM)
Harej awarded a token. (May 8 2020, 4:35 PM)
Gehel moved this task from Scaling to Epics on the Wikidata-Query-Service board. (Jun 24 2020, 12:48 PM)
Thadguidry updated the task description. (Sep 2 2020, 10:46 AM)
  • the output of this is a simple event without any data saying: do a diff between rev X and Y, fully delete entity QXYZ, ...

Is that supposed to be "data saving" ?

rdf diff generation: materialize the command and fetch the data from wikibase and send it over a RDF stream

Will that be an uncompressed RDF stream? When mentioning streams in any design it's always best to say compressed/uncompressed for accuracy. You might add a note on streams to mention that and any other nuances about them.

  • the output of this is a simple event without any data saying: do a diff between rev X and Y, fully delete entity QXYZ, ...

Is that supposed to be "data saving" ?

The reason was not really data saving; the main reason is that we don't know the set of triples to delete. There is no way to ask Wikibase to output the RDF data of a deleted entity.

rdf diff generation: materialize the command and fetch the data from wikibase and send it over a RDF stream

Will that be an uncompressed RDF stream? When mentioning streams in any design it's always best to say compressed/uncompressed for accuracy. You might add a note on streams to mention that and any other nuances about them.

Currently it's uncompressed Turtle (which is far from the best format for such a use case, but it has been the historical format for data exchange in this stack). Note that, if I'm not mistaken, our Kafka setup will compress the data transparently. The data model has been made in such a way that in the future we could use other formats such as RDF Thrift or anything else we deem more appropriate.
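Regardless of serialization format, the diff itself reduces to set differences over the triples of two revisions. A minimal sketch, with triples modeled as plain tuples instead of Turtle (the entity data shown is made up for illustration):

```python
# Illustrative rdf-diff: given the triples of two revisions of an entity,
# the patch to send downstream is (triples to insert, triples to delete).
def rdf_diff(old_triples, new_triples):
    old, new = set(old_triples), set(new_triples)
    return sorted(new - old), sorted(old - new)

rev1 = [("wd:Q42", "wdt:P31", "wd:Q5"),
        ("wd:Q42", "rdfs:label", '"Douglas Adams"@en')]
rev2 = [("wd:Q42", "wdt:P31", "wd:Q5"),
        ("wd:Q42", "rdfs:label", '"Douglas N. Adams"@en')]

to_insert, to_delete = rdf_diff(rev1, rev2)
# unchanged triples (the wdt:P31 statement) are not resent, which is the
# whole point of the new updater versus sending all entity data every time
```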

dcausse updated the task description. (Sep 2 2020, 2:31 PM)
Gehel renamed this task from EPIC: Rework the WDQS updater as an event driven application to [Epic] Rework the WDQS updater as an event driven application. (Oct 6 2020, 2:16 PM)