
druid_load_webrequest_sampled_live_hourly SerDe error in singular DAG run
Closed, ResolvedPublicBUG REPORT

Description

We've encountered a bug in the druid_load_webrequest_sampled_live_hourly DAG, which seems to have occurred for only one Airflow DAG run.

Context from an email thread follows. @Antoine_Quhen has been tracing the issue and it may have to do with an unexpected legal-but-incorrect data structure.

subject:"Airflow alert: <TaskInstance: druid_load_webrequest_sampled_live_hourly.format_webrequest_sampled_for_druid scheduled__2026-03-04T11:00:00+00:00 [failed]


Adam Baso<abaso@wikimedia.org>
Mar 4, 2026, 4:10:38 PM
to airflow-main:, data-engineering-alerts@wikimedia.org
Okay, this one is not great. The stack trace follows below. It's suggestive of problematic JSON, maybe.

And it's the same stack trace one gets by running the SQL for the CTE SELECT statement part directly in a Spark SQL shell.

However, 

dr0ptp4kt@stat1010:~$ hdfs dfs -cat /wmf/data/raw/webrequest_sampled/webrequest_sampled/year=2026/month=03/day=04/hour=11/*.gz | zcat | jq . > /dev/null
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
dr0ptp4kt@stat1010:~$ echo $?
0

So, not sure. I'm going to try to run jq over these one by one instead of via shell expansion. I think I may need to just run the preceding task again, in hopes that there was a bit flip somewhere that escaped CRC, to make it go away. I'll try that. I've copied the files to part.*.txt in my homedir on stat1010.


Stack trace from job

an-launcher1003:~$ cat application_1764064841637_2038429.log
Container: container_e145_1764064841637_2038429_01_000001 on an-worker1204.eqiad.wmnet_8041_1772649748593
LogAggregationType: AGGREGATED
=========================================================================================================
LogType:application.driver.log
LogLastModifiedTime:Wed Mar 04 18:42:29 +0000 2026
LogLength:36813
LogContents:
Running /opt/conda-analytics/bin/spark-submit $@
SPARK_HOME: /usr/lib/spark3
Using Hadoop client lib jars at 3.2.0, provided by Spark.
PYSPARK_PYTHON=/opt/conda-analytics/bin/python3
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
ADD JAR file:///usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar
26/03/04 18:41:59 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
Added [file:///usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar] to class path
Added resources: [file:///usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar]
Spark master: yarn, Application Id: application_1764064841637_2038433
Response code
Time taken: 1.422 seconds
Response code
Time taken: 0.198 seconds
26/03/04 18:42:01 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
26/03/04 18:42:14 WARN TaskSetManager: Lost task 3.0 in stage 0.0 (TID 3) (an-worker1231.eqiad.wmnet executor 1): org.apache.hadoop.hive.serde2.SerDeException: java.io.IOException: Field name expected
at org.apache.hive.hcatalog.data.JsonSerDe.deserialize(JsonSerDe.java:184)
(stack trace continues)...


...

From: Adam Baso

As expected, individual jq calls on the files were also error-free. So perhaps the SerDe has problems with the file but jq does not. Hmm. The failing task, format_webrequest_sampled_for_druid, keeps failing.
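One thing worth noting: jq only validates JSON syntax, not whether each field's type matches the Hive schema. A quick standalone scan for list-valued top-level fields (a hypothetical diagnostic helper, not part of the pipeline) would have localized the bad rows without involving the SerDe at all:

```python
import gzip
import json

def find_array_fields(path):
    """Yield (line_no, field) pairs where a top-level field is a JSON
    array -- valid JSON, but a type a string-typed Hive column rejects."""
    with gzip.open(path, "rt", encoding="utf-8") as f:
        for n, line in enumerate(f, 1):
            rec = json.loads(line)
            for key, value in rec.items():
                if isinstance(value, list):
                    yield n, key
```

Running this over each `part.*.txt` copy would print exactly which lines and fields hold arrays, whereas `jq . > /dev/null` exits 0 for all of them.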

Event Timeline

dr0ptp4kt renamed this task from druid_load_webrequest_sampled_live_hourly to druid_load_webrequest_sampled_live_hourly SerDe error in singular DAG run. Mar 5 2026, 12:17 PM
Antoine_Quhen claimed this task.

Data cleaned with:

import json

sc = spark.sparkContext

input_path = "hdfs://analytics-hadoop/wmf/data/raw/webrequest_sampled/webrequest_sampled/year=2026/month=03/day=04/hour=11"
output_path = "hdfs:///user/aqu/webrequest_sampled_2026030411_fixed"

def fix_line(line):
    """Collapse arrays holding a single repeated value back to scalars;
    fail loudly on anything else unexpected."""
    rec = json.loads(line)
    for key, value in rec.items():
        if isinstance(value, list):
            unique = set(value)
            if len(unique) == 1:
                # e.g. ["abc", "abc"] -> "abc"
                rec[key] = value[0]
            else:
                raise ValueError(f"Field \"{key}\" has {len(unique)} distinct values: {value}")
        elif isinstance(value, dict):
            raise ValueError(f"Field \"{key}\" has unexpected dict value: {value}")
    return json.dumps(rec, ensure_ascii=False)

rdd = sc.textFile(input_path + "/*.txt.gz")
rdd.map(fix_line).saveAsTextFile(output_path, compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
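For illustration, here is the transformation the script above applies, on a hypothetical malformed row (the field values are made up; real ja3n fingerprints differ):

```python
import json

# Hypothetical malformed input row: ja3n arrived as a two-element
# array with the same value duplicated.
bad = '{"ja3n": ["abc123", "abc123"], "uri_host": "en.wikipedia.org"}'

rec = json.loads(bad)
# Collapse single-distinct-value arrays back to scalars, as fix_line does.
fixed = {k: v[0] if isinstance(v, list) and len(set(v)) == 1 else v
         for k, v in rec.items()}
print(json.dumps(fixed))
```

This emits `{"ja3n": "abc123", "uri_host": "en.wikipedia.org"}`, which matches the string type the raw table schema expects.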

And then I moved the directory around.

The root cause: some rows contained JSON that was syntactically valid but did not respect the expected schema. Fields like ja3n, ja4h, and res_proxy contained arrays (with the same value duplicated twice inside) in place of a string.
The way we read this data, by declaring a raw table wmf_raw.webrequest_sampled, is not tolerant of that kind of error.

One step higher, the problem is here: https://gerrit.wikimedia.org/g/operations/puppet/+/f0d57f3f75c39d96f0ea1480beacb2918d9b5f0d/modules/profile/files/benthos/instances/webrequest_live.yaml#58
where an x-analytics entry with multiple similar values could agglomerate into an array.
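A plausible mechanism (purely illustrative; the real behavior lives in the Benthos config linked above) is a key/value collector that promotes a repeated key to an array:

```python
def parse_headers(pairs):
    """Collect key/value pairs into a dict; a key seen more than once
    becomes a list -- the array shape observed in the bad rows.
    Illustrative sketch only, not the actual Benthos logic."""
    out = {}
    for key, value in pairs:
        if key in out:
            existing = out[key]
            if isinstance(existing, list):
                existing.append(value)
            else:
                out[key] = [existing, value]
        else:
            out[key] = value
    return out

# A duplicated ja3n entry yields ["abc", "abc"] instead of "abc".
```

Under that model, any upstream component emitting the same x-analytics entry twice would produce exactly the duplicated two-element arrays seen in the data.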

Thanks @Vgutierrez for finding this bug. But we still don't know where the duplication comes from...

Is this cleanup process something we should implement as part of the pipeline?