While working on T368754, we started leveraging Spark JDBC Datasource's parallel reading (docs, usage example), but hit an interesting problem when trying to read all of the data in the revision table.
Normally, we would use, say, 64 executors and split the revision table read by partitioning on the rev_timestamp column, with code like:
```
jdbc_df = spark.read \
    .format("jdbc") \
    ...
    .option("partitionColumn", "rev_timestamp") \
    .option("numPartitions", 64) \
    ...
    .load()
```
This Spark mechanism only works on TIMESTAMP, DATE, and integer-family columns, because Spark needs to be able to compute the partition boundaries automatically. Reasonable.
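For intuition, the boundary computation can be sketched in plain Python. This is a simplified reimplementation of the idea, not Spark's actual code: real Spark also emits open-ended first/last clauses and handles NULLs.

```python
from datetime import datetime

def partition_where_clauses(column, lower, upper, num_partitions):
    """Split [lower, upper) into num_partitions contiguous WHERE
    clauses on an orderable column, as Spark's JDBC source does.
    Simplified sketch of the stride logic only."""
    stride = (upper - lower) / num_partitions
    clauses = []
    for i in range(num_partitions):
        lo = lower + stride * i
        hi = lower + stride * (i + 1)
        clauses.append(f"{column} >= '{lo}' AND {column} < '{hi}'")
    return clauses

clauses = partition_where_clauses(
    "rev_timestamp", datetime(2024, 1, 1), datetime(2024, 2, 1), 4
)
```

Each executor then runs `SELECT * FROM revision WHERE <its clause>`, which is why the column type must support this arithmetic.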
But MediaWiki defines rev_timestamp as:
```
+----------------+---------------------+------+-----+---------+----------------+
| Field          | Type                | Null | Key | Default | Extra          |
+----------------+---------------------+------+-----+---------+----------------+
| rev_id         | bigint(20) unsigned | NO   | PRI | NULL    | auto_increment |
...
| rev_timestamp  | binary(14)          | NO   | MUL | NULL    |                |
...
```
with an index as:
```
+----------+------------+--------------------------+--------------+---------------+-----------+-------------+----------+--------+------+------------+---------+---------------+
| Table    | Non_unique | Key_name                 | Seq_in_index | Column_name   | Collation | Cardinality | Sub_part | Packed | Null | Index_type | Comment | Index_comment |
+----------+------------+--------------------------+--------------+---------------+-----------+-------------+----------+--------+------+------------+---------+---------------+
| revision | 0          | PRIMARY                  | 1            | rev_id        | A         | 0           | NULL     | NULL   |      | BTREE      |         |               |
| revision | 1          | rev_timestamp            | 1            | rev_timestamp | A         | 0           | NULL     | NULL   |      | BTREE      |         |               |
...
```
Thus if Spark generates a query like:
```
SELECT *
FROM revision
WHERE rev_timestamp >= TIMESTAMP '2024-01-01'
  AND rev_timestamp < TIMESTAMP '2024-02-01'
```
Such a query will *NOT* hit the rev_timestamp index, because MariaDB doesn't know how to CAST a TIMESTAMP to a BINARY(14). Not hitting the index makes the query crawl and fail on all but the smallest wikis.
We can work around this by generating a query like:
```
SELECT *
FROM revision
WHERE rev_timestamp >= TO_CHAR(TIMESTAMP '2024-01-01', 'YYYYMMDDHH24MISS')
  AND rev_timestamp < TO_CHAR(TIMESTAMP '2024-02-01', 'YYYYMMDDHH24MISS')
```
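The key point is that the comparison value must arrive at MariaDB already rendered in MediaWiki's 14-character timestamp format (YYYYMMDDHHMMSS), so that the binary column can be compared byte-wise and the BTREE index stays usable. A small Python helper (the function name is ours, for illustration) shows the conversion:

```python
from datetime import datetime

def to_mw_timestamp(ts: datetime) -> str:
    """Render a datetime in MediaWiki's binary(14) format: YYYYMMDDHHMMSS.
    Lexicographic order on these strings matches chronological order,
    which is why a range predicate on them can walk the BTREE index."""
    return ts.strftime("%Y%m%d%H%M%S")

lower = to_mw_timestamp(datetime(2024, 1, 1))
upper = to_mw_timestamp(datetime(2024, 2, 1))
query = (
    "SELECT * FROM revision "
    f"WHERE rev_timestamp >= '{lower}' AND rev_timestamp < '{upper}'"
)
```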
However, Spark's JDBC datasource currently has no mechanism to pass that formatting in. And that is reasonable too: I suspect there are not many schemas that define their timestamps as BINARY...
In this task, we want to explore this problem and try to figure out a workaround. Some ideas:
- Look into Spark code, see if we can easily modify the JDBC data source to accept a function
- Modify the code to run on top of the Sqooped tables rather than hitting the Analytics replicas when we want to do these full runs over the revision table.
- Suggest to SREs that we convert the BINARY(14) columns to TIMESTAMPs? I presume BINARY(14) is there for historical reasons.
- Modify the PySpark code so that we partition the table via rev_id, which is a BIGINT, and live with two separate code bases that do the same thing?
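A variant of the last option avoids Spark's automatic boundary computation entirely: PySpark's `DataFrameReader.jdbc` accepts a `predicates` list, one WHERE clause per partition, so we could build the rev_id ranges ourselves. A sketch, with placeholder connection details and bounds:

```python
def rev_id_predicates(min_id: int, max_id: int, num_partitions: int):
    """Build one WHERE predicate per partition over [min_id, max_id],
    splitting the rev_id range into equal-width integer buckets."""
    stride = max(1, (max_id - min_id + 1) // num_partitions)
    preds = []
    lo = min_id
    for i in range(num_partitions):
        # Last bucket absorbs the remainder so the full range is covered.
        hi = max_id + 1 if i == num_partitions - 1 else lo + stride
        preds.append(f"rev_id >= {lo} AND rev_id < {hi}")
        lo = hi
    return preds

# Hypothetical usage (url, bounds, and properties are placeholders):
# df = spark.read.jdbc(
#     url="jdbc:mysql://analytics-replica:3306/enwiki",
#     table="revision",
#     predicates=rev_id_predicates(1, 1_200_000_000, 64),
#     properties={"user": "...", "password": "..."},
# )
```

This keeps one code path but still needs a cheap preliminary query for MIN(rev_id)/MAX(rev_id), and equal-width rev_id buckets can be skewed if deletions are uneven.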