
Figure a performant way to read all data from revision table via Spark
Closed, ResolvedPublic

Description

While working on T368754, we started leveraging the Spark JDBC data source's parallel reading (docs, usage example), but hit quite an interesting problem when trying to read all the data in the revision table.

Normally, we would be able to use, say, 64 executors and split the revision table read by partitioning on the rev_timestamp column with code like:

jdbc_df = spark.read \
    .format("jdbc") \
    ...
    .option("partitionColumn", "rev_timestamp") \
    .option("numPartitions", 64) \
    ...
    .load()

This Spark mechanism only works with TIMESTAMPs, DATEs, and the family of INTEGERs, because Spark needs to know how to compute the partition boundaries automatically. Reasonable.
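For intuition, the boundary logic can be modeled in a few lines of plain Python. This is a simplified sketch of the idea, not Spark's actual implementation (Spark also folds NULLs into the first partition, which the sketch mimics):

```python
def partition_predicates(column, lower, upper, num_partitions):
    """Return one WHERE clause per partition covering [lower, upper).

    Simplified model of how Spark's JDBC source splits a numeric
    partition column into per-task predicates.
    """
    stride = (upper - lower) // num_partitions
    predicates = []
    for i in range(num_partitions):
        lo = lower + i * stride
        if i == 0:
            # First partition also catches NULLs, like Spark does.
            pred = f"{column} < {lo + stride} OR {column} IS NULL"
        elif i == num_partitions - 1:
            # Last partition is open-ended so no rows are dropped.
            pred = f"{column} >= {lo}"
        else:
            pred = f"{column} >= {lo} AND {column} < {lo + stride}"
        predicates.append(pred)
    return predicates

preds = partition_predicates("rev_id", 0, 1000, 4)
```

Each predicate becomes the WHERE clause of one executor's query, which is why the column type must be something Spark knows how to do arithmetic on.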

But MediaWiki defines rev_timestamp as:

+----------------+---------------------+------+-----+---------+----------------+
| Field          | Type                | Null | Key | Default | Extra          |
+----------------+---------------------+------+-----+---------+----------------+
| rev_id         | bigint(20) unsigned | NO   | PRI | NULL    | auto_increment |
...
| rev_timestamp  | binary(14)          | NO   | MUL | NULL    |                |
...

with an index as:

+----------+------------+--------------------------+--------------+---------------+-----------+-------------+----------+--------+------+------------+---------+---------------+
| Table    | Non_unique | Key_name                 | Seq_in_index | Column_name   | Collation | Cardinality | Sub_part | Packed | Null | Index_type | Comment | Index_comment |
+----------+------------+--------------------------+--------------+---------------+-----------+-------------+----------+--------+------+------------+---------+---------------+
| revision |          0 | PRIMARY                  |            1 | rev_id        | A         |           0 |     NULL | NULL   |      | BTREE      |         |               |
| revision |          1 | rev_timestamp            |            1 | rev_timestamp | A         |           0 |     NULL | NULL   |      | BTREE      |         |               |
...

Thus if Spark generates a query like:

SELECT *
FROM revision
WHERE rev_timestamp >= TIMESTAMP '2024-01-01'
  AND rev_timestamp  < TIMESTAMP '2024-02-01'

Such a query will *NOT* hit the rev_timestamp index, because MariaDB doesn't know how to CAST a TIMESTAMP to a BINARY(14). Missing the index makes the query crawl and fail on all but the smallest wikis.

We can work around this by generating a query like:

SELECT *
FROM revision
WHERE rev_timestamp >= TO_CHAR(TIMESTAMP '2024-01-01', 'YYYYMMDDHH24MISS')
  AND rev_timestamp  < TO_CHAR(TIMESTAMP '2024-02-01', 'YYYYMMDDHH24MISS')

However, there is no current mechanism in Spark's JDBC data source to pass that information in. And that is reasonable too, because I suspect not many schemas define their timestamps as BINARY...
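In Python terms, the workaround amounts to rendering the boundaries in MediaWiki's 14-character timestamp layout, so the predicates compare strings against strings and stay index-friendly. A sketch (`mw_ts` is a hypothetical helper, not an existing API):

```python
from datetime import datetime

MW_TS = "%Y%m%d%H%M%S"  # MediaWiki's binary(14) timestamp layout

def mw_ts(dt: datetime) -> str:
    """Render a datetime as a 14-character MediaWiki timestamp string."""
    return dt.strftime(MW_TS)

# String comparisons against these literals can use the rev_timestamp
# index, since both sides are now the same (binary string) type:
lower = mw_ts(datetime(2024, 1, 1))
upper = mw_ts(datetime(2024, 2, 1))
where = f"rev_timestamp >= '{lower}' AND rev_timestamp < '{upper}'"
```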

In this task, we want to explore this problem and try to figure out a workaround. Some ideas:

  • Look into the Spark code and see if we can easily modify the JDBC data source to accept a function.
  • Modify the code to run on top of the Sqooped tables, rather than hitting the Analytics replicas, when we want to do these full runs over the revision table.
  • Suggest to SREs that we convert BINARY(14) to TIMESTAMP? I presume the BINARY type is there for historical reasons.
  • Modify the PySpark code so that we partition the table via rev_id, which is a BIGINT, and live with 2 separate code bases that do the same thing?
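The last idea can be sketched as follows: since rev_id is a BIGINT, Spark's JDBC source can partition on it natively, with no casting tricks. The host name is a placeholder and `revid_partition_options` is a hypothetical helper, not existing code:

```python
def revid_partition_options(wiki_db, max_rev_id, num_partitions=64):
    """Build JDBC options for splitting `revision` on rev_id.

    rev_id is a BIGINT, so Spark's built-in boundary computation works
    without any timestamp/binary casting. All values must be strings,
    as Spark reads them as string options.
    """
    return {
        "url": f"jdbc:mysql://<replica-host>:3306/{wiki_db}",  # placeholder host
        "dbtable": "revision",
        "partitionColumn": "rev_id",
        "lowerBound": "1",
        "upperBound": str(max_rev_id),  # e.g. from SELECT MAX(rev_id)
        "numPartitions": str(num_partitions),
    }

opts = revid_partition_options("etwiki", 9000000)
# usage: spark.read.format("jdbc").options(**opts).load()
```

The trade-off, as the bullet notes, is maintaining a rev_id-based code path alongside the timestamp-based one.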

Details

Other Assignee
xcollazo
Related Changes in Gerrit:

Event Timeline

Milimetric moved this task from Sprint Backlog to In Process on the Dumps 2.0 (Kanban Board) board.

temporarily grabbing this to look into the "modify the JDBC data source to accept a function" option.

Seems easy enough to extend; here's the PR that added support for timestamp. But I doubt upstream would ever want to merge it, so this would just be our personal little hack.

In looking at this I found that the Spark side of the call is actually very flexible: it can take complex casts/subqueries for the dbtable option and even functions/casts for the partitionColumn option. So it's extra frustrating that I can't convince MariaDB to use the indices; it feels very doable but just out of reach.

Well, as usual, the actual logic is pretty easy, but I'm fighting with Java/Scala to build this jar with the correct version of Scala. For some reason it's picking up 2.13. But in theory I kind of have this working: the patch is super simple, only requires a few lines of code to be changed, and would be very easy to build and maintain as we upgrade Spark. So except for Java being insane, this kind of thing could work well:

scala> val g = (spark.read.format("org.apache.spark.sql.custom.jdbc.WmfJdbcRelationProvider")
     |             .option("driver", "com.mysql.cj.jdbc.Driver")
     |             .option("url", "jdbc:mysql://dbstore1007.eqiad.wmnet:3313/etwiki")
     |             .option("dbtable", "revision")
     |             .option("user", "research")
     |             .option("password", password)
     |             .option("partitionColumn", "rev_timestamp")
     |             .option("lowerBound", "20240101")
     |             .option("upperBound", "20240201")
     |             .option("numPartitions", 4)
     |             .load)
java.lang.NoSuchMethodError: scala.collection.mutable.ArrayBuffer.$plus$eq(Ljava/lang/Object;)Lscala/collection/mutable/Growable;
  at org.apache.spark.sql.custom.jdbc.JDBCRelation$.columnPartition(WmfJdbcRelation.scala:145)
  at org.apache.spark.sql.custom.jdbc.WmfJdbcRelationProvider.createRelation(WmfJdbcRelationProvider.scala:35)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
  at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
  ... 57 elided

Change #1071624 had a related patch set uploaded (by Milimetric; author: Milimetric):

[analytics/refinery/source@master] Implement custom jdbc datasource

https://gerrit.wikimedia.org/r/1071624

Nice work. I thought this was going to be a 'spark fork', but you've only 'forked' the datasource.

Note that we have done this kind of thing before in wikimedia-event-utilities when we needed to override some very specific bits of how built in flink serialization worked.

https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/JsonRowDeserializationSchema.java?autodive=0%2F%2F%2F%2F%2F#85

I link this because, to help maintainers, I think you should add comment documentation about how the classes are a copy-paste from upstream and how they are modified, and in each place where you've made a modification in the code, explicitly surround it with a comment block saying so. E.g.

https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/JsonRowDeserializationSchema.java?autodive=0%2F%2F%2F%2F%2F#138

You've kind of done this in the commit message, but it would be good to put it in the code so it is easy to find in the future.

Ah, I did not know! Yeah, I was going for the same kind of thing with organizing the Gerrit change, and I figured I could link from it. The customizations are pretty basic; this is the only actual change besides the import/package stuff:

https://gerrit.wikimedia.org/r/c/analytics/refinery/source/+/1071624/4..5/refinery-spark/src/main/scala/org/apache/spark/sql/execution/datasources/custom/jdbc/WmfJdbcRelation.scala

I'll put it in code, good call.

Anyway, it works now, yay!


🎉 🎉 🎉

Quick update:

This was tested to work with Spark 3.3 in a notebook (see method below). The last thing to do, therefore, is to figure out how to tune Spark to pull different wikis. Ideally we'd have some kind of function that, given just a boundary query, automatically configures the number of mappers to use. The balance here is:

  • More mappers than you need: this uses up MariaDB connections and adds overhead. The extreme example would be using 64 mappers to pull user_groups, which has a few dozen roles. Tables like that shouldn't parallelize at all, and that may also be true of a very new wiki's revision table.
  • Fewer mappers than you need: for big data, this can cause a big slow-down too. If you pull wikidatawiki's revision table with 2 mappers, you'll never finish.

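One possible shape for such a function, balancing the two extremes above. The thresholds are illustrative guesses, not the deployed logic:

```python
def choose_num_partitions(row_count, rows_per_partition=5_000_000, max_partitions=64):
    """Pick a JDBC partition count from a table's (approximate) row count.

    Tiny tables (user_groups, brand-new wikis) get a single connection;
    huge tables (wikidatawiki revision) are capped at max_partitions to
    protect the MariaDB replicas. Thresholds are assumptions for
    illustration only.
    """
    if row_count <= rows_per_partition:
        return 1  # don't parallelize small tables at all
    needed = -(-row_count // rows_per_partition)  # ceiling division
    return min(needed, max_partitions)

choose_num_partitions(40)             # user_groups-sized table -> 1
choose_num_partitions(2_000_000_000)  # wikidatawiki-sized revision -> 64
```

The row count itself could come from a cheap boundary query (e.g. MAX(rev_id)) before the parallel read is configured.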
# Use custom "mediawiki-jdbc" Spark datasource from refinery-source on Spark 3.3:
# on a stat machine
set_proxy

source /opt/conda-analytics/etc/profile.d/conda.sh

conda create -n spark33 python=3.10.8 pyspark=3.3.1 conda-pack=0.7.0 ipython jupyterlab=3.4.8 jupyterhub-singleuser=1.5.0 urllib3=1.26.11
conda activate spark33
pip install git+https://github.com/wikimedia/wmfdata-python.git@v2.3.0

cd /home/milimetric/.conda/envs/spark33/lib/python3.10/site-packages/pyspark/jars
mkdir ~/artifacts
zip -r ~/artifacts/spark-3.3.1-assembly.zip .
hdfs dfs -mkdir artifacts
hdfs dfs -copyFromLocal ~/artifacts/spark-3.3.1-assembly.zip artifacts/
hdfs dfs -chmod +r artifacts/spark-3.3.1-assembly.zip


##### IN A NOTEBOOK

%env SPARK_HOME=/home/milimetric/.conda/envs/spark33/lib/python3.10/site-packages/pyspark

%env SPARK_CONF_DIR=/etc/spark3/conf

import wmfdata

spark = wmfdata.spark.create_custom_session(
    master='yarn',
    spark_config={
        "spark.shuffle.service.name": 'spark_shuffle_3_3',
        "spark.shuffle.service.port": '7338',
        "spark.dynamicAllocation.maxExecutors": 32,
        "spark.executor.memory": "4g",
        "spark.executor.cores": 1,
        "spark.executor.memoryOverhead": '2G',
        "spark.yarn.archive": "hdfs:///user/milimetric/artifacts/spark-3.3.1-assembly.zip",
        ##
        # extras to make Iceberg work on 3.3.1 and mysql jdbc driver:
        ##
        "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.1,com.mysql:mysql-connector-j:8.2.0",
        "spark.jars.ivySettings": "/etc/maven/ivysettings.xml",  # fix jar pulling
        "spark.jars": "hdfs:///user/milimetric/customdata.jar",
    },
)

Change #1071624 merged by jenkins-bot:

[analytics/refinery/source@master] Implement custom jdbc datasource

https://gerrit.wikimedia.org/r/1071624

I thought using the new function would be simple and that deployment could follow right after. However, the changes I had to make to it were substantial, so I'm sending it back for review.

MR looks good, sorry it took me so long to review.

Applied MR 38 via:

$ hostname -f
an-launcher1002.eqiad.wmnet

sudo -u analytics bash


spark-sql (default)> ALTER TABLE wmf_dumps.wikitext_inconsistent_rows_rc1 REPLACE PARTITION FIELD computation_dt WITH wiki_db;
Response code
Time taken: 11.717 seconds

spark-sql (default)> ALTER TABLE wmf_dumps.wikitext_inconsistent_rows_rc1 WRITE ORDERED BY wiki_db, computation_dt, revision_timestamp;
24/10/21 16:23:10 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up
Response code
Time taken: 6.657 seconds

Although the vast majority of wikis are successful, a couple of wikis are indeed failing. One example is chrwiktionary:

Run script:

script: venv/bin/spark-submit --driver-cores 4 --conf spark.executorEnv.PYSPARK_DRIVER_PYTHON=venv/bin/python
  --conf spark.executorEnv.PYSPARK_PYTHON=venv/bin/python --conf spark.executorEnv.SPARK_CONF_DIR=/etc/spark3/conf
  --conf spark.executorEnv.SPARK_HOME=venv/lib/python3.10/site-packages/pyspark
  --master yarn --conf spark.shuffle.service.name=spark_shuffle_3_3 --conf spark.shuffle.service.port=7338
  --conf spark.yarn.archive=hdfs:///user/spark/share/lib/spark-3.3.2-assembly.zip
  --conf spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.6.1
  --conf spark.jars.ivySettings=/etc/maven/ivysettings.xml --conf 'spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp/analytics/ivy_spark3/cache
  -Divy.home=/tmp/analytics/ivy_spark3/home' --conf spark.dynamicAllocation.maxExecutors=16
  --conf spark.jars=hdfs:///wmf/cache/artifacts/airflow/analytics/mysql-connector-j-8.2.0.jar,hdfs:///wmf/cache/artifacts/airflow/analytics/refinery-spark-0.2.51.jar
  --conf spark.yarn.appMasterEnv.SPARK_HOME=venv/lib/python3.10/site-packages/pyspark
  --conf spark.yarn.appMasterEnv.SPARK_CONF_DIR=/etc/spark3/conf --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=venv/bin/python
  --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=venv/bin/python --archives
  'hdfs:///wmf/cache/artifacts/airflow/analytics/mediawiki-content-dump-0.2.0.dev0.conda.tgz#venv'
  --executor-cores 2 --executor-memory 8G --driver-memory 8G --keytab analytics.keytab
  --principal analytics/an-launcher1002.eqiad.wmnet@WIKIMEDIA --name dumps_reconcile_wikitext_raw_daily__reconcile.spark_consistency_check__20241019
  --queue production --deploy-mode client venv/bin/consistency_check.py --wiki_db
  chrwiktionary --target_table wmf_dumps.wikitext_raw_rc2 --results_table wmf_dumps.wikitext_inconsistent_rows_rc1
  --min_timestamp 2024-10-18T22:00:00 --max_timestamp 2024-10-19T22:00:00 --spark_jdbc_num_partitions
  1 --computation_dt 2024-10-20T00:00:00 --mariadb_password_file ***

Stack:

sudo -u analytics yarn logs -appOwner analytics -applicationId application_1727783536357_363857

24/10/21 19:12:04 INFO DAGScheduler: Job 0 finished: take at /var/lib/hadoop/data/c/yarn/local/usercache/analytics/appcache/application_1727783536357_363857/container_e127_1727783536357_363857_01_000001/venv/lib/python3.10/site-packages/mediawiki_content_dump/consistency_check.py:129, took 7.615219 s
Traceback (most recent call last):
  File "/var/lib/hadoop/data/c/yarn/local/usercache/analytics/appcache/application_1727783536357_363857/filecache/10/mediawiki-content-dump-0.2.0.dev0.conda.tgz/bin/consistency_check.py", line 8, in <module>
    sys.exit(main())
  File "/var/lib/hadoop/data/c/yarn/local/usercache/analytics/appcache/application_1727783536357_363857/container_e127_1727783536357_363857_01_000001/venv/lib/python3.10/site-packages/mediawiki_content_dump/consistency_check.py", line 400, in main
    find_inconsistent_rows(
  File "/var/lib/hadoop/data/c/yarn/local/usercache/analytics/appcache/application_1727783536357_363857/container_e127_1727783536357_363857_01_000001/venv/lib/python3.10/site-packages/mediawiki_content_dump/consistency_check.py", line 226, in find_inconsistent_rows
    source_revisions_sql_df = df_from_mariadb_replica_adaptive(
  File "/var/lib/hadoop/data/c/yarn/local/usercache/analytics/appcache/application_1727783536357_363857/container_e127_1727783536357_363857_01_000001/venv/lib/python3.10/site-packages/mediawiki_content_dump/consistency_check.py", line 141, in df_from_mariadb_replica_adaptive
    first_timestamp = datetime.strptime(table_lower_bound.decode('ascii'), mw_iso)
  File "/var/lib/hadoop/data/c/yarn/local/usercache/analytics/appcache/application_1727783536357_363857/container_e127_1727783536357_363857_01_000001/venv/lib/python3.10/_strptime.py", line 568, in _strptime_datetime
    tt, fraction, gmtoff_fraction = _strptime(data_string, format)
  File "/var/lib/hadoop/data/c/yarn/local/usercache/analytics/appcache/application_1727783536357_363857/container_e127_1727783536357_363857_01_000001/venv/lib/python3.10/_strptime.py", line 349, in _strptime
    raise ValueError("time data %r does not match format %r" %
ValueError: time data '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' does not match format '%Y%m%d%H%M%S'

Pausing dumps_reconcile_wikitext_raw_daily until we have time to fix T372677#10248320.

Also, I've just added an ExternalTaskSensor to dumps_reconcile_wikitext_raw_daily .

Wanted to point it out here so that we don't trip on it when we unpause this task.

Ok, found the problem: my boundary query assumed rev_timestamp was non-empty and non-null, but that's not the case:

mysql:research@dbstore1007.eqiad.wmnet [chrwiktionary]> select min(rev_timestamp), max(rev_timestamp) from revision;
+--------------------+--------------------+
| min(rev_timestamp) | max(rev_timestamp) |
+--------------------+--------------------+
|                    | 20241022133321     |
+--------------------+--------------------+
1 row in set (0.002 sec)

So what's needed is a patch to the boundary queries to add robustness here.
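A sketch of what that robustness could look like, modeled on the strptime call in the traceback above. `parse_boundary` is a hypothetical helper, and the fallback constant follows the reasoning later in this task (the oldest enwiki revision is from 2001, so 2000-01-01 is a safe floor):

```python
from datetime import datetime

MW_ISO = "%Y%m%d%H%M%S"
# Safe floor for min(rev_timestamp): no wiki predates 2001 (see below).
FALLBACK_MIN = "20000101000000"

def parse_boundary(raw: bytes) -> datetime:
    """Parse a binary(14) boundary value, tolerating blank/NUL rows."""
    text = raw.decode("ascii", errors="replace").strip("\x00 ")
    if not text:
        text = FALLBACK_MIN  # blank rev_timestamp: fall back to the floor
    return datetime.strptime(text, MW_ISO)

parse_boundary(b"\x00" * 14)       # the chrwiktionary failure case
parse_boundary(b"20241022133321")  # a normal MediaWiki timestamp
```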

xcollazo changed the task status from Open to In Progress.Oct 30 2024, 1:32 PM
xcollazo claimed this task.
xcollazo moved this task from Paused to In Process on the Dumps 2.0 (Kanban Board) board.
xcollazo updated Other Assignee, added: Milimetric.

I've opened T378603: Some wikis have revision rows where rev_timestamp is blank to report root cause for this issue.

In the meantime, I have tested a solution for this issue; will link in a second.

From MR 44:

Oldest rev_timestamp on enwiki is:

MariaDB [enwiki]> select * from revision order by rev_timestamp ASC limit 1;
+-----------+----------+----------------+-----------+----------------+----------------+-------------+---------+---------------+---------------------------------+
| rev_id    | rev_page | rev_comment_id | rev_actor | rev_timestamp  | rev_minor_edit | rev_deleted | rev_len | rev_parent_id | rev_sha1                        |
+-----------+----------+----------------+-----------+----------------+----------------+-------------+---------+---------------+---------------------------------+
| 908493298 | 26323569 |          86852 |  12509234 | 20010115192713 |              0 |           0 |      26 |             0 | hjnc5wxv75ckwvos9wsd0as31nmnice |
+-----------+----------+----------------+-----------+----------------+----------------+-------------+---------+---------------+---------------------------------+
1 row in set (0.001 sec)

Since that was the first wiki, we should be safe to default to '20000101000000' when a rev_timestamp is corrupted in our search for min(rev_timestamp).

xcollazo updated Other Assignee, added: xcollazo; removed: Milimetric.

(Fixed the assignment credit, as I am just doing the last mile for this.)