
Implement a spark job that converts a RDF triples table into a RDF file format
Closed, ResolvedPublic5 Estimated Story Points

Description

The table wikibase_rdf contains 4 columns (not counting partition columns):

  • context
  • subject
  • predicate
  • object

We should write a job that can convert a given partition into a format readable by an RDF-compliant application (Blazegraph must support this format). The formats used in our infrastructure are generally Turtle and N3 (more formats, especially faster binary ones, could be evaluated, but that is out of scope for this task).

The output does not have to keep the same ordering as the original RDF output from Wikibase, but we might consider keeping the triples attached to an entity grouped together (sorted by context).
Ideally we want this output to be extractable as a plain file. This task does not require the tooling itself to do so, but documentation must be added defining a procedure for extracting the file content using existing HDFS tools.
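For illustration, the core row-to-statement mapping might look roughly like this. This is a hedged Python sketch, not the actual rdf-spark-tools implementation; the function name and the assumption that the table stores IRIs bare and literals pre-escaped are mine:

```python
def to_ntriple(subject: str, predicate: str, obj: str) -> str:
    """Render one (subject, predicate, object) row as an N-Triples statement.

    Assumes subject and predicate are absolute IRIs, and obj is either an
    IRI or an already-escaped literal (e.g. '"foo"@en') as stored in the table.
    """
    def term(t: str) -> str:
        # IRIs get angle brackets; literals pass through as-is.
        return f"<{t}>" if t.startswith(("http://", "https://")) else t

    return f"{term(subject)} {term(predicate)} {term(obj)} ."

line = to_ntriple(
    "http://www.wikidata.org/entity/Q99896811",
    "http://schema.org/inLanguage",
    '"ar"',
)
print(line)
# → <http://www.wikidata.org/entity/Q99896811> <http://schema.org/inLanguage> "ar" .
```

The real job would apply such a mapping per row in Spark and write the resulting lines out as text; the Turtle case additionally needs prefix handling.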

AC:

  • a Spark job is available that takes a triples table, the desired output format, (optionally) the desired chunk size, and the location of the output
  • documentation on how to extract the RDF chunk files out of HDFS
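One detail the extraction documentation can rely on: if the job writes gzip-compressed chunk files, concatenated gzip members form a valid gzip stream, so the chunks can be recombined into a single file after copying them out of HDFS. A local simulation (file names illustrative):

```shell
# Simulate two per-partition chunk files, then recombine them.
mkdir -p chunks
printf '<s1> <p> <o> .\n' | gzip > chunks/part-00000.nt.gz
printf '<s2> <p> <o> .\n' | gzip > chunks/part-00001.nt.gz

# Concatenated gzip members are themselves a valid gzip file.
cat chunks/part-*.nt.gz > full.nt.gz
zcat full.nt.gz
```

The same `cat` step works on chunk files fetched from HDFS, since the concatenation property is a feature of the gzip format itself.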

Event Timeline

Gehel triaged this task as High priority.Nov 3 2023, 10:42 AM
Gehel moved this task from Incoming to Current work on the Wikidata-Query-Service board.
Gehel set the point value for this task to 5.Nov 6 2023, 4:46 PM

Adding a note so I don't forget: advice from @BTullis is to avoid NFS if possible, and advice from @JAllemandou is to consider use of hdfs-rsync. After our call I sought this out and found these: https://gerrit.wikimedia.org/r/plugins/gitiles/analytics/refinery/+/refs/heads/master/python/refinery/hdfs.py and https://gerrit.wikimedia.org/g/analytics/hdfs-tools/deploy/+/2445aec92f6b3d409531fb74ab3f9a22d9716823/bin/hdfs-rsync and https://gerrit.wikimedia.org/r/plugins/gitiles/analytics/refinery/+/refs/heads/master/bin/hdfs-rsync EDIT: and https://github.com/wikimedia/hdfs-tools/blob/master/src/main/scala/org/wikimedia/analytics/hdfstools/HdfsRsyncCLI.scala (the latter being available from the stat boxes, from a quick glance). Chances are we'd need to add a ferm rule, and possibly wire up some Kerberos stuff, on the WDQS servers if going the hdfs-rsync route.

During a Meet today, @EBernhardson, the group, and I discussed possible use of a mechanism similar to https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/search/shared/transfer_to_es.py?ref_type=heads#L74-83 and https://gitlab.wikimedia.org/repos/search-platform/mjolnir/-/blob/main/mjolnir/kafka/bulk_daemon.py?ref_type=heads, where a file is moved to Swift via Airflow and Mjolnir client code listens for Kafka events carrying the URLs from which to fetch the produced files (I haven't read this code closely yet, just parroting what I think I heard).

We'll likely need to do these data transfers more than once, so it'll be good to have some level of automation.

Change 980037 had a related patch set uploaded (by Dr0ptp4kt; author: Dr0ptp4kt):

[wikidata/query/rdf@master] WIP DNM: HDFS to .ttl statement generator

https://gerrit.wikimedia.org/r/980037

Not using right now, but here's roughly how one might go about generating more expanded Turtle statements without reverse-mapping prefixes: F41561068

I ran the current version of the code as follows:

spark3-submit --master yarn \
  --driver-memory 16G --executor-memory 12G --executor-cores 4 \
  --conf spark.driver.cores=2 \
  --conf spark.executor.memoryOverhead=4g \
  --conf spark.sql.shuffle.partitions=512 \
  --conf spark.dynamicAllocation.maxExecutors=128 \
  --conf spark.sql.autoBroadcastJoinThreshold=-1 \
  --conf spark.yarn.maxAppAttempts=1 \
  --class org.wikidata.query.rdf.spark.transform.structureddata.dumps.NTripleGenerator \
  --name wikibase-rdf-statements-spark \
  ~dr0ptp4kt/rdf-spark-tools-0.3.138-SNAPSHOT-jar-with-dependencies.jar \
  --input-table-partition-spec discovery.wikibase_rdf_scholarly_split/snapshot=20231016/wiki=wikidata/scope=wikidata_main \
  --output-hdfs-path hdfs://analytics-hadoop/user/dr0ptp4kt/nt_wd_main \
  --num-partitions 1024

spark3-submit --master yarn \
  --driver-memory 16G --executor-memory 12G --executor-cores 4 \
  --conf spark.driver.cores=2 \
  --conf spark.executor.memoryOverhead=4g \
  --conf spark.sql.shuffle.partitions=512 \
  --conf spark.dynamicAllocation.maxExecutors=128 \
  --conf spark.sql.autoBroadcastJoinThreshold=-1 \
  --conf spark.yarn.maxAppAttempts=1 \
  --class org.wikidata.query.rdf.spark.transform.structureddata.dumps.NTripleGenerator \
  --name wikibase-rdf-statements-spark \
  ~dr0ptp4kt/rdf-spark-tools-0.3.138-SNAPSHOT-jar-with-dependencies.jar \
  --input-table-partition-spec discovery.wikibase_rdf_scholarly_split/snapshot=20231016/wiki=wikidata/scope=scholarly_articles \
  --output-hdfs-path hdfs://analytics-hadoop/user/dr0ptp4kt/nt_wd_schol \
  --num-partitions 1024

And updated the permissions.

hdfs dfs -chgrp -R analytics-search-users hdfs://analytics-hadoop/user/dr0ptp4kt/nt_wd_main
hdfs dfs -chgrp -R analytics-search-users hdfs://analytics-hadoop/user/dr0ptp4kt/nt_wd_schol

From stat1006 it is possible to use the already present hdfs-rsync (script fronting Java utility) to copy the produced files, like this:

hdfs-rsync -r hdfs://analytics-hadoop/user/dr0ptp4kt/nt_wd_schol/ file:/destination/to/nt_wd_schol_gzips/
hdfs-rsync -r hdfs://analytics-hadoop/user/dr0ptp4kt/nt_wd_main/ file:/destination/to/nd_wd_main_gzips/

Note: each directory has 1,024 files of roughly 100 MB each. The Spark routine randomly samples the data before sorting it into partitions; although all partitions have data, there's mild skew, so the files don't all contain exactly the same number of records.
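To see why sampling-based range partitioning produces roughly, but not exactly, equal partitions, here is a small standalone simulation (plain Python, not Spark's actual code; the sizes are made up):

```python
import bisect
import random

random.seed(42)

# 100k sortable keys, standing in for (context, subject) sort keys.
keys = sorted(random.random() for _ in range(100_000))

# Range partitioners pick partition boundaries from a small random sample
# rather than from the full dataset.
sample = sorted(random.sample(keys, 1_000))
num_partitions = 8
boundaries = [sample[i * len(sample) // num_partitions]
              for i in range(1, num_partitions)]

# Count how many keys actually land in each partition.
counts = [0] * num_partitions
for k in keys:
    counts[bisect.bisect_right(boundaries, k)] += 1

# Every partition is near 12,500 keys, but the counts differ: mild skew.
print(counts)
```

Since the boundaries come from a sample, every partition gets data and the sizes cluster around the target, but small deviations are expected, which matches the +/- a few MB seen in the output files.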

@bking, @RKemper, @dcausse, and I will discuss more this week.

After an update to the script (PS6) and a fresh run of the same commands, new files have been hdfs-rsync'd to stat1006:~dr0ptp4kt/gzips in anticipation of a file transfer over to the WDQS graph split test servers.

Here's a very small sample of what the files look like:

$ zcat part-01022-c261bb68-4091-4613-ae52-88ce97d22c14-c000.txt.gz | tail -10
<http://www.wikidata.org/entity/Q99896811> <http://schema.org/description> "\u0935\u093F\u0915\u093F\u092E\u093F\u0921\u093F\u092F\u093E \u0936\u094D\u0930\u0947\u0923\u0940"@ne .
<http://www.wikidata.org/entity/Q99896811> <http://schema.org/description> "\u043A\u0430\u0442\u0435\u0433\u043E\u0440\u0438\u0458\u0430 \u043D\u0430 \u0412\u0438\u043A\u0438\u043C\u0435\u0434\u0438\u0458\u0438"@sr .
<http://www.wikidata.org/entity/Q99896811> <http://schema.org/description> "\u7DAD\u57FA\u5A92\u9AD4\u5206\u985E"@yue .
<http://www.wikidata.org/entity/Q99896811> <http://schema.org/description> "Wikimedia-Kategorie"@de-ch .
<http://www.wikidata.org/entity/Q99896811> <http://schema.org/description> "catigur\u00ECa di nu pruggettu Wikimedia"@scn .
<http://www.wikidata.org/entity/Q99896811> <http://schema.org/description> "categoria di un progetto Wikimedia"@it .
<http://www.wikidata.org/entity/Q99896811> <http://schema.org/version> "1979010859"^^<http://www.w3.org/2001/XMLSchema#integer> .
<http://www.wikidata.org/entity/Q99896811> <http://schema.org/description> "kategori Wikimedia"@map-bms .
<http://www.wikidata.org/entity/Q99896811> <http://schema.org/description> "Wikimedia-kategoriija"@se .
<http://www.wikidata.org/entity/Q99896811> <http://schema.org/description> "\u7DAD\u57FA\u5A92\u9AD4\u5206\u985E"@zh-mo .

$ zcat part-01023-c261bb68-4091-4613-ae52-88ce97d22c14-c000.txt.gz | head -10
<http://www.wikidata.org/entity/statement/Q99896811-7623BB4C-2D20-4D2E-8784-E2ED8AD3E8E5> <http://wikiba.se/ontology#rank> <http://wikiba.se/ontology#NormalRank> .
<http://www.wikidata.org/entity/statement/Q99896811-7623BB4C-2D20-4D2E-8784-E2ED8AD3E8E5> <http://www.wikidata.org/prop/statement/P31> <http://www.wikidata.org/entity/Q4167836> .
<http://www.wikidata.org/entity/statement/Q99896811-7623BB4C-2D20-4D2E-8784-E2ED8AD3E8E5> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://wikiba.se/ontology#BestRank> .
<https://ar.wikipedia.org/wiki/%D8%AA%D8%B5%D9%86%D9%8A%D9%81:%D8%B4%D8%B1%D9%83%D8%A7%D8%AA_%D8%B3%D9%88%D9%8A%D8%B3%D8%B1%D9%8A%D8%A9_%D8%A3%D8%B3%D8%B3%D8%AA_%D9%81%D9%8A_1973> <http://schema.org/about> <http://www.wikidata.org/entity/Q99896811> .
<https://ar.wikipedia.org/wiki/%D8%AA%D8%B5%D9%86%D9%8A%D9%81:%D8%B4%D8%B1%D9%83%D8%A7%D8%AA_%D8%B3%D9%88%D9%8A%D8%B3%D8%B1%D9%8A%D8%A9_%D8%A3%D8%B3%D8%B3%D8%AA_%D9%81%D9%8A_1973> <http://schema.org/isPartOf> <https://ar.wikipedia.org/> .
<https://ar.wikipedia.org/wiki/%D8%AA%D8%B5%D9%86%D9%8A%D9%81:%D8%B4%D8%B1%D9%83%D8%A7%D8%AA_%D8%B3%D9%88%D9%8A%D8%B3%D8%B1%D9%8A%D8%A9_%D8%A3%D8%B3%D8%B3%D8%AA_%D9%81%D9%8A_1973> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://schema.org/Article> .
<https://ar.wikipedia.org/wiki/%D8%AA%D8%B5%D9%86%D9%8A%D9%81:%D8%B4%D8%B1%D9%83%D8%A7%D8%AA_%D8%B3%D9%88%D9%8A%D8%B3%D8%B1%D9%8A%D8%A9_%D8%A3%D8%B3%D8%B3%D8%AA_%D9%81%D9%8A_1973> <http://schema.org/inLanguage> "ar" .
<https://ar.wikipedia.org/wiki/%D8%AA%D8%B5%D9%86%D9%8A%D9%81:%D8%B4%D8%B1%D9%83%D8%A7%D8%AA_%D8%B3%D9%88%D9%8A%D8%B3%D8%B1%D9%8A%D8%A9_%D8%A3%D8%B3%D8%B3%D8%AA_%D9%81%D9%8A_1973> <http://schema.org/name> "\u062A\u0635\u0646\u064A\u0641:\u0634\u0631\u0643\u0627\u062A \u0633\u0648\u064A\u0633\u0631\u064A\u0629 \u0623\u0633\u0633\u062A \u0641\u064A 1973"@ar .
<https://en.wikipedia.org/wiki/Category:Swiss_companies_established_in_1973> <http://schema.org/inLanguage> "en" .
<https://en.wikipedia.org/wiki/Category:Swiss_companies_established_in_1973> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://schema.org/Article> .

You'll notice that the files are partitioned by context and subject, and within a partition they're also sorted by context and subject (the context field isn't part of the output, though; one would get that from the source tables). So you may see, as in this example, things that are logically clustered together spanning the end of one file and the beginning of the next partition in sequence.
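A toy illustration of that boundary effect: when rows are globally sorted by (context, subject) and then cut into fixed-size chunks, a chunk boundary need not fall on a subject boundary, so one subject's triples can straddle two adjacent files. Entity names and chunk sizes below are made up:

```python
# 4 subjects with 3 triples each, sorted by (context, subject).
rows = sorted(
    ("wikidata", f"Q{i}", f"triple-{j}")
    for i in range(1, 5)
    for j in range(3)
)

# Cut the sorted rows into fixed-size chunks, like the output part files.
chunk_size = 5
chunks = [rows[i:i + chunk_size] for i in range(0, len(rows), chunk_size)]

# The last subject of chunk 0 continues at the start of chunk 1.
print(chunks[0][-1][1], chunks[1][0][1])  # → Q2 Q2
```

This is exactly the Q99896811 case above: its schema.org triples end one part file and its statement-node triples open the next.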

Change 980914 had a related patch set uploaded (by Ryan Kemper; author: Ryan Kemper):

[operations/puppet@production] wdqs: open firewall rules for graph_split

https://gerrit.wikimedia.org/r/980914

Change 980914 merged by Ryan Kemper:

[operations/puppet@production] wdqs: open firewall rules for graph_split

https://gerrit.wikimedia.org/r/980914

I started a transfer of the gzip files mentioned above from wdqs1024 to wdqs1023 (WDQS hosts have 10 Gbps Ethernet vs. 1 Gbps for the stat machines, so this should be faster).

You can set a temporary iptables rule to allow traffic between hosts on an arbitrary port:

On destination: iptables -I INPUT 1 -p tcp --dport ${PORT} -s ${SENDER_IP} -j ACCEPT

Remember to disable Puppet first, and re-enable it afterwards!
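For the transfer itself over such a port, one common pattern is to pipe tar through nc on each end (the hosts, paths, and port in the comments are placeholders, not the exact commands we ran). The tar half of the pipeline is shown runnable here, without the network leg:

```shell
# Hypothetical network form of the pattern:
#   receiver: nc -l ${PORT} | tar -x -C /srv/dest
#   sender:   tar -c -C /srv/gzips . | nc ${DEST_IP} ${PORT}

# Local demonstration of the same tar pipeline:
mkdir -p src dst
echo 'sample' > src/chunk.txt
tar -c -C src . | tar -x -C dst
cat dst/chunk.txt
```

Compared with per-file copies, streaming a single tar over the socket avoids per-file connection overhead, which matters with 1,024 chunk files per directory.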

Mentioned in SAL (#wikimedia-operations) [2023-12-07T19:35:24Z] <ryankemper@cumin1001> START - Cookbook sre.hosts.downtime for 7 days, 0:00:00 on wdqs[1022-1024].eqiad.wmnet with reason: graph split experiments T350106

Mentioned in SAL (#wikimedia-operations) [2023-12-07T19:35:40Z] <ryankemper@cumin1001> END (PASS) - Cookbook sre.hosts.downtime (exit_code=0) for 7 days, 0:00:00 on wdqs[1022-1024].eqiad.wmnet with reason: graph split experiments T350106

Here are some extra notes with some of the commands we ran/used: P54284

We want to add some more tests before closing this task.

Imports seemed to work.

Non-scholarly article side (proxied to wdqs1024.eqiad.wmnet)

split-non-schol-side.gif (480×640 px, 641 KB)

Scholarly article side (proxied to wdqs1023.eqiad.wmnet)

split-schol-side.gif (480×640 px, 504 KB)

Next steps:

  • Add automated unit test(s) to the patch.
  • Add doc / pointer to Pastes somewhere handy

Also, non-blocking for this task, but mentioning it here for findability: the queries in T349512: [Analytics] Collect multiple sets of SPARQL queries will provide a fuller view of query coverage and runtime characteristics.

Change 980037 merged by jenkins-bot:

[wikidata/query/rdf@master] HDFS to .ttl statement generator

https://gerrit.wikimedia.org/r/980037

Change #1038328 had a related patch set uploaded (by Btullis; author: Btullis):

[operations/puppet@production] Remove temporary firewall rule for WDQS graph_split

https://gerrit.wikimedia.org/r/1038328

Change #1038328 merged by Bking:

[operations/puppet@production] Remove temporary firewall rule for WDQS graph_split

https://gerrit.wikimedia.org/r/1038328