
When wikis cannot be exported due to SiteInfo, don't fail them
Open, In Progress, Needs Triage, Public

Description

When doing a file export, we source the list of public, active wikis by querying noc.wikimedia.org.

For brand new wikis though, we will not have the site information available on wmf_raw.mediawiki_project_namespace_map.

In these cases, we fail that particular wiki like in this example:

...
Two wikis did fail:

minwikisource
pcmwikiquote

Both of these are very new wikis, and their SiteInfo data is still not available, so these failures are expected.

In this task we should figure out what we can do so that this doesn't become a burden on the OpsWeek person, as we do expect new wikis to be created on a semi-monthly basis.

An idea: we could do a SQL check on wmf_raw.mediawiki_project_namespace_map, and if we find that we do not have data for a particular wiki, we can put it in a 'skip list'. Later, we can mark that particular wiki as skipped.

Looks like we can use an AirflowSkipException to accomplish this. In a task group, the first @task should check whether we should skip and, if so, throw that exception.
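A minimal sketch of that first check. The function name and the stand-in exception class are illustrative, not from this task; in the real DAG you would import `AirflowSkipException` from `airflow.exceptions` instead of defining it locally.

```python
class AirflowSkipException(Exception):
    """Stand-in for airflow.exceptions.AirflowSkipException, so the decision
    logic below can be exercised outside an Airflow deployment."""


def check_siteinfo_available(wiki: str, namespace_map_rows: int) -> None:
    """First task of the per-wiki task group: raise to mark the task as
    skipped when wmf_raw.mediawiki_project_namespace_map has no rows
    for this wiki (i.e. its SiteInfo data is not available yet)."""
    if namespace_map_rows == 0:
        raise AirflowSkipException(
            f"{wiki}: not in mediawiki_project_namespace_map yet; skipping"
        )
```

With the default trigger rules, downstream tasks of a skipped task are skipped as well, so a brand-new wiki would show up as a skip rather than a failure that pages the OpsWeek person.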

Event Timeline

How often do we run into this case? Could this be handled by documentation / run book for the person on OpsWeek?


As per the commit log, sometimes it happens twice in a month; sometimes there are ~3 months with no change.

We could document indeed.

I'm going to experiment with something akin to

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from airflow.decorators import task
from airflow.models.taskinstance import TaskInstance
from test_k8s.dags.common import DPE_SRE_DAG


WIKIS = ['nonexistingwiki', 'aawiki']


with DPE_SRE_DAG(dag_id="sql_branch_dag", schedule=None, wip=True) as dag:

    start_task = EmptyOperator(task_id="start")
    for wiki in WIKIS:

        with TaskGroup(wiki) as group:

            @task.branch(task_id="branch_task")
            def wiki_is_mapped(ti: TaskInstance, wiki: str = wiki):
                # Bind `wiki` as a default argument so each task group keeps
                # its own value instead of closing over the loop variable.
                # Task ids inside a TaskGroup are prefixed with the group id.
                xcom_value = int(ti.xcom_pull(task_ids=f"{wiki}.{wiki}_sql"))
                ti.log.info(xcom_value)
                if xcom_value > 0:
                    return f"{wiki}.continue_task_{wiki}"
                else:
                    return f"{wiki}.stop_task_{wiki}"

            sql_task = SQLExecuteQueryOperator(
                task_id=f"{wiki}_sql",
                # Use the %(name)s parameter style so `parameters` is applied;
                # a plain '{wiki}' inside a non-f-string is never interpolated.
                sql="SELECT COUNT(*) FROM wmf_raw.mediawiki_project_namespace_map WHERE dbname=%(wiki)s",
                parameters={"wiki": wiki},
                conn_id="analytics-hive",
                show_return_value_in_logs=True,
                do_xcom_push=True)

            continue_task = EmptyOperator(task_id=f"continue_task_{wiki}")
            stop_task = EmptyOperator(task_id=f"stop_task_{wiki}")

            sql_task >> wiki_is_mapped() >> [continue_task, stop_task]

        start_task >> group

If that works as expected, I'll try to integrate it into the dump DAGs.


Nice!

Change #1217108 had a related patch set uploaded (by Brouberol; author: Brouberol):

[operations/puppet@production] global_config: expose the hiveserver2 port for hive services

https://gerrit.wikimedia.org/r/1217108

Change #1217108 merged by Brouberol:

[operations/puppet@production] global_config: expose the hiveserver2 port for hive services

https://gerrit.wikimedia.org/r/1217108

I've gotten as far as running the following query:

from airflow.decorators import task
from airflow.models.taskinstance import TaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.utils.task_group import TaskGroup

from test_k8s.dags.common import DPE_SRE_DAG

WIKIS = ["nonexistingwiki", "aawiki"]


with DPE_SRE_DAG(dag_id="sql_branch_dag", schedule=None, wip=True) as dag:

    start_task = EmptyOperator(task_id="start")
    for wiki in WIKIS:

        with TaskGroup(wiki) as group:

            sql_task = SQLExecuteQueryOperator(
                task_id=f"{wiki}_sql",
                sql="SELECT COUNT(*) FROM wmf_raw.mediawiki_project_namespace_map WHERE language=%(wiki)s",
                parameters={"wiki": wiki},
                conn_id="analytics-hive-hiveserver2",
                show_return_value_in_logs=True,
                do_xcom_push=True,
            )

            @task.branch(task_id=f"{wiki}_branch_task")
            def wiki_is_mapped(ti: TaskInstance, wiki: str = wiki):
                # Bind `wiki` as a default argument to avoid closing over the
                # loop variable; task ids inside a TaskGroup must be
                # referenced with the group prefix.
                xcom_value = int(ti.xcom_pull(task_ids=f"{wiki}.{wiki}_sql"))
                ti.log.info(xcom_value)
                if xcom_value > 0:
                    return f"{wiki}.{wiki}_continue_task"
                else:
                    return f"{wiki}.{wiki}_stop_task"

            continue_task = EmptyOperator(task_id=f"{wiki}_continue_task")
            stop_task = EmptyOperator(task_id=f"{wiki}_stop_task")

            sql_task >> wiki_is_mapped() >> [continue_task, stop_task]

        start_task >> group

Which fails with

FAILED: SemanticException Queries against partitioned tables without a partition filter are disabled for safety reasons. If you know what you are doing, please set hive.strict.checks.large.query to false and make sure that hive.mapred.mode is not set to 'strict' to proceed. Note that you may get errors or incorrect results if you make a mistake while using some of the unsafe features. No partition predicate for Alias "mediawiki_project_namespace_map" Table "mediawiki_project_namespace_map"

I don't understand how this table works, so I'm going to call this done in terms of the time I could allot to it. Feel free to pick it up if you find value in the approach.

Note that this relies on the following connection:

connections:
  analytics-hive-hiveserver2:
    conn_type: hiveserver2
    host: analytics-hive.eqiad.wmnet
    port: 10000

Thanks for the effort @brouberol!

Your error means that Hive really wants to force you to specify partitions in your query. In the case of this table, we have:

$ spark3-sql
spark-sql (default)> show partitions wmf_raw.mediawiki_project_namespace_map;
partition
...
snapshot=2025-10
snapshot=2025-11
Time taken: 0.169 seconds, Fetched 9 row(s)

Thus the query must be specified as:

SELECT COUNT(*) FROM wmf_raw.mediawiki_project_namespace_map WHERE language=%(wiki)s AND snapshot='{{snapshot}}'
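To illustrate, the partition predicate can be combined with the parameterized `language` filter when building the statement; the helper function below is hypothetical, and `snapshot` would come from the DAG's templated `{{snapshot}}` value:

```python
def namespace_map_count_query(snapshot: str) -> str:
    """Build the count query with the mandatory partition predicate.
    The snapshot filter (e.g. '2025-11') is what avoids Hive's
    strict-mode SemanticException on partitioned tables."""
    return (
        "SELECT COUNT(*) FROM wmf_raw.mediawiki_project_namespace_map "
        "WHERE language=%(wiki)s "
        f"AND snapshot='{snapshot}'"
    )
```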

I do like the pattern you were going for here, although I would rather use spark-sql, as using Hive for anything other than the Metastore is deprecated in our infra.

Question:

Could we set a connection to a Spark Thrift Server?


I don't think we have a Spark Thrift Server running anywhere on the an-* hosts. Could we possibly query Presto instead?