
Optimization tips and feedback
Closed, Resolved · Public

Description

Hello,
I would appreciate any tips and feedback you can provide.

It looks like both of these Spark queries could use some optimization. As of Friday, both are running slower than when I last tried them: the translation query ran smoothly a week ago, and the pageview query worked fine when I last ran it in November.

In the first query, I would like to know which articles from a list of articles were translated.
In the second query, I would like to know, for a list of articles, the sum of pageviews per article between Jan 11th and Feb 11th.

from pyspark.sql.types import ArrayType, StringType
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# Reuse the notebook's SparkContext (or start one) and wrap it in a SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

att_r = '''
SELECT
    wiki_db AS wikicode,
    page_id, 
    revision_tags
FROM wmf.mediawiki_history
WHERE
    snapshot = "{MWH_SNAPSHOT}"
    AND page_namespace = 0
    AND array_contains(revision_tags, "contenttranslation")   
    AND wiki_db = 'pawiki' 
    AND page_id IN {clean_pageids}
GROUP BY 
    wiki_db, page_id, revision_tags
'''


pv_since_contest_start_r = '''
SELECT 
   page_id,
   SUM(view_count) AS views_1M
FROM wmf.pageview_hourly 
WHERE 
  year = 2020
  AND CONCAT(month,day) >= 0111 --AND (month >= 01 AND day >=11)
  AND CONCAT(month,day) < 0211  --AND (month <= 02 AND day <=11)
  AND agent_type = 'user'
  AND country_code = 'IN'
  AND project = 'pa.wikipedia'
  AND page_id IN {clean_pageids}
GROUP BY page_id
'''


pv_since_contest_start = spark.sql(pv_since_contest_start_r.format(**quality_vars)).toPandas()
att = spark.sql(att_r.format(**quality_vars)).toPandas()
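
(For reference, quality_vars supplies the values substituted into the query templates; the values below are illustrative, not the real ones:)

quality_vars = {
    'MWH_SNAPSHOT': '2020-01',               # mediawiki_history snapshot to read
    'clean_pageids': '(1234, 5678, 91011)',  # page IDs pre-formatted as a SQL IN-list
}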

Event Timeline

@JAllemandou might know best, but I think the partition predicate might not be doing what you think.

Partition predicates are better specified per field, like year, month, day:

(year = 2020 and month = 1 and day in (11, 12...31)) or (year = 2020 and month = 2 and day in (11, 12...28))

Otherwise this query might be scanning (many) more partitions than you want, trying possible concats.
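
For the Jan 11 - Feb 11 window in your query, that predicate could look like (a sketch, matching the intent of your comments):

  AND year = 2020
  AND (
       (month = 1 AND day >= 11)
    OR (month = 2 AND day < 11)
  )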

Hi @Iflorez,

About the pageview query, @Nuria is right: partition predicates need to use single partition fields with simple comparisons (the same as what you do in your comments). Otherwise the engine can't work out which partitions to read, so it reads them all and then filters on their values.

About the mediawiki_history query: you say you are after pages, but what you get here are revisions; you could have multiple rows for the same page_id, for instance. Also, since mediawiki_history contains page, revision, and user data, it would be better to filter explicitly for revision events: AND event_entity = 'revision', even if non-null revision_tags should only be present for revisions. Something else: you don't filter out deleted pages (more precisely, revision_is_deleted_by_page_deletion) - is that on purpose?

Finally, on query performance: the time queries take to complete depends very much on how many jobs run concurrently on the cluster. If the cluster is busy, fewer resources can be allocated to your queries, and they take longer even if there is no optimization to be made :)

Thank you for the feedback!
I've updated the date handling in the pageviews query and added event_entity, revision_is_identity_reverted, and revision_is_deleted_by_page_deletion filters to the revision-tags mediawiki_history query.
These are performing much better now.

pageviews_r = '''
SELECT 
   page_id,
   SUM(view_count) AS views_1M
FROM wmf.pageview_hourly 
WHERE 
  year = 2020
  AND ((month = {contest_end_dt_month} AND day >= {contest_end_dt_day})
    OR (month = {contest_end_dt_1M_month} AND day < {contest_end_dt_1M_day}))
  AND agent_type = 'user'
  AND country_code = '{country_code}'
  AND project = '{project}'
  AND page_id IN {clean_pageids}
GROUP BY page_id
'''
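
(For completeness, the month/day template values are derived from the contest dates, roughly like this; the dates shown are illustrative:)

from datetime import date

contest_end = date(2020, 1, 11)      # illustrative: contest ended Jan 11
contest_end_1M = date(2020, 2, 11)   # illustrative: one month later
quality_vars.update({
    'contest_end_dt_month': contest_end.month,
    'contest_end_dt_day': contest_end.day,
    'contest_end_dt_1M_month': contest_end_1M.month,
    'contest_end_dt_1M_day': contest_end_1M.day,
})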

att_r = '''
SELECT
    wiki_db AS wikicode,
    page_id, 
    revision_tags
FROM wmf.mediawiki_history
WHERE
    snapshot = "{MWH_SNAPSHOT}"
    AND event_timestamp >= "{contest_start}"
    AND event_timestamp < "{contest_end}"
    AND page_namespace = 0
    AND event_entity = 'revision'
    AND revision_is_identity_reverted = False 
    AND revision_is_deleted_by_page_deletion = False
    AND array_contains(revision_tags, "contenttranslation")   
    AND wiki_db = '{wiki_db}' 
    AND page_id IN {clean_pageids}
GROUP BY 
    wiki_db, page_id, revision_tags
'''

@JAllemandou I tried running these Spark queries over the weekend on a small batch of articles and they timed out.
Might you have tips or insights? I didn't receive any error messages; the queries simply took a very long time, and eventually I stopped the kernel.
Given that behavior, I also tried running the queries as Hive queries and had similar issues.

Hi @Iflorez - This kinda feels like Kerberos. Can you confirm you have run kinit and entered your password in a notebook terminal? (See https://wikitech.wikimedia.org/wiki/SWAP#Kerberos.)
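
That is, in a terminal on the notebook host (the prompt looks roughly like this):

$ kinit
Password for <shell-username>@WIKIMEDIA: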

Thank you. Yes, I can confirm that I had run kinit and entered my Kerberos credentials in a notebook terminal.

In an effort to run these queries from a Python 3 notebook without needing to change the notebook type, I've switched to running them as Spark queries via the wmfdata package's spark.run function. I'm now able to run the queries. For example, here's the code for the translation query:

import wmfdata as wmf
from wmfdata import charting, mariadb, hive, spark
from wmfdata.utils import pct_str, pd_display_all
import pandas as pd

at_edits = spark.run("""
SELECT
    page_id, 
    revision_tags AS at_edits
FROM wmf.mediawiki_history
WHERE
    snapshot = "{MWH_SNAPSHOT}"
    AND event_timestamp >= "{contest_start}"
    AND event_timestamp < "{contest_end}"
    AND page_namespace = 0
    AND event_entity = 'revision'
    AND revision_is_identity_reverted = False 
    AND revision_is_deleted_by_page_deletion = False
    AND array_contains(revision_tags, "contenttranslation")   
    AND wiki_db = '{wiki_db}' 
    AND page_id IN {clean_pageids}
GROUP BY 
    page_id, revision_tags
""".format(**quality_vars))