
Implement an Airflow job that runs and publishes the XML dumps
Closed, Resolved · Public · 8 Estimated Story Points

Description

The output of task T335862: Implement job to generate Dump XML files will be a Spark job.

In this task we should implement an Airflow job that (see the illustrative sketch after this list):

  • Waits for the upstream table wmf_dumps.wikitext_raw_rc2 to be ready (what "ready" means is still TBD).
  • Runs the Spark job.
  • Runs the offline script that takes the Spark output files and renames them. (The job should generate one file per partition, so there is a 1:1 name correspondence between each partition folder and the single file inside it.) This is now done as part of the Spark job itself.
  • Publishes the files. (For now, publishing should be internal, so perhaps just a mv to a well-known location in HDFS.)
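
For illustration, here is a minimal sketch of the DAG shape described above, using stock Airflow operators rather than the wrappers in WMF's airflow-dags repo. The DAG id, schedule, readiness check, and paths are placeholders taken from later comments on this task, not the final implementation:

# Hypothetical sketch only: stock Airflow operators, placeholder names and paths.
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.sensors.python import PythonSensor

with DAG(
    dag_id="dumps_publish_wikitext_raw_to_xml",
    start_date=datetime(2023, 12, 1),
    schedule_interval="@monthly",
    catchup=False,
) as dag:

    # What "ready" means for wmf_dumps.wikitext_raw_rc2 is still TBD, so this is
    # just a placeholder sensor that always succeeds.
    wait_for_wikitext_raw = PythonSensor(
        task_id="wait_for_wikitext_raw",
        python_callable=lambda: True,  # replace with a real readiness check
        poke_interval=3600,
    )

    # Runs the Spark job. It writes one XML file per partition, so no separate
    # renaming step is needed, and for now the output folder doubles as the
    # internal "published" location.
    publish_simplewiki_to_xml = SparkSubmitOperator(
        task_id="publish_simplewiki_to_xml",
        application="hdfs:///wmf/cache/artifacts/airflow/analytics/refinery-job-0.2.29-shaded.jar",
        java_class="org.wikimedia.analytics.refinery.job.mediawikidumper.MediawikiDumper",
        application_args=[
            "--wiki_id", "simplewiki",
            "--publish_until", "{{ ds }}",
            "--source_table", "wmf_dumps.wikitext_raw_rc2",
            "--output_folder", "/wmf/data/archive/content_dump_test/simplewiki/{{ ds_nodash }}/",
        ],
    )

    wait_for_wikitext_raw >> publish_simplewiki_to_xml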

Event Timeline

xcollazo renamed this task from "Implement an Airflow job to runs and publishes the XML dumps" to "Implement an Airflow job that runs and publishes the XML dumps". Sep 15 2023, 5:38 PM
xcollazo set the point value for this task to 8. Sep 19 2023, 1:43 PM
mforns updated Other Assignee, removed: JEbe-WMF.

Change 990631 had a related patch set uploaded (by Jennifer Ebe; author: Jennifer Ebe):

[analytics/refinery/source@master] Fix scala job that publishes the XML dumps

https://gerrit.wikimedia.org/r/990631

Change 991795 had a related patch set uploaded (by Xcollazo; author: Xcollazo):

[analytics/refinery/source@master] Fix code serialization for MediawikiDumper.scala job.

https://gerrit.wikimedia.org/r/991795

xcollazo updated Other Assignee, added: JEbe-WMF.

Here is what I did, step by step, to get the Spark job to run successfully:

First, I tried the spark3-submit command as generated by the DAG, but I had to modify the following:

  • the jar, since refinery-job-0.2.23-shaded.jar never got published; bumped it to 0.2.24.
  • the --output_folder, so that we don't inadvertently try to write to a production folder.
  • the --name flag, so that it is clear what this test run is and who is running it.

That leaves us with:

spark3-submit \
--driver-cores 4 \
--conf spark.executorEnv.SPARK_HOME=/usr/lib/spark3 \
--conf spark.executorEnv.SPARK_CONF_DIR=/etc/spark3/conf \
--master yarn \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.maxExecutors=16 \
--conf spark.shuffle.service.enabled=true \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.executor.memoryOverhead=2G \
--conf spark.yarn.appMasterEnv.SPARK_CONF_DIR=/etc/spark3/conf \
--conf spark.yarn.appMasterEnv.SPARK_HOME=/usr/lib/spark3 \
--executor-cores 2 \
--executor-memory 4G \
--driver-memory 16G \
--name xcollazo_manual_dumps_publish_wikitext_raw_to_xml__publish_simplewiki_to_xml__20240111 \
--class org.wikimedia.analytics.refinery.job.mediawikidumper.MediawikiDumper \
--queue default \
--deploy-mode client hdfs:///wmf/cache/artifacts/airflow/analytics/refinery-job-0.2.24-shaded.jar \
--wiki_id simplewiki \
--snapshot 2023-12-01 \
--source_table wmf_dumps.wikitext_raw_rc2 \
--output_folder /user/xcollazo/content_dump_test

After fixing those things I got:

Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Error: Unknown option --snapshot
Error: Unknown argument '2023-12-01'
Error: Missing option --publish_until
Try --help for more information.

The --snapshot parameter was renamed to --publish_until. So let's try that:

spark3-submit \
--driver-cores 4 \
--conf spark.executorEnv.SPARK_HOME=/usr/lib/spark3 \
--conf spark.executorEnv.SPARK_CONF_DIR=/etc/spark3/conf \
--master yarn \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.maxExecutors=16 \
--conf spark.shuffle.service.enabled=true \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.executor.memoryOverhead=2G \
--conf spark.yarn.appMasterEnv.SPARK_CONF_DIR=/etc/spark3/conf \
--conf spark.yarn.appMasterEnv.SPARK_HOME=/usr/lib/spark3 \
--executor-cores 2 \
--executor-memory 4G \
--driver-memory 16G \
--name xcollazo_manual_dumps_publish_wikitext_raw_to_xml__publish_simplewiki_to_xml__20240111 \
--class org.wikimedia.analytics.refinery.job.mediawikidumper.MediawikiDumper \
--queue default \
--deploy-mode client hdfs:///wmf/cache/artifacts/airflow/analytics/refinery-job-0.2.24-shaded.jar \
--wiki_id simplewiki \
--publish_until 2023-12-01 \
--source_table wmf_dumps.wikitext_raw_rc2 \
--output_folder /user/xcollazo/content_dump_test

This gives a stack trace:

24/01/18 21:03:44 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
...
Caused by: java.io.NotSerializableException: org.apache.log4j.Logger
Serialization stack:
	- object not serializable (class: org.apache.log4j.Logger, value: org.apache.log4j.Logger@7d0e5fbb)
	- field (class: scala.Some, name: value, type: class java.lang.Object)
	- object (class scala.Some, Some(org.apache.log4j.Logger@7d0e5fbb))
	- field (class: org.wikimedia.analytics.refinery.job.mediawikidumper.PagesPartitionsDefiner, name: log, type: class scala.Option)
	- object (class org.wikimedia.analytics.refinery.job.mediawikidumper.PagesPartitionsDefiner, org.wikimedia.analytics.refinery.job.mediawikidumper.PagesPartitionsDefiner@472cf6af)
...

There is a great explanation of this stack trace over at Stack Overflow: https://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou

TL;DR: Spark requires everything referenced from RDD code to be serializable, and the log object on PagesPartitionsDefiner had never been fixed to avoid throwing here.

I could reproduce this issue with the existing unit tests, so I played with the logging mechanism a bit and realized that we do not need to pass the log object in as a parameter, since the class already extends org.apache.spark.internal.Logging, which is Serializable. I changed the code to use that instead. It feels like this issue was found before but not fully addressed. The code changes can be seen at https://gerrit.wikimedia.org/r/c/analytics/refinery/source/+/991795.
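
The same failure mode is easy to demonstrate outside Scala. As a hedged illustration only (this is a PySpark analogue, not the actual Scala fix), holding a non-serializable object as instance state of a class used inside RDD operations breaks the job, while keeping such objects out of the serialized closure works; the real fix does the equivalent by relying on Spark's own Logging trait instead of carrying a log4j Logger around:

# Illustrative PySpark analogue of the "Task not serializable" problem above.
# A threading.Lock stands in for org.apache.log4j.Logger: neither can be
# serialized and shipped to executors.
import threading

from pyspark.sql import SparkSession


class BadPartitioner:
    def __init__(self):
        self.lock = threading.Lock()  # not serializable, like the log4j Logger

    def tag(self, rdd):
        # The lambda captures 'self', so the whole object (lock included) must
        # be pickled -> the job fails with a serialization error.
        return rdd.map(lambda x: (x, self.lock is not None))


class GoodPartitioner:
    def tag(self, rdd):
        # Nothing non-serializable is captured; anything like that is created
        # lazily on the executor (analogous to using the serializable Logging
        # trait instead of passing a Logger object in).
        def f(x):
            lock = threading.Lock()  # created per task, never serialized
            return (x, lock is not None)

        return rdd.map(f)


if __name__ == "__main__":
    spark = SparkSession.builder.master("local[2]").getOrCreate()
    rdd = spark.sparkContext.parallelize(range(4))
    print(GoodPartitioner().tag(rdd).collect())  # works
    # BadPartitioner().tag(rdd).collect()        # raises a pickling error
    spark.stop()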

We now rerun the code with the fixed jar:

spark3-submit \
--driver-cores 4 \
--conf spark.executorEnv.SPARK_HOME=/usr/lib/spark3 \
--conf spark.executorEnv.SPARK_CONF_DIR=/etc/spark3/conf \
--master yarn \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.maxExecutors=16 \
--conf spark.shuffle.service.enabled=true \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.executor.memoryOverhead=2G \
--conf spark.yarn.appMasterEnv.SPARK_CONF_DIR=/etc/spark3/conf \
--conf spark.yarn.appMasterEnv.SPARK_HOME=/usr/lib/spark3 \
--executor-cores 2 \
--executor-memory 4G \
--driver-memory 16G \
--name xcollazo_manual_dumps_publish_wikitext_raw_to_xml__publish_simplewiki_to_xml__20240111 \
--class org.wikimedia.analytics.refinery.job.mediawikidumper.MediawikiDumper \
--queue default \
--deploy-mode client /home/xcollazo/wmf/gerrit/source/refinery-job/target/refinery-job-0.2.29-SNAPSHOT-shaded.jar \
--wiki_id simplewiki \
--publish_until 2023-12-01 \
--source_table wmf_dumps.wikitext_raw_rc2 \
--output_folder /user/xcollazo/content_dump_test

This time the job almost made it to the end, but a couple of OOM failures in some tasks made it fail. So we bump the executor memory from 4G to 8G. We also bump maxExecutors from 16 to 32 to speed it up:

spark3-submit \
--driver-cores 4 \
--conf spark.executorEnv.SPARK_HOME=/usr/lib/spark3 \
--conf spark.executorEnv.SPARK_CONF_DIR=/etc/spark3/conf \
--master yarn \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.maxExecutors=32 \
--conf spark.shuffle.service.enabled=true \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.executor.memoryOverhead=2G \
--conf spark.yarn.appMasterEnv.SPARK_CONF_DIR=/etc/spark3/conf \
--conf spark.yarn.appMasterEnv.SPARK_HOME=/usr/lib/spark3 \
--executor-cores 2 \
--executor-memory 8G \
--driver-memory 16G \
--name xcollazo_manual_dumps_publish_wikitext_raw_to_xml__publish_simplewiki_to_xml__20240111 \
--class org.wikimedia.analytics.refinery.job.mediawikidumper.MediawikiDumper \
--queue default \
--deploy-mode client /home/xcollazo/wmf/gerrit/source/refinery-job/target/refinery-job-0.2.29-SNAPSHOT-shaded.jar \
--wiki_id simplewiki \
--publish_until 2023-12-01 \
--source_table wmf_dumps.wikitext_raw_rc2 \
--output_folder /user/xcollazo/content_dump_test

Success! The output needs tuning, as we got ~15k files of about ~100 KB each, but we can do that later. The important thing from the point of view of this task is that we now have a well-behaved job, so we can focus on changes to the DAG (which we will do next).

Change 990631 abandoned by Xcollazo:

[analytics/refinery/source@master] Fix scala job that publishes the XML dumps

Reason:

Superseded by If286cc8b74a54049e448aa918c7354a5bf29722f

https://gerrit.wikimedia.org/r/990631

Change 991795 merged by jenkins-bot:

[analytics/refinery/source@master] Fix code serialization for MediawikiDumper.scala job.

https://gerrit.wikimedia.org/r/991795

All right, now let's fix the Airflow DAG.

As per T346278#9220353, @Milimetric had left a draft implementation in a git branch.

We want to dust that code off and rebase it, and if we can, we want to leave a commit attributable to Dan.

git checkout main
git pull
git checkout publish-dumps-to-xml
git rebase main

Conflicts!:

CONFLICT (content): Merge conflict in analytics/config/artifacts.yaml
error: could not apply 9c3ea9f9... Separate refinery-job from refinery-hive artifacts

Looks like Dan had tried to put some order into our very ugly artifacts.yaml file. He added those changes as a separate commit, which is always a great idea since it is a separate unit of work. Because this branch is so far behind, it is easier to redo that ordering/cleanup in a separate ticket, so I will just cancel this rebase, remove that commit from history, and move on.

Since I will be modifying the history of this branch, to be nice, let's create a separate branch:

git rebase --abort
git checkout -b publish-dumps-to-xml-take-2
git rebase -i HEAD~2

In a text editor, we change the rebase todo list so that it looks like this:

drop 9c3ea9f9 Separate refinery-job from refinery-hive artifacts
pick 7fdac9eb Add job to publish content dumps as XML

So we drop the problematic reordering commit. Note again this is only possible because these were separate, well-defined commits (thanks for being tidy @Milimetric!).

We still get conflicts on artifacts.yaml, but they are automatically resolvable.

Now let's move this to the future:

airflow-dags % git rebase main
Successfully rebased and updated refs/heads/publish-dumps-to-xml-take-2.

Ok, now we are in a place where we still have the main commit attributed to Dan, we're rebased to today, and we are ready to fix the DAG.

I made a couple of changes, largely cosmetic, integrating what we learnt in T346278#9473132. The changes can be seen at https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/587.

Running the fixture tests spits out the following, which looks good to me:

spark3-submit
--driver-cores 4
--conf spark.executorEnv.SPARK_HOME=/usr/lib/spark3
--conf spark.executorEnv.SPARK_CONF_DIR=/etc/spark3/conf
--master yarn
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=32
--conf spark.shuffle.service.enabled=true
--conf spark.yarn.maxAppAttempts=1
--conf spark.executor.memoryOverhead=2G
--conf spark.yarn.appMasterEnv.SPARK_CONF_DIR=/etc/spark3/conf
--conf spark.yarn.appMasterEnv.SPARK_HOME=/usr/lib/spark3
--executor-cores 2
--executor-memory 8G
--driver-memory 16G
--keytab airflow.keytab
--principal airflow
--name dumps_publish_wikitext_raw_to_xml__publish_simplewiki_to_xml__20240201
--class org.wikimedia.analytics.refinery.job.mediawikidumper.MediawikiDumper
--queue production
--deploy-mode client hdfs:///wmf/cache/artifacts/airflow/analytics/refinery-job-0.2.29-shaded.jar
--wiki_id simplewiki
--publish_until 2024-01-01
--source_table wmf_dumps.wikitext_raw_rc2
--output_folder /wmf/data/archive/content_dump_test/simplewiki/20240101/
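
For context, these fixture tests render the final command each DAG task would run, which is what we see above. A much smaller, hypothetical sketch of the kind of sanity check this relies on (the DAG id and folder below are assumptions, not the actual airflow-dags test code) would be:

# Hypothetical sketch, not the actual airflow-dags test suite: it only checks
# that the DAG parses cleanly and shows up in the DagBag.
from airflow.models import DagBag


def test_dumps_dag_loads_without_import_errors():
    # dag_folder is an assumption; point it at wherever the analytics DAGs live.
    dagbag = DagBag(dag_folder="analytics/dags", include_examples=False)
    assert not dagbag.import_errors
    assert "dumps_publish_wikitext_raw_to_xml" in dagbag.dags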

But let's test it in the Airflow Development Environment to be sure.

The way I typically do this is to push my branch to GitLab from my laptop and then pull it from the stat box. It's not necessary to do it like that, but it works for me.

So from my Mac:

git commit
git push

and then from stat box:

git pull
git checkout -b publish-dumps-to-xml-take-2 origin/publish-dumps-to-xml-take-2

Now we want to run the dev Airflow environment. First we nuke anything left over in the Airflow temp folder to have a clean slate:

sudo -u analytics-privatedata rm -rf /tmp/xcollazo_airflow_home/
sudo -u analytics-privatedata mkdir /tmp/xcollazo_airflow_home/

Now we run airflow against the analytics code:

sudo -u analytics-privatedata ./run_dev_instance.sh -p 8321 -m /tmp/xcollazo_airflow_home analytics

Now that we have the DAG up, let's modify its DagProperties so that we can test it. Originally we had:

{
  "start_date": "2023-12-01T00:00:00",
  "sla": "P10D",
  "refinery_job_jar": "hdfs:///wmf/cache/artifacts/airflow/analytics/refinery-job-0.2.29-shaded.jar",
  "hive_wikitext_raw_table": "wmf_dumps.wikitext_raw_rc2",
  "output_hdfs_root_path": "/wmf/data/archive/content_dump_test",
  "driver_memory": "16G",
  "driver_cores": "4",
  "executor_memory": "8G",
  "executor_cores": "2",
  "num_executors": "32",
  "spark_executor_memoryOverhead": "2G"
}

We need to make the jar file available to our test env:

hdfs dfs -mkdir /user/xcollazo/artifacts
hdfs dfs -copyFromLocal refinery-job-0.2.29-SNAPSHOT-shaded.jar /user/xcollazo/artifacts
hdfs dfs -chmod +rx /user/xcollazo/artifacts/refinery-job-0.2.29-SNAPSHOT-shaded.jar
hdfs dfs -ls /user/xcollazo/artifacts

We need an HDFS folder where the Spark job, which runs as analytics-privatedata, can write freely:

hdfs dfs -mkdir /user/xcollazo/content_dump_test_dag_run
hdfs dfs -chmod +rwx /user/xcollazo/content_dump_test_dag_run

Now fix the DagProperties:

{
  "start_date": "2023-12-01T00:00:00",
  "sla": "P10D",
  "refinery_job_jar": "hdfs:///user/xcollazo/artifacts/refinery-job-0.2.29-SNAPSHOT-shaded.jar",
  "hive_wikitext_raw_table": "wmf_dumps.wikitext_raw_rc2",
  "output_hdfs_root_path": "/user/xcollazo/content_dump_test_dag_run/",
  "driver_memory": "16G",
  "driver_cores": "4",
  "executor_memory": "8G",
  "executor_cores": "2",
  "num_executors": "32",
  "spark_executor_memoryOverhead": "2G"
}

And now we run the job manually in the Airflow UI.

Success! The job ran fine, with similar output as before (~15k files, ~100 KB each, which is something we need to tune, but later!):

sudo -u analytics-privatedata hdfs dfs -count /user/xcollazo/content_dump_test_dag_run/simplewiki/20231201
           1        15945         3042304509 /user/xcollazo/content_dump_test_dag_run/simplewiki/20231201

There are many other things we want to do in this DAG before calling it done:

  • We want to sense when it is the right time to launch the dumps. We do not have T340463 yet, so we will ignore that for now.
  • We want to have a list of wikis of many sizes and types, to exercise the dump code. But we need to tune simplewiki first, and that is out of scope of this task. Thus we will do that separately.

So for now, we're good, and we can call this task done (after we merge the MR!).