The current merging strategy for applying updates requires sending all the entity data on every update. The goal of this task is to design a new updater that sends the minimal number of triples to the RDF store needed to synchronize the graph with the state of wikibase.
Note: this is a very rough plan and many details will probably change as implementation-specific requirements pop up.
The proposed approach relies on a system able to do stateful computation over data streams (flink).
Based on a set of source event streams populated by mediawiki and change propagation, the steps are:
- filter: filter events related to wikibase and its entities
- event time reordering: reorder the events and assemble them into a single partitioned stream
- rev state evaluation: determine what command needs to be applied to mutate the graph
  - this step requires holding a state of previously seen revisions and other actions (e.g. visibility change)
  - the output of this step is a simple event without any data, saying: do a diff between rev X and Y, fully delete entity QXYZ, ...
  - the initial state will be populated using the revisions present in the RDF dump
  - events for revisions that were already seen (e.g. after a fresh import) will be easy to discard
- rdf diff generation: materialize the command by fetching the data from wikibase and send the result over an RDF stream
  - it's probable that in some cases (suppressed delete) the exact set of triples to be deleted will be unknown, which will require a special delete command to be applied to the backend
- rdf import: The components reading this stream will be very similar to the current updater: a process running locally on the wdqs nodes pushing data to blazegraph
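The rev state evaluation and rdf diff generation steps above can be illustrated with a small plain-Python sketch. This is not flink code and not the planned implementation: the command names (`import`, `diff`, `delete`, `ignore`), the `RevState` class and the `triple_diff` helper are all hypothetical, and the real system would keep this state in the stream processor and fetch the triples from wikibase.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Set, Tuple

Triple = Tuple[str, str, str]  # (subject, predicate, object)


@dataclass(frozen=True)
class Command:
    """Data-less instruction emitted by the rev state evaluation step."""
    kind: str                       # hypothetical: "import", "diff", "delete", "ignore"
    entity: str
    from_rev: Optional[int] = None
    to_rev: Optional[int] = None


class RevState:
    """Last seen revision per entity, seeded from the revisions in the RDF dump."""

    def __init__(self, initial: Optional[Dict[str, int]] = None):
        self.last_rev: Dict[str, int] = dict(initial or {})

    def on_revision(self, entity: str, rev: int) -> Command:
        prev = self.last_rev.get(entity)
        if prev is not None and rev <= prev:
            # Already covered by the dump or by an earlier event: discard.
            return Command("ignore", entity, prev, rev)
        self.last_rev[entity] = rev
        if prev is None:
            # Entity unknown to the state: everything must be sent.
            return Command("import", entity, None, rev)
        return Command("diff", entity, prev, rev)

    def on_delete(self, entity: str) -> Command:
        prev = self.last_rev.pop(entity, None)
        return Command("delete", entity, prev, None)


def triple_diff(old: Set[Triple], new: Set[Triple]) -> Tuple[Set[Triple], Set[Triple]]:
    """Materialize a "diff" command as (triples to delete, triples to insert)."""
    return old - new, new - old
```

With a state seeded as `RevState({"Q42": 100})`, an event for revision 90 of Q42 is ignored, while an event for revision 105 yields a diff command from 100 to 105. Only the two sets returned by `triple_diff` would then need to travel over the RDF stream.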
For the first iteration no cleanups will be performed: orphaned values & references will remain in the RDF store. This will be mitigated by more frequent reloads of the dump.
Since such a system is prone to deviating from the state of wikibase, frequent reloads will be important. Note that the state of step 3 (rev state evaluation) is tightly coupled with the dump it was populated from, so we will have to instantiate a new stream per imported dump. In other words, a wdqs system imported using dump Y will have to consume the RDF stream generated from an initial state based on this same dump. This means that the RDF stream will be named after a particular dump instance.
Note on event time reordering:
- seems to be relatively easy in flink: e.g. https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/examples/datastream_java/process/CarEventSort.java
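As a rough illustration of the general technique (buffer events in a priority queue and release them once a watermark has passed their timestamp), here is a plain-Python sketch. It is not flink code; the `reorder` function, the `(timestamp, payload)` event shape and the `max_out_of_order_ms` bound are assumptions made for the example.

```python
import heapq
from typing import Iterable, Iterator, List, Optional, Tuple

Event = Tuple[int, str]  # (event time in ms, payload); hypothetical shape


def reorder(events: Iterable[Event], max_out_of_order_ms: int) -> Iterator[Event]:
    """Yield events in event-time order, assuming bounded out-of-orderness."""
    heap: List[Event] = []
    max_seen: Optional[int] = None
    for event in events:
        heapq.heappush(heap, event)
        max_seen = event[0] if max_seen is None else max(max_seen, event[0])
        # Nothing older than the watermark is expected to arrive any more.
        watermark = max_seen - max_out_of_order_ms
        while heap and heap[0][0] <= watermark:
            yield heapq.heappop(heap)
    # End of input: flush whatever is still buffered, in order.
    while heap:
        yield heapq.heappop(heap)
```

An event that arrives later than the allowed bound is still emitted by this sketch, but out of order. A real pipeline would need an explicit policy for such late events.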
Note on state management:
- RocksDB offers incremental checkpoints and seems to support high cardinality quite well (https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend). The operation where we need a large state seems to be partitionable, so the state can be split into multiple buckets.
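To make the idea of splitting the state into buckets concrete, here is a minimal plain-Python sketch that partitions a revision map by entity id. The `bucket_for` function, the `PartitionedRevState` class and the choice of CRC32 as a stable hash are assumptions for illustration only. In flink this partitioning would be handled by keying the stream rather than by hand.

```python
import zlib
from typing import Dict, List, Optional


def bucket_for(entity_id: str, num_buckets: int) -> int:
    """Map an entity id to a bucket with a hash that is stable across processes."""
    return zlib.crc32(entity_id.encode("utf-8")) % num_buckets


class PartitionedRevState:
    """Revision state split into independent buckets keyed by entity id."""

    def __init__(self, num_buckets: int):
        self.num_buckets = num_buckets
        self.buckets: List[Dict[str, int]] = [{} for _ in range(num_buckets)]

    def put(self, entity_id: str, rev: int) -> None:
        self.buckets[bucket_for(entity_id, self.num_buckets)][entity_id] = rev

    def get(self, entity_id: str) -> Optional[int]:
        return self.buckets[bucket_for(entity_id, self.num_buckets)].get(entity_id)
```

Because every event for a given entity lands in the same bucket, each bucket can be processed and checkpointed independently of the others.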
Note on initial state:
- populating an initial state seems to be possible in flink using its state-processor-api: https://flink.apache.org/feature/2019/09/13/state-processor-api.html