Whether we use LocalExecutor or KubernetesExecutor, we should be able to start a Spark job on YARN via Skein from our Kubernetes Airflow instances. We need to write a DAG that uses our SkeinOperator to launch a job that reads/writes to HDFS and/or Hive.
Description
Details
| Title | Reference | Author | Source Branch | Dest Branch |
|---|---|---|---|---|
| test_k8s: introduce a DAG that runs an idempotent HDFS command via Skein | repos/data-engineering/airflow-dags!887 | brouberol | T377602 | main |
| Status | Subtype | Assigned | Task |
|---|---|---|---|
| Resolved | | brouberol | T362788 Migrate Airflow to the dse-k8s cluster |
| Resolved | | brouberol | T364389 Migrate the airflow scheduler components to Kubernetes |
| Resolved | | brouberol | T377928 Ensure that we can submit spark jobs via `spark3-submit` from airflow |
| Resolved | | brouberol | T377602 Validate that we can submit Spark jobs with Skein in Kubernetes |
Event Timeline
@BTullis could you enquire as to what would be a good "hello world" Spark job to submit via Skein? Thanks!
Piecing this together myself, I think that we have a couple of simple tasks that we can look at testing.
Here is a spark-sql based query:
```python
task_name = SparkSqlOperator(
    task_id="list_canonical_countries",
    sql="SELECT name FROM canonical_data.countries",
)
```
This is based on my understanding of Data_Platform/Systems/Airflow/Developer_guide#SparkSQLOperator and the sample query from the wmfdata-python quickstart notebook.
I also read the code for the operator a little here.
In almost all cases, WMF prefers to keep the SQL and HQL (Hive query language) in a separate file from the code that schedules it, so we don't have many examples where the SQL is executed directly.
There are a couple of examples, though. Like here:
```python
hive_cleaner = SparkSqlOperator(
    task_id="remove_temporary_table",
    sql=f"DROP TABLE IF EXISTS {props.intermediate_database}.{intermediate_table};",
    **props.minimalist_spark_config,
)
```
Also, when I was looking at that file, I saw a SimpleSkeinOperator task that looked very much like your first test skein task.
```python
hdfs_cleaner = SimpleSkeinOperator(
    task_id="remove_temporary_directory",
    script=f"hdfs dfs -rm -r {intermediate_directory}",
)
```
So maybe it would work. Maybe these hdfs dfs commands are executed on a Hadoop worker, after all.
I've talked to @JAllemandou, and Skein support in Kubernetes might not be required. We might want to refactor the SimpleSkeinOperator so that it detects that the task is running in Kubernetes and "redirects" to another operator.
That being said, once we support shelling out to spark3-submit in the airflow task pod (cf T377928), we're about 95% there anyway.
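The detection half of that idea is straightforward, since pods expose well-known markers. A sketch under the assumption that either standard marker is acceptable (the function name is hypothetical, not from the real SimpleSkeinOperator):

```python
import os
from pathlib import Path


def running_in_kubernetes() -> bool:
    """Heuristic: are we executing inside a Kubernetes pod?

    Both markers are standard in pods: the kubelet injects the
    KUBERNETES_SERVICE_HOST env var, and the default serviceaccount
    credentials are mounted under /var/run/secrets/kubernetes.io.
    """
    return (
        "KUBERNETES_SERVICE_HOST" in os.environ
        or Path("/var/run/secrets/kubernetes.io/serviceaccount").is_dir()
    )
```

The operator's `execute()` could then branch on this check and delegate to whatever replacement operator we settle on.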
> Skein support in Kubernetes might not be required
Indeed!
The only reason we use skein is to keep tasks from being executed on the scheduler pod. It is possible that Research Engineering has some other primary uses. Let's ask @fkaelin.
> once we support shelling out to spark3-submit in the airflow task pod
Hm. For SparkSubmitOperator, perhaps it would be better to just change the tasks to set launcher=local?
> refactor the SimpleSkeinOperator so that it detects that the task is running in Kubernetes and "redirects" to another operator
Hm, this seems a little strange. I understand that it would be helpful to make the migration to k8s more seamless, but having SimpleSkeinOperator not do Skein seems weird. Perhaps it would be possible to migrate usages to something like KubernetesPodOperator instead?
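A migration like that could be fairly mechanical, since a SimpleSkeinOperator task is essentially a shell snippet. A hypothetical sketch of the mapping (the image name is an assumption; `cmds`/`arguments` follow the KubernetesPodOperator parameter names, but this helper is illustrative, not real code from the repo):

```python
def skein_script_to_pod_kwargs(task_id: str, script: str) -> dict:
    """Map a SimpleSkeinOperator-style shell script to KubernetesPodOperator kwargs."""
    return {
        "task_id": task_id,
        # Pod names must be DNS-1123 compatible, so swap underscores for dashes.
        "name": task_id.replace("_", "-"),
        # Assumed: an image carrying the hdfs client and Kerberos tooling.
        "image": "hadoop-client:latest",
        "cmds": ["/bin/sh", "-c"],
        "arguments": [script],
    }


pod_kwargs = skein_script_to_pod_kwargs(
    "remove_temporary_directory", "hdfs dfs -rm -r /tmp/intermediate"
)
```

The resulting dict could be splatted into `KubernetesPodOperator(**pod_kwargs)`, keeping the DAG-level interface close to what task authors already write.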
Agreed that the SimpleSkeinOperator on Kubernetes airflow is not needed anymore (and that the code should use a Kubernetes-native operator instead).
Currently SimpleSkeinOperator is the only usage (besides the Spark one, of course), but I would suggest keeping the base SkeinOperator while YARN is available and remains the main source of compute at DE, e.g. to run ML inference tools such as this one, or to deploy existing Skein apps like this vector db.
```
airflow@airflow-scheduler-59874c6987-n52kr:/opt/airflow$ spark3-submit
Usage: spark-submit [options] <app jar | python file | R file> [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]
Usage: spark-submit run-example [options] example-class [example args]
Options:
  --master MASTER_URL         spark://host:port, mesos://host:port, yarn,
                              k8s://https://host:port, or local (Default: local[*]).
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
  --class CLASS_NAME          Your application's main class (for Java / Scala apps).
  ...
```
spark3-submit can be run as expected. Now, let's try to make it work.
For reference, these are the logs generated by a spark3-submit command on an-launcher1002.eqiad.wmnet:
brouberol@an-launcher1002:~$ spark3-submit --master yarn --deploy-mode client /opt/conda-analytics/lib/python3.10/site-packages/pyspark/examples/src/main/python/pi.py 10
Running /opt/conda-analytics/bin/spark-submit $@
SPARK_HOME: /opt/conda-analytics/lib/python3.10/site-packages/pyspark
Using Hadoop client lib jars at 3.2.0, provided by Spark.
PYSPARK_PYTHON=/opt/conda-analytics/bin/python3
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
24/11/05 15:00:10 INFO SparkContext: Running Spark version 3.1.2
24/11/05 15:00:11 INFO ResourceUtils: ==============================================================
24/11/05 15:00:11 INFO ResourceUtils: No custom resources configured for spark.driver.
24/11/05 15:00:11 INFO ResourceUtils: ==============================================================
24/11/05 15:00:11 INFO SparkContext: Submitted application: PythonPi
24/11/05 15:00:11 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
24/11/05 15:00:11 INFO ResourceProfile: Limiting resource is cpus at 1 tasks per executor
24/11/05 15:00:11 INFO ResourceProfileManager: Added ResourceProfile id: 0
24/11/05 15:00:11 INFO SecurityManager: Changing view acls to: brouberol
24/11/05 15:00:11 INFO SecurityManager: Changing modify acls to: brouberol
24/11/05 15:00:11 INFO SecurityManager: Changing view acls groups to:
24/11/05 15:00:11 INFO SecurityManager: Changing modify acls groups to:
24/11/05 15:00:11 INFO SecurityManager: SecurityManager: authentication enabled; ui acls disabled; users with view permissions: Set(brouberol); groups with view permissions: Set(); users with modify permissions: Set(brouberol); groups with modify permissions: Set()
24/11/05 15:00:11 WARN Utils: Service 'sparkDriver' could not bind on port 12000. Attempting port 12001.
24/11/05 15:00:11 WARN Utils: Service 'sparkDriver' could not bind on port 12001. Attempting port 12002.
24/11/05 15:00:11 INFO Utils: Successfully started service 'sparkDriver' on port 12002.
24/11/05 15:00:11 INFO SparkEnv: Registering MapOutputTracker
24/11/05 15:00:11 INFO SparkEnv: Registering BlockManagerMaster
24/11/05 15:00:11 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
24/11/05 15:00:11 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
24/11/05 15:00:11 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
24/11/05 15:00:11 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-7546b156-ae90-4dd3-9cf9-ada59c493244
24/11/05 15:00:11 INFO MemoryStore: MemoryStore started with capacity 398.7 MiB
24/11/05 15:00:11 INFO SparkEnv: Registering OutputCommitCoordinator
24/11/05 15:00:11 INFO log: Logging initialized @6369ms to org.sparkproject.jetty.util.log.Slf4jLog
24/11/05 15:00:11 INFO Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_412-8u412-ga-1~deb11u1-b08
24/11/05 15:00:11 INFO Server: Started @6499ms
24/11/05 15:00:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/11/05 15:00:11 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
24/11/05 15:00:12 INFO AbstractConnector: Started ServerConnector@43847cda{HTTP/1.1, (http/1.1)}{0.0.0.0:4042}
24/11/05 15:00:12 INFO Utils: Successfully started service 'SparkUI' on port 4042.
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@4bde9bd0{/jobs,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@7bceea7e{/jobs/json,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@604f811d{/jobs/job,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@71180812{/jobs/job/json,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@15f27870{/stages,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5cb6ffca{/stages/json,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2e74a989{/stages/stage,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@69e06741{/stages/stage/json,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@1db6aad5{/stages/pool,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@8b96a55{/stages/pool/json,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@17562cd1{/storage,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@784213f8{/storage/json,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@174caeaa{/storage/rdd,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@15118263{/storage/rdd/json,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@787f8c4{/environment,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@477e330e{/environment/json,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6b5694c8{/executors,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2e91a334{/executors/json,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5a871f71{/executors/threadDump,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6956f125{/executors/threadDump/json,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@4ffa9f66{/static,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@36b6746f{/,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@24245a4b{/api,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@41237dbd{/jobs/job/kill,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@51dd7187{/stages/stage/kill,null,AVAILABLE,@Spark}
24/11/05 15:00:12 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://an-launcher1002.eqiad.wmnet:4042
24/11/05 15:00:12 INFO HadoopDelegationTokenManager: Attempting to load user's ticket cache.
24/11/05 15:00:13 INFO HadoopFSDelegationTokenProvider: getting token for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_923320786_30, ugi=brouberol@WIKIMEDIA (auth:KERBEROS)]] with renewer yarn/an-master1003.eqiad.wmnet@WIKIMEDIA
24/11/05 15:00:13 INFO DFSClient: Created token for brouberol: HDFS_DELEGATION_TOKEN owner=brouberol@WIKIMEDIA, renewer=yarn, realUser=, issueDate=1730818813506, maxDate=1731423613506, sequenceNumber=26993249, masterKeyId=2095 on ha-hdfs:analytics-hadoop
24/11/05 15:00:13 INFO HadoopFSDelegationTokenProvider: getting token for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_923320786_30, ugi=brouberol@WIKIMEDIA (auth:KERBEROS)]] with renewer brouberol@WIKIMEDIA
24/11/05 15:00:13 INFO DFSClient: Created token for brouberol: HDFS_DELEGATION_TOKEN owner=brouberol@WIKIMEDIA, renewer=brouberol, realUser=, issueDate=1730818813548, maxDate=1731423613548, sequenceNumber=26993250, masterKeyId=2095 on ha-hdfs:analytics-hadoop
24/11/05 15:00:13 INFO HadoopFSDelegationTokenProvider: Renewal interval is 86400055 for token HDFS_DELEGATION_TOKEN
24/11/05 15:00:15 INFO HiveConf: Found configuration file file:/etc/spark3/conf/hive-site.xml
24/11/05 15:00:15 INFO SparkHadoopUtil: Updating delegation tokens for current user.
24/11/05 15:00:15 INFO Utils: Using initial executors = 0, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
24/11/05 15:00:16 INFO Client: Requesting a new application from cluster with 107 NodeManagers
24/11/05 15:00:16 INFO Configuration: resource-types.xml not found
24/11/05 15:00:16 INFO ResourceUtils: Unable to find 'resource-types.xml'.
24/11/05 15:00:16 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (49152 MB per container)
24/11/05 15:00:16 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
24/11/05 15:00:16 INFO Client: Setting up container launch context for our AM
24/11/05 15:00:16 INFO Client: Setting up the launch environment for our AM container
24/11/05 15:00:16 INFO Client: Preparing resources for our AM container
24/11/05 15:00:16 INFO Client: Source and destination file systems are the same. Not copying hdfs:/user/spark/share/lib/spark-3.1.2-assembly.jar
24/11/05 15:00:16 INFO Client: Uploading resource file:/opt/conda-analytics/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip -> hdfs://analytics-hadoop/user/brouberol/.sparkStaging/application_1727783536357_619602/pyspark.zip
24/11/05 15:00:16 INFO Client: Uploading resource file:/opt/conda-analytics/lib/python3.10/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip -> hdfs://analytics-hadoop/user/brouberol/.sparkStaging/application_1727783536357_619602/py4j-0.10.9-src.zip
24/11/05 15:00:16 INFO Client: Uploading resource file:/tmp/spark-c78d3d84-03b1-42e4-b45f-32c08cffda07/__spark_conf__8019417001343397592.zip -> hdfs://analytics-hadoop/user/brouberol/.sparkStaging/application_1727783536357_619602/__spark_conf__.zip
24/11/05 15:00:17 INFO SecurityManager: Changing view acls to: brouberol
24/11/05 15:00:17 INFO SecurityManager: Changing modify acls to: brouberol
24/11/05 15:00:17 INFO SecurityManager: Changing view acls groups to:
24/11/05 15:00:17 INFO SecurityManager: Changing modify acls groups to:
24/11/05 15:00:17 INFO SecurityManager: SecurityManager: authentication enabled; ui acls disabled; users with view permissions: Set(brouberol); groups with view permissions: Set(); users with modify permissions: Set(brouberol); groups with modify permissions: Set()
24/11/05 15:00:17 INFO Client: Submitting application application_1727783536357_619602 to ResourceManager
24/11/05 15:00:17 INFO YarnClientImpl: Submitted application application_1727783536357_619602
24/11/05 15:00:18 INFO Client: Application report for application_1727783536357_619602 (state: ACCEPTED)
24/11/05 15:00:18 INFO Client:
client token: Token { kind: YARN_CLIENT_TOKEN, service: }
diagnostics: AM container is launched, waiting for AM container to Register with RM
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1730818817157
final status: UNDEFINED
tracking URL: http://an-master1003.eqiad.wmnet:8088/proxy/application_1727783536357_619602/
user: brouberol
24/11/05 15:00:19 INFO Client: Application report for application_1727783536357_619602 (state: ACCEPTED)
24/11/05 15:00:20 INFO Client: Application report for application_1727783536357_619602 (state: ACCEPTED)
24/11/05 15:00:21 INFO YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> an-master1003.eqiad.wmnet,an-master1004.eqiad.wmnet, PROXY_URI_BASES -> http://an-master1003.eqiad.wmnet:8088/proxy/application_1727783536357_619602,http://an-master1004.eqiad.wmnet:8088/proxy/application_1727783536357_619602, RM_HA_URLS -> an-master1003.eqiad.wmnet:8088,an-master1004.eqiad.wmnet:8088), /proxy/application_1727783536357_619602
24/11/05 15:00:21 INFO Client: Application report for application_1727783536357_619602 (state: RUNNING)
24/11/05 15:00:21 INFO Client:
client token: Token { kind: YARN_CLIENT_TOKEN, service: }
diagnostics: N/A
ApplicationMaster host: 10.64.53.42
ApplicationMaster RPC port: -1
queue: default
start time: 1730818817157
final status: UNDEFINED
tracking URL: http://an-master1003.eqiad.wmnet:8088/proxy/application_1727783536357_619602/
user: brouberol
24/11/05 15:00:21 INFO YarnClientSchedulerBackend: Application application_1727783536357_619602 has started running.
24/11/05 15:00:21 WARN Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 13000. Attempting port 13001.
24/11/05 15:00:21 WARN Utils: Service 'org.apache.spark.network.netty.NettyBlockTransferService' could not bind on port 13001. Attempting port 13002.
24/11/05 15:00:21 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 13002.
24/11/05 15:00:21 INFO NettyBlockTransferService: Server created on an-launcher1002.eqiad.wmnet:13002
24/11/05 15:00:21 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
24/11/05 15:00:21 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, an-launcher1002.eqiad.wmnet, 13002, None)
24/11/05 15:00:21 INFO BlockManagerMasterEndpoint: Registering block manager an-launcher1002.eqiad.wmnet:13002 with 398.7 MiB RAM, BlockManagerId(driver, an-launcher1002.eqiad.wmnet, 13002, None)
24/11/05 15:00:21 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, an-launcher1002.eqiad.wmnet, 13002, None)
24/11/05 15:00:21 INFO BlockManager: external shuffle service port = 7337
24/11/05 15:00:21 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, an-launcher1002.eqiad.wmnet, 13002, None)
24/11/05 15:00:21 INFO ServerInfo: Adding filter to /metrics/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
24/11/05 15:00:21 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@8618f2{/metrics/json,null,AVAILABLE,@Spark}
24/11/05 15:00:21 INFO SingleEventLogFileWriter: Logging events to hdfs:/var/log/spark/application_1727783536357_619602.lz4.inprogress
24/11/05 15:00:22 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark-client://YarnAM)
24/11/05 15:00:22 INFO Utils: Using initial executors = 0, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
24/11/05 15:00:22 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
24/11/05 15:00:22 INFO ServerInfo: Adding filter to /SQL: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
24/11/05 15:00:22 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@23f2ddfb{/SQL,null,AVAILABLE,@Spark}
24/11/05 15:00:22 INFO ServerInfo: Adding filter to /SQL/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
24/11/05 15:00:22 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@73913dd4{/SQL/json,null,AVAILABLE,@Spark}
24/11/05 15:00:22 INFO ServerInfo: Adding filter to /SQL/execution: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
24/11/05 15:00:22 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@83963a5{/SQL/execution,null,AVAILABLE,@Spark}
24/11/05 15:00:22 INFO ServerInfo: Adding filter to /SQL/execution/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
24/11/05 15:00:22 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@151241d{/SQL/execution/json,null,AVAILABLE,@Spark}
24/11/05 15:00:22 INFO ServerInfo: Adding filter to /static/sql: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
24/11/05 15:00:22 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@45effdcb{/static/sql,null,AVAILABLE,@Spark}
24/11/05 15:00:23 INFO SparkContext: Starting job: reduce at /opt/conda-analytics/lib/python3.10/site-packages/pyspark/examples/src/main/python/pi.py:42
24/11/05 15:00:23 INFO DAGScheduler: Got job 0 (reduce at /opt/conda-analytics/lib/python3.10/site-packages/pyspark/examples/src/main/python/pi.py:42) with 10 output partitions
24/11/05 15:00:23 INFO DAGScheduler: Final stage: ResultStage 0 (reduce at /opt/conda-analytics/lib/python3.10/site-packages/pyspark/examples/src/main/python/pi.py:42)
24/11/05 15:00:23 INFO DAGScheduler: Parents of final stage: List()
24/11/05 15:00:23 INFO DAGScheduler: Missing parents: List()
24/11/05 15:00:23 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[1] at reduce at /opt/conda-analytics/lib/python3.10/site-packages/pyspark/examples/src/main/python/pi.py:42), which has no missing parents
24/11/05 15:00:23 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 10.5 KiB, free 398.7 MiB)
24/11/05 15:00:23 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 7.9 KiB, free 398.7 MiB)
24/11/05 15:00:23 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on an-launcher1002.eqiad.wmnet:13002 (size: 7.9 KiB, free: 398.7 MiB)
24/11/05 15:00:23 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1388
24/11/05 15:00:23 INFO DAGScheduler: Submitting 10 missing tasks from ResultStage 0 (PythonRDD[1] at reduce at /opt/conda-analytics/lib/python3.10/site-packages/pyspark/examples/src/main/python/pi.py:42) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
24/11/05 15:00:23 INFO YarnScheduler: Adding task set 0.0 with 10 tasks resource profile 0
24/11/05 15:00:24 INFO ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 1 for resource profile id: 0)
24/11/05 15:00:25 INFO ExecutorAllocationManager: Requesting 2 new executors because tasks are backlogged (new desired total will be 3 for resource profile id: 0)
24/11/05 15:00:26 INFO ExecutorAllocationManager: Requesting 4 new executors because tasks are backlogged (new desired total will be 7 for resource profile id: 0)
24/11/05 15:00:27 INFO ExecutorAllocationManager: Requesting 3 new executors because tasks are backlogged (new desired total will be 10 for resource profile id: 0)
24/11/05 15:00:30 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.64.36.9:38994) with ID 3, ResourceProfileId 0
24/11/05 15:00:30 INFO ExecutorMonitor: New executor 3 has registered (new total is 2)
24/11/05 15:00:30 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.64.36.9:39010) with ID 4, ResourceProfileId 0
24/11/05 15:00:30 INFO ExecutorMonitor: New executor 4 has registered (new total is 3)
24/11/05 15:00:30 INFO BlockManagerMasterEndpoint: Registering block manager an-worker1133.eqiad.wmnet:45067 with 366.3 MiB RAM, BlockManagerId(3, an-worker1133.eqiad.wmnet, 45067, None)
24/11/05 15:00:30 INFO BlockManagerMasterEndpoint: Registering block manager an-worker1133.eqiad.wmnet:42013 with 366.3 MiB RAM, BlockManagerId(4, an-worker1133.eqiad.wmnet, 42013, None)
24/11/05 15:00:30 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.64.36.9:39024) with ID 1, ResourceProfileId 0
24/11/05 15:00:30 INFO ExecutorMonitor: New executor 1 has registered (new total is 4)
24/11/05 15:00:30 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (an-worker1133.eqiad.wmnet, executor 3, partition 0, PROCESS_LOCAL, 4474 bytes) taskResourceAssignments Map()
24/11/05 15:00:30 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1) (an-worker1133.eqiad.wmnet, executor 4, partition 1, PROCESS_LOCAL, 4474 bytes) taskResourceAssignments Map()
24/11/05 15:00:30 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.64.36.9:39028) with ID 8, ResourceProfileId 0
24/11/05 15:00:30 INFO ExecutorMonitor: New executor 8 has registered (new total is 5)
24/11/05 15:00:31 INFO BlockManagerMasterEndpoint: Registering block manager an-worker1133.eqiad.wmnet:42883 with 366.3 MiB RAM, BlockManagerId(1, an-worker1133.eqiad.wmnet, 42883, None)
24/11/05 15:00:31 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2) (an-worker1133.eqiad.wmnet, executor 1, partition 2, PROCESS_LOCAL, 4474 bytes) taskResourceAssignments Map()
24/11/05 15:00:31 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.64.21.119:39524) with ID 2, ResourceProfileId 0
24/11/05 15:00:31 INFO ExecutorMonitor: New executor 2 has registered (new total is 6)
24/11/05 15:00:31 INFO BlockManagerMasterEndpoint: Registering block manager an-worker1133.eqiad.wmnet:42723 with 366.3 MiB RAM, BlockManagerId(8, an-worker1133.eqiad.wmnet, 42723, None)
24/11/05 15:00:31 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3) (an-worker1133.eqiad.wmnet, executor 8, partition 3, PROCESS_LOCAL, 4474 bytes) taskResourceAssignments Map()
24/11/05 15:00:31 INFO BlockManagerMasterEndpoint: Registering block manager an-worker1083.eqiad.wmnet:45129 with 366.3 MiB RAM, BlockManagerId(2, an-worker1083.eqiad.wmnet, 45129, None)
24/11/05 15:00:31 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on an-worker1133.eqiad.wmnet:42013 (size: 7.9 KiB, free: 366.3 MiB)
24/11/05 15:00:31 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on an-worker1133.eqiad.wmnet:45067 (size: 7.9 KiB, free: 366.3 MiB)
24/11/05 15:00:31 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4) (an-worker1083.eqiad.wmnet, executor 2, partition 4, PROCESS_LOCAL, 4474 bytes) taskResourceAssignments Map()
24/11/05 15:00:31 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on an-worker1133.eqiad.wmnet:42883 (size: 7.9 KiB, free: 366.3 MiB)
24/11/05 15:00:31 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.64.36.143:60018) with ID 7, ResourceProfileId 0
24/11/05 15:00:31 INFO ExecutorMonitor: New executor 7 has registered (new total is 7)
24/11/05 15:00:31 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on an-worker1133.eqiad.wmnet:42723 (size: 7.9 KiB, free: 366.3 MiB)
24/11/05 15:00:31 INFO BlockManagerMasterEndpoint: Registering block manager an-worker1111.eqiad.wmnet:33595 with 366.3 MiB RAM, BlockManagerId(7, an-worker1111.eqiad.wmnet, 33595, None)
24/11/05 15:00:31 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5) (an-worker1111.eqiad.wmnet, executor 7, partition 5, PROCESS_LOCAL, 4474 bytes) taskResourceAssignments Map()
24/11/05 15:00:31 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on an-worker1083.eqiad.wmnet:45129 (size: 7.9 KiB, free: 366.3 MiB)
24/11/05 15:00:31 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.64.5.20:43600) with ID 10, ResourceProfileId 0
24/11/05 15:00:31 INFO ExecutorMonitor: New executor 10 has registered (new total is 8)
24/11/05 15:00:31 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.64.5.20:43606) with ID 6, ResourceProfileId 0
24/11/05 15:00:31 INFO ExecutorMonitor: New executor 6 has registered (new total is 9)
24/11/05 15:00:31 INFO BlockManagerMasterEndpoint: Registering block manager an-worker1149.eqiad.wmnet:40845 with 366.3 MiB RAM, BlockManagerId(10, an-worker1149.eqiad.wmnet, 40845, None)
24/11/05 15:00:31 INFO BlockManagerMasterEndpoint: Registering block manager an-worker1149.eqiad.wmnet:45467 with 366.3 MiB RAM, BlockManagerId(6, an-worker1149.eqiad.wmnet, 45467, None)
24/11/05 15:00:31 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on an-worker1111.eqiad.wmnet:33595 (size: 7.9 KiB, free: 366.3 MiB)
24/11/05 15:00:32 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6) (an-worker1149.eqiad.wmnet, executor 10, partition 6, PROCESS_LOCAL, 4474 bytes) taskResourceAssignments Map()
24/11/05 15:00:32 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID 7) (an-worker1149.eqiad.wmnet, executor 6, partition 7, PROCESS_LOCAL, 4474 bytes) taskResourceAssignments Map()
24/11/05 15:00:32 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on an-worker1149.eqiad.wmnet:40845 (size: 7.9 KiB, free: 366.3 MiB)
24/11/05 15:00:32 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on an-worker1149.eqiad.wmnet:45467 (size: 7.9 KiB, free: 366.3 MiB)
24/11/05 15:00:32 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID 8) (an-worker1133.eqiad.wmnet, executor 3, partition 8, PROCESS_LOCAL, 4474 bytes) taskResourceAssignments Map()
24/11/05 15:00:32 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID 9) (an-worker1133.eqiad.wmnet, executor 4, partition 9, PROCESS_LOCAL, 4474 bytes) taskResourceAssignments Map()
24/11/05 15:00:32 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1817 ms on an-worker1133.eqiad.wmnet (executor 3) (1/10)
24/11/05 15:00:32 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 1794 ms on an-worker1133.eqiad.wmnet (executor 4) (2/10)
24/11/05 15:00:32 INFO PythonAccumulatorV2: Connected to AccumulatorServer at host: 127.0.0.1 port: 58739
24/11/05 15:00:32 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 1627 ms on an-worker1133.eqiad.wmnet (executor 8) (3/10)
24/11/05 15:00:32 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 1747 ms on an-worker1133.eqiad.wmnet (executor 1) (4/10)
24/11/05 15:00:32 INFO TaskSetManager: Finished task 9.0 in stage 0.0 (TID 9) in 159 ms on an-worker1133.eqiad.wmnet (executor 4) (5/10)
24/11/05 15:00:32 INFO TaskSetManager: Finished task 8.0 in stage 0.0 (TID 8) in 171 ms on an-worker1133.eqiad.wmnet (executor 3) (6/10)
24/11/05 15:00:33 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 1570 ms on an-worker1111.eqiad.wmnet (executor 7) (7/10)
24/11/05 15:00:33 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 1965 ms on an-worker1083.eqiad.wmnet (executor 2) (8/10)
24/11/05 15:00:33 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.64.53.34:42790) with ID 5, ResourceProfileId 0
24/11/05 15:00:33 INFO ExecutorMonitor: New executor 5 has registered (new total is 10)
24/11/05 15:00:33 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.64.53.34:42798) with ID 9, ResourceProfileId 0
24/11/05 15:00:33 INFO ExecutorMonitor: New executor 9 has registered (new total is 11)
24/11/05 15:00:33 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 1413 ms on an-worker1149.eqiad.wmnet (executor 6) (9/10)
24/11/05 15:00:33 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 1430 ms on an-worker1149.eqiad.wmnet (executor 10) (10/10)
24/11/05 15:00:33 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
24/11/05 15:00:33 INFO DAGScheduler: ResultStage 0 (reduce at /opt/conda-analytics/lib/python3.10/site-packages/pyspark/examples/src/main/python/pi.py:42) finished in 9.823 s
24/11/05 15:00:33 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
24/11/05 15:00:33 INFO YarnScheduler: Killing all running tasks in stage 0: Stage finished
24/11/05 15:00:33 INFO DAGScheduler: Job 0 finished: reduce at /opt/conda-analytics/lib/python3.10/site-packages/pyspark/examples/src/main/python/pi.py:42, took 9.905353 s
Pi is roughly 3.141400
24/11/05 15:00:33 INFO AbstractConnector: Stopped Spark@43847cda{HTTP/1.1, (http/1.1)}{0.0.0.0:4042}
24/11/05 15:00:33 INFO SparkUI: Stopped Spark web UI at http://an-launcher1002.eqiad.wmnet:4042
24/11/05 15:00:33 INFO YarnClientSchedulerBackend: Interrupting monitor thread
24/11/05 15:00:33 INFO YarnClientSchedulerBackend: Shutting down all executors
24/11/05 15:00:33 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
24/11/05 15:00:33 INFO YarnClientSchedulerBackend: YARN client scheduler backend Stopped
24/11/05 15:00:33 INFO BlockManagerMasterEndpoint: Registering block manager an-worker1092.eqiad.wmnet:44323 with 366.3 MiB RAM, BlockManagerId(5, an-worker1092.eqiad.wmnet, 44323, None)
24/11/05 15:00:33 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/11/05 15:00:33 INFO BlockManagerMasterEndpoint: Registering block manager an-worker1092.eqiad.wmnet:41653 with 366.3 MiB RAM, BlockManagerId(9, an-worker1092.eqiad.wmnet, 41653, None)
24/11/05 15:00:33 INFO MemoryStore: MemoryStore cleared
24/11/05 15:00:33 INFO BlockManager: BlockManager stopped
24/11/05 15:00:33 INFO BlockManagerMaster: BlockManagerMaster stopped
24/11/05 15:00:33 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/11/05 15:00:33 INFO SparkContext: Successfully stopped SparkContext
24/11/05 15:00:34 INFO ShutdownHookManager: Shutdown hook called
24/11/05 15:00:34 INFO ShutdownHookManager: Deleting directory /tmp/spark-dc90d5ed-db4f-464e-bc93-00e903223bba
24/11/05 15:00:34 INFO ShutdownHookManager: Deleting directory /tmp/spark-c78d3d84-03b1-42e4-b45f-32c08cffda07/pyspark-b9d3d252-1939-40be-8dd1-fc064acbbf22
24/11/05 15:00:34 INFO ShutdownHookManager: Deleting directory /tmp/spark-c78d3d84-03b1-42e4-b45f-32c08cffda07/pyspark-b9d3d252-1939-40be-8dd1-fc064acbbf22
I'm using the following spark3-submit command in our container, to ferret out potential configuration or infrastructure issues.
spark3-submit --master yarn --deploy-mode client /usr/local/lib/python3.9/site-packages/pyspark/examples/src/main/python/pi.py 10
Right now, it seems that an-master100{3,4}.eqiad.wmnet:8032 (the Yarn ResourceManager IPC port) cannot be reached from the DSE_KUBEPODS_NETWORKS srange.
From an-launcher1002:
brouberol@an-launcher1002:~$ telnet an-master1003.eqiad.wmnet 8032
Trying 2620:0:861:106:10:64:36:15...
Trying 10.64.36.15...
Connected to an-master1003.eqiad.wmnet.
Escape character is '^]'.
^]
telnet> quit
Connection closed.
From a dse-worker host:
brouberol@dse-k8s-worker1007:~$ telnet an-master1003.eqiad.wmnet 8032
Trying 2620:0:861:106:10:64:36:15...
^C
brouberol@dse-k8s-worker1007:~$
From an airflow pod:
>>> import socket
>>> def port_open(host, port):
...     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
...     sock.settimeout(1)
...     result = sock.connect_ex((host, port))
...     if result == 0:
...         print("Port is open")
...     else:
...         print("Port is not open")
...     sock.close()
...
>>> port_open('10.64.53.14', 8032)
Port is not open
>>> port_open('10.64.36.15', 8032)
Port is not open
The port is missing from the external_service Service, but things are still failing even after I manually punch it in, which makes me think we're short of an nftables/ferm rule.
Edit: actually the ferm rules were fine, I had forgotten to manually edit the Endpoints backing the Service. https://gerrit.wikimedia.org/r/c/operations/puppet/+/1087500 should solve the issue.
Change #1087500 had a related patch set uploaded (by Brouberol; author: Brouberol):
[operations/puppet@production] global_config: register the Yarn resourcemanager IPC port in the hadoop service
Now that we're past the Yarn ResourceManager not being reachable, we're getting into fun Kerberos-related issues:
24/11/05 15:48:47 INFO RetryInvocationHandler: java.io.IOException: DestHost:destPort an-master1004.eqiad.wmnet:8020 , LocalHost:localPort airflow-scheduler-5b4c46548-mh5kk/10.67.29.209:0. Failed on local exception: java.io.IOException: Couldn't set up IO streams: java.lang.IllegalArgumentException: Server has invalid Kerberos principal: hdfs/an-master1004.eqiad.wmnet@WIKIMEDIA, expecting: hdfs/10-64-53-14.hadoop-master-analytics.external-services.svc.cluster.local@WIKIMEDIA, while invoking ClientNamenodeProtocolTranslatorPB.getDelegationToken over an-master1004.eqiad.wmnet/10.64.53.14:8020 after 1 failover attempts. Trying to failover after sleeping for 1412ms.
24/11/05 15:48:49 INFO RetryInvocationHandler: java.io.IOException: DestHost:destPort an-master1003.eqiad.wmnet:8020 , LocalHost:localPort airflow-scheduler-5b4c46548-mh5kk/10.67.29.209:0. Failed on local exception: java.io.IOException: Couldn't set up IO streams: java.lang.IllegalArgumentException: Server has invalid Kerberos principal: hdfs/an-master1003.eqiad.wmnet@WIKIMEDIA, expecting: hdfs/10-64-36-15.hadoop-master-analytics.external-services.svc.cluster.local@WIKIMEDIA, while invoking ClientNamenodeProtocolTranslatorPB.getDelegationToken over an-master1003.eqiad.wmnet/10.64.36.15:8020 after 2 failover attempts. Trying to failover after sleeping for 2794ms.
Change #1087500 merged by Brouberol:
[operations/puppet@production] global_config: register the Yarn resourcemanager IPC port in the hadoop service
What ended up helping was setting dfs.namenode.kerberos.principal to hdfs/an-master1003.eqiad.wmnet@WIKIMEDIA instead of hdfs/_HOST@WIKIMEDIA.
The _HOST substitution is explained here. I wonder if setting hadoop.security.dns.nameserver to our "main" DNS servers (such as 10.3.0.1) would have the _HOST DNS translation happen _outside_ Kubernetes, which should help us sidestep CoreDNS.
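To make the mismatch concrete, here is a rough sketch of how the `_HOST` expansion behaves with different DNS views. The `expand_host_principal` helper and the resolver maps are invented for illustration; this is not Hadoop's actual implementation.

```python
import socket

def expand_host_principal(principal, host, resolver=None):
    """Expand Hadoop's `_HOST` placeholder in a Kerberos service principal.

    `resolver` maps a hostname to its canonical FQDN; the default does a
    forward lookup followed by a reverse lookup, which is exactly the step
    that yields the `*.svc.cluster.local` name when CoreDNS answers.
    """
    if resolver is None:
        def resolver(h):
            ip = socket.gethostbyname(h)
            return socket.gethostbyaddr(ip)[0]
    return principal.replace("_HOST", resolver(host))

# With the Wikimedia DNS view, the expansion matches the server's principal:
real_dns = {"an-master1003.eqiad.wmnet": "an-master1003.eqiad.wmnet"}
print(expand_host_principal("hdfs/_HOST@WIKIMEDIA",
                            "an-master1003.eqiad.wmnet",
                            resolver=real_dns.get))
# -> hdfs/an-master1003.eqiad.wmnet@WIKIMEDIA

# With CoreDNS answering the reverse lookup, we get the mismatching
# cluster-local name seen in the error above:
coredns = {"an-master1003.eqiad.wmnet":
           "10-64-36-15.hadoop-master-analytics.external-services.svc.cluster.local"}
print(expand_host_principal("hdfs/_HOST@WIKIMEDIA",
                            "an-master1003.eqiad.wmnet",
                            resolver=coredns.get))
# -> hdfs/10-64-36-15.hadoop-master-analytics.external-services.svc.cluster.local@WIKIMEDIA
```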
OK. What will happen when we fail over from an-master1003 to an-master1004?
https://wikitech.wikimedia.org/wiki/Data_Platform/Systems/Hadoop/Administration#High_Availability
Do you think that we should be using ignore_acceptor_hostname in the kerberos config, or is there some other way that we could use an alias to the high-availability service?
Confirmed, this hostname is coming from the reverse DNS of the server IP:
>>> socket.gethostbyaddr('10.64.36.15')
('10-64-36-15.hadoop-master-analytics.external-services.svc.cluster.local', [], ['10.64.36.15'])
What I've ended up doing is deploying a slightly different Kerberos configuration on the pod in charge of executing the spark3-submit command, with rdns = false and without dns_canonicalize_hostname = false (since the latter would override the rdns config, cf https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html#libdefaults)
rdns
If this flag is true, reverse name lookup will be used in addition to forward name lookup to canonicalizing hostnames for use in service principal names. If dns_canonicalize_hostname is set to false, this flag has no effect. The default value is true.
spark3-submit is now failing with
24/11/06 09:12:04 INFO SparkHadoopUtil: Updating delegation tokens for current user.
24/11/06 09:12:04 INFO Utils: Using initial executors = 0, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
24/11/06 09:12:04 WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]
24/11/06 09:12:04 INFO ConfiguredRMFailoverProxyProvider: Failing over to an-master1004-eqiad-wmnet
24/11/06 09:12:04 INFO RetryInvocationHandler: java.net.ConnectException: Call From airflow-scheduler-58d7978b95-9x864/10.67.30.148 to an-master1004.eqiad.wmnet:8032 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused, while invoking ApplicationClientProtocolPBClientImpl.getClusterMetrics over an-master1004-eqiad-wmnet after 1 failover attempts. Trying to failover after sleeping for 1945ms.
Actually, that Kerberos error comes from a manual typo, so let's not take it into account!
I was able to access HDFS using hdfs dfs -ls / !
airflow@airflow-scheduler-58d7978b95-9x864:/opt/airflow$ hdfs dfs -ls
log4j:ERROR Could not find value for key log4j.appender.NullAppender
log4j:ERROR Could not instantiate appender named "NullAppender".
Found 17 items
drwx------   - analytics analytics                               0 2024-11-06 00:00 .Trash
drwx------   - analytics analytics                               0 2023-01-25 17:28 .flink
drwxr-x---   - analytics analytics                               0 2024-11-06 09:17 .skein
drwxr-xr-x   - analytics analytics                               0 2024-11-06 09:20 .sparkStaging
drwx------   - analytics analytics                               0 2024-11-06 09:17 .staging
-rw-------   3 analytics analytics                              89 2023-06-23 13:44 aqs_testing_password.txt
-rw-r--r--   3 analytics analytics                            2277 2024-05-08 20:51 aqu-log4j.properties
drwxr-xr-x   - analytics analytics                               0 2019-12-18 17:13 data
drwxr-x---   - analytics analytics-privatedata-users             0 2024-10-21 12:39 event_alt
-rw-------   3 analytics analytics                              16 2019-06-01 07:55 mysql-analytics-labsdb-client-pw.txt
-rw-------   3 analytics analytics                              24 2019-06-01 07:55 mysql-analytics-research-client-pw.txt
-rw-r-----   3 analytics analytics                          317110 2024-09-16 09:42 pyspark_extension-2.12.0.3.1.tar.gz
-rw-r-----   3 hdfs      analytics                          860259 2024-11-05 14:43 spark-examples_2.10-1.1.1.jar
drwxr-x---   - analytics analytics                               0 2024-10-01 12:20 staging
-rw-r-----   3 analytics analytics                          101938 2024-09-18 14:53 stream_config_20240918.json
-rwxr-xr-x   3 analytics analytics                          105801 2024-09-30 13:48 stream_config_30p.json
-rw-r-----   3 analytics analytics-privatedata-users           133 2021-12-06 16:05 swift_auth_analytics_admin.env
This bodes well for spark3-submit (he said optimistically)
Great! Funnily enough I did think of the rdns flag, but I must have squinted at it, because I thought it was already false.
Actually, the issue goes deeper. Setting the rdns = false flag in krb5.conf does not solve the issue of the Kerberos server hostname mismatch. What Hadoop does is perform a reverse DNS query no matter how Kerberos is configured (cf source)
We now have a thorny issue: we'd like to keep CoreDNS in our resolv.conf, as we need it to resolve the cloudnative-pg pooler service domain, BUT we'd also like to sidestep CoreDNS in favour of the Wikimedia DNS servers when performing the reverse DNS resolution of the Hadoop master IPs.
After some trial and error, what ended up working was specifying a hostAlias:
hostAliases:
  - ip: "10.64.36.15"
    hostnames:
      - an-master1003.eqiad.wmnet
which generates a slightly tuned /etc/hosts file:
airflow@airflow-scheduler-9cdb97f69-9nc5g:/opt/airflow$ cat /etc/hosts
# Kubernetes-managed hosts file.
127.0.0.1 localhost
::1 localhost ip6-localhost ip6-loopback
fe00::0 ip6-localnet
fe00::0 ip6-mcastprefix
fe00::1 ip6-allnodes
fe00::2 ip6-allrouters
10.67.30.214 airflow-scheduler-9cdb97f69-9nc5g
2620:0:861:302:f380:aae4:1a76:e6fc airflow-scheduler-9cdb97f69-9nc5g

# Entries added by HostAliases.
10.64.36.15 an-master1003.eqiad.wmnet
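As a quick sanity check that the HostAliases entry lands where glibc will see it, one can parse the generated file. The `hosts_entries` helper below is hypothetical, purely for illustration.

```python
def hosts_entries(hosts_text):
    """Parse /etc/hosts content into a {hostname: ip} mapping."""
    entries = {}
    for line in hosts_text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop comments and blank lines
        if not line:
            continue
        ip, *names = line.split()
        for name in names:
            entries[name] = ip
    return entries

# A trimmed-down version of the file Kubernetes generates for the pod:
sample = """\
# Kubernetes-managed hosts file.
127.0.0.1 localhost
# Entries added by HostAliases.
10.64.36.15 an-master1003.eqiad.wmnet
"""
assert hosts_entries(sample)["an-master1003.eqiad.wmnet"] == "10.64.36.15"
```

With this entry in place, the forward resolution of an-master1003.eqiad.wmnet never reaches CoreDNS at all.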
This allows me to pass the Kerbarrier!
24/11/06 10:29:42 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://airflow-scheduler-9cdb97f69-9nc5g:4040
24/11/06 10:29:43 INFO HadoopDelegationTokenManager: Attempting to load user's ticket cache.
24/11/06 10:29:43 INFO HadoopFSDelegationTokenProvider: getting token for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1231454_17, ugi=analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA (auth:KERBEROS)]] with renewer yarn/an-master1003.eqiad.wmnet@WIKIMEDIA
24/11/06 10:29:44 INFO DFSClient: Created token for analytics: HDFS_DELEGATION_TOKEN owner=analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA, renewer=yarn, realUser=, issueDate=1730888984245, maxDate=1731493784245, sequenceNumber=27006818, masterKeyId=2095 on ha-hdfs:analytics-hadoop
24/11/06 10:29:44 INFO HadoopFSDelegationTokenProvider: getting token for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1231454_17, ugi=analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA (auth:KERBEROS)]] with renewer analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA
24/11/06 10:29:44 INFO DFSClient: Created token for analytics: HDFS_DELEGATION_TOKEN owner=analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA, renewer=analytics, realUser=, issueDate=1730888984317, maxDate=1731493784317, sequenceNumber=27006819, masterKeyId=2095 on ha-hdfs:analytics-hadoop
24/11/06 10:29:44 INFO HadoopFSDelegationTokenProvider: Renewal interval is 86400087 for token HDFS_DELEGATION_TOKEN
24/11/06 10:29:45 INFO HiveConf: Found configuration file file:/etc/spark3/conf/hive-site.xml
24/11/06 10:29:45 ERROR FileUtils: The jar file path file:///usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar doesn't exist
24/11/06 10:29:46 INFO SparkHadoopUtil: Updating delegation tokens for current user.
24/11/06 10:29:46 INFO Utils: Using initial executors = 0, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
24/11/06 10:29:46 INFO Client: Requesting a new application from cluster with 107 NodeManagers
24/11/06 10:29:46 INFO Configuration: resource-types.xml not found
24/11/06 10:29:46 INFO ResourceUtils: Unable to find 'resource-types.xml'.
24/11/06 10:29:46 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (49152 MB per container)
24/11/06 10:29:46 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
24/11/06 10:29:46 INFO Client: Setting up container launch context for our AM
24/11/06 10:29:46 INFO Client: Setting up the launch environment for our AM container
24/11/06 10:29:46 INFO Client: Preparing resources for our AM container
24/11/06 10:29:46 INFO Client: Source and destination file systems are the same. Not copying hdfs:/user/spark/share/lib/spark-3.1.2-assembly.jar
24/11/06 10:29:46 INFO Client: Uploading resource file:/usr/local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip -> hdfs://analytics-hadoop/user/analytics/.sparkStaging/application_1727783536357_631153/pyspark.zip
24/11/06 10:30:46 INFO DataStreamer: Exception in createBlockOutputStream blk_2566234840_1492587613
org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.64.53.33:50010]
	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:534)
	at org.apache.hadoop.hdfs.DataStreamer.createSocketForPipeline(DataStreamer.java:253)
	at org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1725)
	at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1679)
	at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:716)
24/11/06 10:30:46 WARN DataStreamer: Abandoning BP-1552854784-10.64.21.110-1405114489661:blk_2566234840_1492587613
24/11/06 10:30:46 WARN DataStreamer: Excluding datanode DatanodeInfoWithStorage[10.64.53.33:50010,DS-db2b3b38-9a3f-4b01-928f-fc3245ea3c85,DISK]
I now seem to be stopped by a missing network policy.
I was missing the networkpolicy allowing egress to the hadoop workers. With that in place, I see the following logs:
24/11/06 11:13:44 INFO HadoopFSDelegationTokenProvider: Renewal interval is 86400077 for token HDFS_DELEGATION_TOKEN
24/11/06 11:13:45 INFO HiveConf: Found configuration file file:/etc/spark3/conf/hive-site.xml
24/11/06 11:13:45 ERROR FileUtils: The jar file path file:///usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar doesn't exist
24/11/06 11:13:45 INFO SparkHadoopUtil: Updating delegation tokens for current user.
24/11/06 11:13:45 INFO Utils: Using initial executors = 0, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
24/11/06 11:13:46 INFO Client: Requesting a new application from cluster with 107 NodeManagers
24/11/06 11:13:46 INFO Configuration: resource-types.xml not found
24/11/06 11:13:46 INFO ResourceUtils: Unable to find 'resource-types.xml'.
24/11/06 11:13:46 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (49152 MB per container)
24/11/06 11:13:46 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
24/11/06 11:13:46 INFO Client: Setting up container launch context for our AM
24/11/06 11:13:46 INFO Client: Setting up the launch environment for our AM container
24/11/06 11:13:46 INFO Client: Preparing resources for our AM container
24/11/06 11:13:46 INFO Client: Source and destination file systems are the same. Not copying hdfs:/user/spark/share/lib/spark-3.1.2-assembly.jar
24/11/06 11:13:46 INFO Client: Uploading resource file:/usr/local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip -> hdfs://analytics-hadoop/user/analytics/.sparkStaging/application_1727783536357_631643/pyspark.zip
24/11/06 11:13:46 INFO Client: Uploading resource file:/usr/local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip -> hdfs://analytics-hadoop/user/analytics/.sparkStaging/application_1727783536357_631643/py4j-0.10.9-src.zip
24/11/06 11:13:46 INFO Client: Uploading resource file:/tmp/spark-317074b2-4048-44c2-8b62-5381b553966a/__spark_conf__7400012480606812375.zip -> hdfs://analytics-hadoop/user/analytics/.sparkStaging/application_1727783536357_631643/__spark_conf__.zip
24/11/06 11:13:46 INFO SecurityManager: Changing view acls to: airflow,analytics
24/11/06 11:13:46 INFO SecurityManager: Changing modify acls to: airflow,analytics
24/11/06 11:13:46 INFO SecurityManager: Changing view acls groups to:
24/11/06 11:13:46 INFO SecurityManager: Changing modify acls groups to:
24/11/06 11:13:46 INFO SecurityManager: SecurityManager: authentication enabled; ui acls disabled; users with view permissions: Set(airflow, analytics); groups with view permissions: Set(); users with modify permissions: Set(airflow, analytics); groups with modify permissions: Set()
24/11/06 11:13:46 INFO Client: Submitting application application_1727783536357_631643 to ResourceManager
24/11/06 11:13:47 INFO YarnClientImpl: Submitted application application_1727783536357_631643
24/11/06 11:13:48 INFO Client: Application report for application_1727783536357_631643 (state: ACCEPTED)
24/11/06 11:13:48 INFO Client:
client token: Token { kind: YARN_CLIENT_TOKEN, service: }
diagnostics: AM container is launched, waiting for AM container to Register with RM
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: production
start time: 1730891626772
final status: UNDEFINED
tracking URL: http://an-master1003.eqiad.wmnet:8088/proxy/application_1727783536357_631643/
user: analytics
24/11/06 11:13:49 INFO Client: Application report for application_1727783536357_631643 (state: ACCEPTED)
24/11/06 11:13:50 INFO Client: Application report for application_1727783536357_631643 (state: ACCEPTED)
24/11/06 11:13:51 INFO Client: Application report for application_1727783536357_631643 (state: RUNNING)
24/11/06 11:13:51 INFO Client:
client token: Token { kind: YARN_CLIENT_TOKEN, service: }
diagnostics: N/A
ApplicationMaster host: 10.64.53.17
ApplicationMaster RPC port: -1
queue: production
start time: 1730891626772
final status: UNDEFINED
tracking URL: http://an-master1003.eqiad.wmnet:8088/proxy/application_1727783536357_631643/
user: analytics
24/11/06 11:13:51 INFO YarnClientSchedulerBackend: Application application_1727783536357_631643 has started running.
24/11/06 11:13:51 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 13000.
24/11/06 11:13:51 INFO NettyBlockTransferService: Server created on airflow-scheduler-67bbd4f8cc-6lp2w:13000
24/11/06 11:13:51 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
24/11/06 11:13:51 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, airflow-scheduler-67bbd4f8cc-6lp2w, 13000, None)
24/11/06 11:13:51 INFO BlockManagerMasterEndpoint: Registering block manager airflow-scheduler-67bbd4f8cc-6lp2w:13000 with 413.9 MiB RAM, BlockManagerId(driver, airflow-scheduler-67bbd4f8cc-6lp2w, 13000, None)
24/11/06 11:13:51 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, airflow-scheduler-67bbd4f8cc-6lp2w, 13000, None)
24/11/06 11:13:51 INFO BlockManager: external shuffle service port = 7337
24/11/06 11:13:51 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, airflow-scheduler-67bbd4f8cc-6lp2w, 13000, None)
24/11/06 11:13:51 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@403e76{/metrics/json,null,AVAILABLE,@Spark}
24/11/06 11:13:51 INFO SingleEventLogFileWriter: Logging events to hdfs:/var/log/spark/application_1727783536357_631643.lz4.inprogress
24/11/06 11:13:51 INFO Utils: Using initial executors = 0, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
24/11/06 11:13:51 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
24/11/06 11:13:51 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
24/11/06 11:13:51 WARN SparkSession: Cannot use org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions to configure session extensions.
java.lang.ClassNotFoundException: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:207)
at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1(SparkSession.scala:1191)
at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1$adapted(SparkSession.scala:1189)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$applyExtensions(SparkSession.scala:1189)
at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:103)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
24/11/06 11:13:51 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@37c7c0e5{/SQL,null,AVAILABLE,@Spark}
24/11/06 11:13:51 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2cbbdac4{/SQL/json,null,AVAILABLE,@Spark}
24/11/06 11:13:51 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5fb7523d{/SQL/execution,null,AVAILABLE,@Spark}
24/11/06 11:13:51 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@a794a7{/SQL/execution/json,null,AVAILABLE,@Spark}
24/11/06 11:13:51 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6399ab49{/static/sql,null,AVAILABLE,@Spark}
24/11/06 11:13:53 INFO SparkContext: Starting job: reduce at /usr/local/lib/python3.9/site-packages/pyspark/examples/src/main/python/pi.py:42
24/11/06 11:13:53 INFO DAGScheduler: Got job 0 (reduce at /usr/local/lib/python3.9/site-packages/pyspark/examples/src/main/python/pi.py:42) with 10 output partitions
24/11/06 11:13:53 INFO DAGScheduler: Final stage: ResultStage 0 (reduce at /usr/local/lib/python3.9/site-packages/pyspark/examples/src/main/python/pi.py:42)
24/11/06 11:13:53 INFO DAGScheduler: Parents of final stage: List()
24/11/06 11:13:53 INFO DAGScheduler: Missing parents: List()
24/11/06 11:13:53 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[1] at reduce at /usr/local/lib/python3.9/site-packages/pyspark/examples/src/main/python/pi.py:42), which has no missing parents
24/11/06 11:13:53 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 10.4 KiB, free 413.9 MiB)
24/11/06 11:13:53 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 7.9 KiB, free 413.9 MiB)
24/11/06 11:13:53 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on airflow-scheduler-67bbd4f8cc-6lp2w:13000 (size: 7.9 KiB, free: 413.9 MiB)
24/11/06 11:13:53 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1388
24/11/06 11:13:53 INFO DAGScheduler: Submitting 10 missing tasks from ResultStage 0 (PythonRDD[1] at reduce at /usr/local/lib/python3.9/site-packages/pyspark/examples/src/main/python/pi.py:42) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))
24/11/06 11:13:53 INFO YarnScheduler: Adding task set 0.0 with 10 tasks resource profile 0
24/11/06 11:13:54 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
24/11/06 11:13:54 WARN ExecutorAllocationManager: Unable to reach the cluster manager to request more executors!
24/11/06 11:13:54 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
24/11/06 11:13:54 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
24/11/06 11:13:54 WARN ExecutorAllocationManager: Unable to reach the cluster manager to request more executors!
A couple of thoughts:
- tracking URL: http://an-master1003.eqiad.wmnet:8088/proxy/application_1727783536357_631643/: maybe the yarn.wikimedia.org address is only useful for access, and not submission, so we might be able to skip T378920
24/11/06 11:13:51 WARN SparkSession: Cannot use org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions to configure session extensions.
java.lang.ClassNotFoundException: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	...
are we missing a jar?
24/11/06 11:13:54 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
24/11/06 11:13:54 WARN ExecutorAllocationManager: Unable to reach the cluster manager to request more executors!
- ah, this seems to indicate that we're querying yarn.wikimedia.org which can't be reached.
Ah yes, I believe we are.
Here is where we add the iceberg jar to the conda-analytics environment: https://gitlab.wikimedia.org/repos/data-engineering/conda-analytics/-/blob/3fc930798b4967f2fbb5e5c3da890aa6992c6d10/docker/Dockerfile#L35-37
I'm in the process of upgrading this to version 1.3.1
Perhaps it would be possible to retrieve this jar from HDFS, rather than bake it into the airflow image. Not sure what's best here.
Hmm. Maybe you're right, but I'm not sure.
I would still expect it to be using the an-master names for the resource manager, as per the yarn rmadmin commands here: https://wikitech.wikimedia.org/wiki/Data_Platform/Systems/Hadoop/Administration#Manual_Failover
However, I think we will probably have to proceed sooner or later with: T378920: Determine whether or not we need to expose an internal service for yarn
Looking at https://yarn.wikimedia.org/cluster/app/application_1727783536357_631643, we see that the application failed, due to
Caused by: java.io.IOException: Failed to connect to airflow-scheduler-67bbd4f8cc-6lp2w:12000
...
Caused by: java.net.UnknownHostException: airflow-scheduler-67bbd4f8cc-6lp2w
...
There seems to be some kind of callback from YARN to the application itself, and because the pod's hostname can't be resolved by the YARN host, this fails.
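A toy model of that failure mode, in case it helps reason about it (the function and hostnames are invented for illustration): in client mode the driver stays in the pod and executors must connect back to spark.driver.host, which DNS outside Kubernetes cannot resolve; in cluster mode the driver itself runs in a YARN container, so the callback never leaves the Hadoop network.

```python
def driver_reachable(deploy_mode, driver_host, resolvable_from_yarn):
    """Return True if YARN executors could connect back to the Spark driver."""
    if deploy_mode == "cluster":
        # The driver runs in a YARN container, always reachable from YARN.
        return True
    # In client mode, executors must resolve spark.driver.host themselves.
    return driver_host in resolvable_from_yarn

# Hostnames resolvable from the YARN workers (illustrative):
yarn_dns = {"an-launcher1002.eqiad.wmnet", "an-worker1083.eqiad.wmnet"}

assert not driver_reachable("client", "airflow-scheduler-67bbd4f8cc-6lp2w", yarn_dns)
assert driver_reachable("cluster", "airflow-scheduler-67bbd4f8cc-6lp2w", yarn_dns)
```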
That being said, the good news is that the job can be found in Spark-History: https://yarn.wikimedia.org/spark-history/history/application_1727783536357_631643/jobs/job/?id=0
Looking at https://yarn.wikimedia.org/spark-history/history/application_1727783536357_631643/environment/, we see
- spark.driver.appUIAddress: http://airflow-scheduler-67bbd4f8cc-6lp2w:4040
- spark.driver.host: airflow-scheduler-67bbd4f8cc-6lp2w
I'm not sure how we're supposed to enable the Spark application being able to contact the driver from outside Kubernetes at this point.
Thanks to @BTullis for making me realize that we need the driver to run in YARN as well, which means --deploy-mode cluster instead of --deploy-mode client!
airflow@airflow-scheduler-67bbd4f8cc-6lp2w:/opt/airflow$ spark3-submit --master yarn --deploy-mode cluster /usr/local/lib/python3.9/site-packages/pyspark/examples/src/main/python/pi.py 10
SPARK_HOME: /usr/local/lib/python3.9/site-packages/pyspark
Using Hadoop client lib jars at 3.2.0, provided by Spark.
24/11/06 12:26:11 INFO Client: Requesting a new application from cluster with 107 NodeManagers
24/11/06 12:26:11 INFO Configuration: resource-types.xml not found
24/11/06 12:26:11 INFO ResourceUtils: Unable to find 'resource-types.xml'.
24/11/06 12:26:11 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (49152 MB per container)
24/11/06 12:26:11 INFO Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead
24/11/06 12:26:11 INFO Client: Setting up container launch context for our AM
24/11/06 12:26:11 INFO Client: Setting up the launch environment for our AM container
24/11/06 12:26:11 INFO Client: Preparing resources for our AM container
24/11/06 12:26:11 INFO Client: Source and destination file systems are the same. Not copying hdfs:/user/spark/share/lib/spark-3.1.2-assembly.jar
24/11/06 12:26:11 INFO Client: Uploading resource file:/usr/local/lib/python3.9/site-packages/pyspark/examples/src/main/python/pi.py -> hdfs://analytics-hadoop/user/analytics/.sparkStaging/application_1727783536357_632194/pi.py
24/11/06 12:26:12 INFO Client: Uploading resource file:/usr/local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip -> hdfs://analytics-hadoop/user/analytics/.sparkStaging/application_1727783536357_632194/pyspark.zip
24/11/06 12:26:12 INFO Client: Uploading resource file:/usr/local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip -> hdfs://analytics-hadoop/user/analytics/.sparkStaging/application_1727783536357_632194/py4j-0.10.9-src.zip
24/11/06 12:26:12 INFO Client: Uploading resource file:/tmp/spark-6220b0ce-b0f9-498e-819c-12ed7db6f082/__spark_conf__1610636498025214857.zip -> hdfs://analytics-hadoop/user/analytics/.sparkStaging/application_1727783536357_632194/__spark_conf__.zip
24/11/06 12:26:12 INFO SecurityManager: Changing view acls to: airflow,analytics
24/11/06 12:26:12 INFO SecurityManager: Changing modify acls to: airflow,analytics
24/11/06 12:26:12 INFO SecurityManager: Changing view acls groups to:
24/11/06 12:26:12 INFO SecurityManager: Changing modify acls groups to:
24/11/06 12:26:12 INFO SecurityManager: SecurityManager: authentication enabled; ui acls disabled; users with view permissions: Set(airflow, analytics); groups with view permissions: Set(); users with modify permissions: Set(airflow, analytics); groups with modify permissions: Set()
24/11/06 12:26:13 INFO HadoopDelegationTokenManager: Attempting to load user's ticket cache.
24/11/06 12:26:13 INFO HadoopFSDelegationTokenProvider: getting token for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1192866876_1, ugi=analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA (auth:KERBEROS)]] with renewer yarn/an-master1003.eqiad.wmnet@WIKIMEDIA
24/11/06 12:26:13 INFO DFSClient: Created token for analytics: HDFS_DELEGATION_TOKEN owner=analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA, renewer=yarn, realUser=, issueDate=1730895973049, maxDate=1731500773049, sequenceNumber=27008040, masterKeyId=2095 on ha-hdfs:analytics-hadoop
24/11/06 12:26:13 INFO HadoopFSDelegationTokenProvider: getting token for: DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1192866876_1, ugi=analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA (auth:KERBEROS)]] with renewer analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA
24/11/06 12:26:13 INFO DFSClient: Created token for analytics: HDFS_DELEGATION_TOKEN owner=analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA, renewer=analytics, realUser=, issueDate=1730895973059, maxDate=1731500773059, sequenceNumber=27008041, masterKeyId=2095 on ha-hdfs:analytics-hadoop
24/11/06 12:26:13 INFO HadoopFSDelegationTokenProvider: Renewal interval is 86400057 for token HDFS_DELEGATION_TOKEN
24/11/06 12:26:14 INFO HiveConf: Found configuration file file:/etc/spark3/conf/hive-site.xml
24/11/06 12:26:14 ERROR FileUtils: The jar file path file:///usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar doesn't exist
24/11/06 12:26:14 ERROR FileUtils: The jar file path file:///usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar doesn't exist
24/11/06 12:26:15 INFO metastore: Trying to connect to metastore with URI thrift://analytics-hive.eqiad.wmnet:9083
24/11/06 12:26:15 INFO metastore: Opened a connection to metastore, current connections: 1
24/11/06 12:26:15 INFO metastore: Connected to metastore.
24/11/06 12:26:15 INFO Hive: Registering function english_stemmer org.wikimedia.analytics.refinery.hive.Stemmer
24/11/06 12:26:15 INFO metastore: Closed a connection to metastore, current connections: 0
24/11/06 12:26:15 INFO Client: Submitting application application_1727783536357_632194 to ResourceManager
24/11/06 12:26:15 INFO YarnClientImpl: Submitted application application_1727783536357_632194
24/11/06 12:26:16 INFO Client: Application report for application_1727783536357_632194 (state: ACCEPTED)
24/11/06 12:26:16 INFO Client:
client token: Token { kind: YARN_CLIENT_TOKEN, service: }
diagnostics: AM container is launched, waiting for AM container to Register with RM
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: production
start time: 1730895975614
final status: UNDEFINED
tracking URL: http://an-master1003.eqiad.wmnet:8088/proxy/application_1727783536357_632194/
user: analytics
24/11/06 12:26:17 INFO Client: Application report for application_1727783536357_632194 (state: ACCEPTED)
24/11/06 12:26:18 INFO Client: Application report for application_1727783536357_632194 (state: ACCEPTED)
24/11/06 12:26:19 INFO Client: Application report for application_1727783536357_632194 (state: ACCEPTED)
24/11/06 12:26:20 INFO Client: Application report for application_1727783536357_632194 (state: ACCEPTED)
24/11/06 12:26:21 INFO Client: Application report for application_1727783536357_632194 (state: ACCEPTED)
24/11/06 12:26:22 INFO Client: Application report for application_1727783536357_632194 (state: RUNNING)
24/11/06 12:26:22 INFO Client:
client token: Token { kind: YARN_CLIENT_TOKEN, service: }
diagnostics: N/A
ApplicationMaster host: an-worker1152.eqiad.wmnet
ApplicationMaster RPC port: 12000
queue: production
start time: 1730895975614
final status: UNDEFINED
tracking URL: http://an-master1003.eqiad.wmnet:8088/proxy/application_1727783536357_632194/
user: analytics
24/11/06 12:26:23 INFO Client: Application report for application_1727783536357_632194 (state: RUNNING)
24/11/06 12:26:24 INFO Client: Application report for application_1727783536357_632194 (state: RUNNING)
24/11/06 12:26:25 INFO Client: Application report for application_1727783536357_632194 (state: RUNNING)
24/11/06 12:26:26 INFO Client: Application report for application_1727783536357_632194 (state: RUNNING)
24/11/06 12:26:27 INFO Client: Application report for application_1727783536357_632194 (state: RUNNING)
24/11/06 12:26:28 INFO Client: Application report for application_1727783536357_632194 (state: RUNNING)
24/11/06 12:26:29 INFO Client: Application report for application_1727783536357_632194 (state: RUNNING)
24/11/06 12:26:30 INFO Client: Application report for application_1727783536357_632194 (state: RUNNING)
24/11/06 12:26:31 INFO Client: Application report for application_1727783536357_632194 (state: RUNNING)
24/11/06 12:26:32 INFO Client: Application report for application_1727783536357_632194 (state: RUNNING)
24/11/06 12:26:33 INFO Client: Application report for application_1727783536357_632194 (state: FINISHED)
24/11/06 12:26:33 INFO Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: an-worker1152.eqiad.wmnet
ApplicationMaster RPC port: 12000
queue: production
start time: 1730895975614
final status: SUCCEEDED
tracking URL: http://an-master1003.eqiad.wmnet:8088/proxy/application_1727783536357_632194/
user: analytics
24/11/06 12:26:33 INFO ShutdownHookManager: Shutdown hook called
24/11/06 12:26:33 INFO ShutdownHookManager: Deleting directory /tmp/spark-6220b0ce-b0f9-498e-819c-12ed7db6f082
24/11/06 12:26:33 INFO ShutdownHookManager: Deleting directory /tmp/spark-a4305ebd-d2cb-4c7f-9fa4-36cb19324b84

We can see the app report in https://yarn.wikimedia.org/cluster/app/application_1727783536357_632194, as well as in https://yarn.wikimedia.org/spark-history/history/application_1727783536357_632194/1/jobs/
This validates that we don't have to implement an internal vhost for yarn.wikimedia.org (T378920), as spark3-submit does not seem to be talking to YARN directly. We might still need it if we need to support Skein (T377602).
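As an aside, the delegation-token lines in the log above are internally consistent: the issueDate/maxDate pair gives a 7-day maximum token lifetime, and the reported renewal interval works out to roughly one day. A quick sanity check, using the values copied from the log:

```python
# Values copied from the HDFS_DELEGATION_TOKEN log lines above (milliseconds).
issue_date_ms = 1730895973049
max_date_ms = 1731500773049
renewal_interval_ms = 86400057

ms_per_day = 1000 * 60 * 60 * 24
max_lifetime_days = (max_date_ms - issue_date_ms) / ms_per_day
renewal_interval_days = renewal_interval_ms / ms_per_day

print(max_lifetime_days)                 # 7.0 -> token valid for at most 7 days
print(round(renewal_interval_days, 3))   # 1.0 -> renewed roughly daily
```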
Iceberg jar:
Perhaps it would be possible to retrieve this jar from HDFS, rather than bake it into the airflow image. Not sure what's best here.
I think we want iceberg to work for e.g. pyspark and spark3-shell and wmfdata automatically, so we want it to be loaded for all spark sessions without extra configuration. 'Baking it into the airflow image' doesn't seem quite right, but 'baking it into our spark distribution' does seem correct to me.
which means --deploy-mode cluster instead of --deploy-mode client!
The downside of deploy-mode cluster is that driver logs are not available to the process that submits the spark job.
--deploy-mode=cluster is really only useful to make sure the driver does not take up resources on the localhost. If we are using KubernetesExecutor, where spark-submit is not launched alongside the scheduler anyway, then --deploy-mode=client would be much preferred!
I don't see how we could make --deploy-mode=client work in Kubernetes with an "external" YARN, TBH. We would need to define a way for the Spark application to "call home" to its driver living in Kubernetes, meaning that we'd need ingress into the cluster first, and then a dedicated Service created at runtime for each Spark job.
Out of the box, I'm not sure this is going to work. For this reason, I think we either run both driver and app in YARN, or in Kubernetes, with the SparkKubernetesOperator. I'm not sure an in-between is a realistic target.
@BTullis, WDYT?
These are one and the same, at the moment. Just like with the deb package of airflow, we use a binary distribution of pyspark, although it's from pypi rather than conda-forge.
Could we not just add a spark.jars default configuration option, pointing to an HDFS location of the iceberg jar?
That's the way that the SparkApplication kubernetes object does it.
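If we went that route, the change would be roughly a one-line addition to spark-defaults.conf, along these lines (the HDFS path shown is hypothetical; it would point at wherever we actually publish the jar):

```
# Hypothetical location; the Iceberg jar would need to be published to HDFS first
spark.jars  hdfs:///user/spark/share/lib/iceberg-spark-runtime.jar
```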
which means --deploy-mode cluster instead of --deploy-mode client!
The downside of deploy-mode cluster is that driver logs are not available to the process that submits the spark job.
--deploy-mode=cluster is really only useful to make sure the driver does not take up resources on the localhost. If we are using KubernetesExecutor, where spark-submit is not launched alongside the scheduler anyway, then --deploy-mode=client would be much preferred!
Just so I'm clear, we don't have the driver logs in Airflow at the moment though, do we? This would be a new feature if we could look at the spark driver logs from the Airflow UI? I can certainly see the benefit of not having to run yarn logs on another host to get at them.
Could we not just add a spark.jars default configuration option, pointing to an HDFS location of the iceberg jar?
I can certainly test it out! Does anyone happen to know where this jar is stored in HDFS atm?
Could we not just add a spark.jars default configuration option, pointing to an HDFS location of the iceberg jar?
It isn't?
We could do this, but then we'd have to have something that managed the deployment of this jar to HDFS. The only thing that does stuff like that right now is the per-instance airflow artifact sync config/artifacts.yaml stuff. I don't think we should use that to manage our global distribution dependencies; that is meant more for the deployment of job artifacts and dependencies.
Hm, there are also the 'spark assembly' jars in HDFS, which, looking in puppet, used to be managed by Puppet, but the spark3.pp profile has a "TODO implement" comment (from me?). So I'd guess they are manually copied to HDFS.
These are one and the same, at the moment.
I think it would be preferable to (eventually) provide a 'data lake client image' that includes our managed spark distribution and officially supported dependencies, like Spark, Hive, Iceberg (and maybe Flink?). Kind of a 'conda-analytics', but as an image. For now, since we need to use LocalExecutor, airflow could be included too (just like you are doing). In the KubernetesExecutor future, ideally the default executor pod would be this 'data lake client image', and the airflow image(s) would not need any Spark etc. dependencies in them.
Just so I'm clear, we don't have the driver logs in Airflow at the moment though, do we?
Via skein and --deploy-mode=client, we do: https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/wmf_airflow_common/hooks/skein.py#L244
OK, I think I'm getting it then. So looking at the logs of a current skein job (ssh tunnel to an-launcher1002 required) we can see the skein application spec.
Here we can see --deploy-mode client and --master yarn, as well as a list of several jars, namely refinery and a cassandra-connector-assembly jar.
So next we need to take our sample spark3-submit command line and convert this into a skein application spec, which we launch manually.
Our Spark driver will still be running in YARN, but it will be using --deploy-mode client and the executors will be able to call back to it.
Once we have got that working, we can turn it into a DAG. Does that sound right to you @Ottomata?
Yes, I think these are good ideas. Certainly it has felt like a retrograde step for us to downgrade our airflow images to bullseye, just so that we can install the hadoop-client packages into them, along with openjdk-8.
I think that we will definitely be able to make improvements here, pretty soon.
But before we decide exactly what to do, here are two more ideas to throw into the mix:
- The KubernetesPodOperator - This allows us to launch tasks using any image at all, so it could be this data lake client image which has our complete set of managed tools built in. I'm looking into this operator for the Dumps 1.0 migration work, where we would be launching a mediawiki-based pod and running mwmaint scripts in a DAG task, to perform the dumps.
- Volume mounts based on either managed PVs or images. The images feature isn't in our version of k8s yet, but it would mean that you could effectively take an airflow image and mount your chosen tools to a path from another image. e.g. /opt/spark or /opt/hadoop or whatever, from an image that is maintained separately. We could already do this with Ceph PVs, if we wish.
Volume mounts based on either managed PVs or images.
Wow TIL k8s images and OCI 'artifacts'. Interesting!
The KubernetesPodOperator
Huh. This would kind of be like our use of Skein inside of our spark operator & hook. We could wrap KubernetesPodOperator inside of our SparkSubmitOperator, to have it call out to KubernetesPodOperator instead. Interesting.
Yeah, there are a lot of ways to get this stuff done huh? Eventually (maybe before we start using KubernetesExecutor for Spark jobs?) we should consider all of this and write the options down and choose wisely. (Or, maybe this is all already in your airflow on k8s doc?)
If I'm reading this correctly, we can't get rid of skein because our spark executors can't talk back to their driver for network reasons if we have the driver running in a pod, right?
The downside of deploy-mode cluster is that driver logs are not available to the process that submits the spark job.
Ah, I am remembering another reason we do skein + deploy-mode=client. In order to support different python versions and different spark versions, etc., we need the python executable that is used by spark-submit to be the same one that is used by the executors. Otherwise things can get weird.
To accomplish this, SparkSubmitOperator.for_virtualenv (a conda env is a 'virtualenv' for these purposes) defaults to using skein to ship the packaged virtualenv / conda env to the yarn worker, and then uses the environment's packaged python executable to launch spark-submit.
So, while the 'driver logs in the airflow UI' requirement is just a nice-to-have (folks could still find those logs via the yarn logs CLI after the yarn app finishes), the requirement of using spark + a custom python version is necessary.
I think it should be possible to use skein + deploy-mode=cluster, but we decided not to do that because:
- we don't get driver logs in airflow
- Using skein simply to run spark-submit seemed like a waste of resources...why not just run the driver in the skein yarn app master.
But, I think we'll still need to use skein as long as people use conda to deploy pyspark jobs.
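Sketched as a skein application spec, such a conda-shipping job would look roughly like this. This is only an illustration under stated assumptions: the archive name and paths are hypothetical, and the spec is a simplification of what SparkSubmitOperator.for_virtualenv actually generates; the key point is that spark-submit is launched from the shipped environment's own python.

```yaml
name: example_pyspark_conda_job
master:
  files:
    # skein localizes the archive into the container working dir as ./env
    # (hypothetical HDFS path)
    env: hdfs:///user/analytics/artifacts/myjob-conda.tgz
  script: |
    # driver and executors both use the packaged python
    export PYSPARK_PYTHON=env/bin/python
    env/bin/python env/bin/spark-submit --master yarn --deploy-mode client job.py
```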
Coincidentally enough, I have just been working on this as part of T372417: Switch from miniconda to miniforge and, as far as I can tell, mediawiki-content-dump is the only production case where we are currently doing this. This case is specifically to support a more recent spark version (currently 3.3) and we are about to start working on T338057: Upgrade Spark to a version with long term Iceberg support, and with fixes to support Dumps 2.0 with probably version 3.5.3 as a target.
We used to use this method for the gdi-jobs as well, but they have been decommissioned quite recently.
I do take your point about the driver logs being useful to view in Airflow, though.
I think we should try to avoid removing that feature, if we can.
as far as I can tell, mediawiki-content-dump is the only production case where we are currently doing this.
There are many. See the various instance's config/artifacts.yaml files.
- https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/research/config/artifacts.yaml
- https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/search/config/artifacts.yaml
- https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/platform_eng/config/artifacts.yaml
- https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/blob/main/analytics_product/config/artifacts.yaml
Ah, thanks. None of these were showing up in GitLab search or Codesearch.
Maybe we should add all of these groups to Codesearch in the same way that we did for data_engineering.
airflow@airflow-scheduler-6f5d95856-pc2k5:/opt/airflow$ which yarn
/usr/bin/yarn
yarn is installed in our airflow image
airflow@airflow-scheduler-6f5d95856-pc2k5:/opt/airflow$ cat << EOF > /tmp/job.yaml
acls:
  enable: false
  modify_groups: []
  modify_users: []
  ui_users: []
  view_groups: []
  view_users: []
file_systems: []
master:
  env: {}
  files: {}
  log_level: INFO
  resources:
    fpgas: 0
    gpus: 0
    memory: 1024
    vcores: 1
  script: hdfs dfs -ls /wmf/cache
max_attempts: 1
name: run_cmd_via_skein__list_hdfs_files__20241023
node_label: ''
queue: default
services: {}
tags: []
user: ''
EOF
airflow@airflow-scheduler-6f5d95856-pc2k5:/opt/airflow$ skein application submit /tmp/job.yaml
Skein global security credentials not found, writing now to '/home/airflow/.skein'.
24/11/08 15:02:32 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
24/11/08 15:02:33 INFO skein.Driver: Driver started, listening on 44373
24/11/08 15:02:33 INFO conf.Configuration: resource-types.xml not found
24/11/08 15:02:33 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
24/11/08 15:02:33 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
24/11/08 15:02:33 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
24/11/08 15:02:35 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
24/11/08 15:02:36 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
24/11/08 15:02:37 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
24/11/08 15:02:38 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 3 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
24/11/08 15:02:39 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 4 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
24/11/08 15:02:40 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 5 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)

Seems like Skein isn't reading the resource manager config?
However, Skein correctly creates its certificate:
airflow@airflow-scheduler-6f5d95856-pc2k5:/opt/airflow$ ls /home/airflow/.skein/
skein.crt  skein.lock  skein.pem
Oh, I know what's happening. I'm performing my tests from the scheduler, but as airflow-test-k8s isn't configured with LocalExecutor, we're missing the hadoop configuration files. I'm going to tweak the chart locally to deploy the config files on the scheduler for the sake of this test.
airflow@airflow-scheduler-69759cd84f-ktzbv:/opt/airflow$ skein application submit /tmp/job.yaml
Skein global security credentials not found, writing now to '/home/airflow/.skein'.
24/11/08 15:10:44 INFO skein.Driver: Driver started, listening on 33127
24/11/08 15:10:44 INFO conf.Configuration: resource-types.xml not found
24/11/08 15:10:44 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
24/11/08 15:10:44 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
24/11/08 15:10:44 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
24/11/08 15:10:44 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 27086684 for analytics on ha-hdfs:analytics-hadoop
24/11/08 15:10:44 INFO security.TokenCache: Got dt for hdfs://analytics-hadoop; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:analytics-hadoop, Ident: (HDFS_DELEGATION_TOKEN token 27086684 for analytics)
24/11/08 15:10:44 INFO skein.Driver: Uploading application resources to hdfs://analytics-hadoop/user/analytics/.skein/application_1727783536357_691502
24/11/08 15:10:45 INFO skein.Driver: Submitting application...
24/11/08 15:10:45 INFO impl.YarnClientImpl: Submitted application application_1727783536357_691502
application_1727783536357_691502
🎉 https://yarn.wikimedia.org/cluster/app/application_1727783536357_691502
I'm now going to merge https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/887 and see whether we can execute the same, but within a DAG
brouberol merged https://gitlab.wikimedia.org/repos/data-engineering/airflow-dags/-/merge_requests/887
test_k8s: introduce a DAG that runs an idempotent HDFS command via Skein
brouberol opened https://gitlab.wikimedia.org/repos/data-engineering/airflow/-/merge_requests/33
add missing dependency
brouberol merged https://gitlab.wikimedia.org/repos/data-engineering/airflow/-/merge_requests/33
add missing dependency
Change #1088596 had a related patch set uploaded (by Brouberol; author: Brouberol):
[operations/puppet@production] analytics_test_cluster: enable egress from the dse kubepods network
Change #1088596 merged by Brouberol:
[operations/puppet@production] analytics_test_cluster: enable egress from the dse kubepods network
Change #1088600 had a related patch set uploaded (by Brouberol; author: Brouberol):
[operations/puppet@production] fix typo in ferm rule
Change #1088600 merged by Brouberol:
[operations/puppet@production] fix typo in ferm rule
I'm going to stop for the week. So far, we've managed to submit a job to YARN via Skein, but the hook fails due to
ValueError: Log aggregation has not completed or is not enabled.
I'll investigate next week.
This is the configuration we have on an-launcher1002 related to yarn logs:
brouberol@an-launcher1002:~$ grep -C 2 log /etc/hadoop/conf/yarn-site.xml
<property>
  <description>Where to store container logs.</description>
  <name>yarn.nodemanager.log-dirs</name>
  <value></value>
</property>
--
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
--
<property>
  <description>
    How long (in seconds) aggregate logs to hdfs for long-running jobs.
    Without this setting, logs are only aggregated upon job completion,
    and nodes may run out of space, while jobs are still running.
  </description>
  <name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</name>
  <value>3600</value>
</property>
<property>
  <description>Where to aggregate logs to.</description>
  <name>yarn.nodemanager.remote-app-log-dir</name>
  <value>/var/log/hadoop-yarn/apps</value>
</property>
<property>
  <description>What type of compression should be used for yarn logs.</description>
  <name>yarn.nodemanager.log-aggregation.compression-type</name>
  <value>gz</value>
</property>
--
<property>
  <name>yarn.log-aggregation.retain-seconds</name>
  <value>5184000</value>
  <description>
    How long (in secs) to keep aggregation logs before deleting them. -1 disables.
    Be careful, if you set this too small you will spam the name node.
    If yarn.log-aggregation.retain-check-interval-seconds is not set or set to 0
    or a negative value (default) then the check interval is one-tenth of the
    aggregated log retention time.
  </description>
</property>
<property>
  <name>yarn.log-aggregation.retain-check-interval-seconds</name>
  <value>86400</value>
  <description>
    How long to wait between aggregated log retention checks.
  </description>
</property>
We see that we have
yarn.log-aggregation-enable: 'true'
yarn.log-aggregation.retain-check-interval-seconds: '86400'
yarn.log-aggregation.retain-seconds: '5184000'
in the default values, but we seem to be missing:
- yarn.nodemanager.log-dirs
- yarn.nodemanager.remote-app-log-dir
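A hedged sketch of the missing piece, reusing the value from an-launcher1002's yarn-site.xml above (the exact structure of the chart values is an assumption; yarn.nodemanager.log-dirs is empty on an-launcher1002, so only the remote dir seems needed client-side):

```yaml
yarn.nodemanager.remote-app-log-dir: /var/log/hadoop-yarn/apps
```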
Change #1090398 had a related patch set uploaded (by Brouberol; author: Brouberol):
[operations/deployment-charts@master] airflow: fix configuration keys containing the analytics-hadoop cluster name
Change #1090399 had a related patch set uploaded (by Brouberol; author: Brouberol):
[operations/deployment-charts@master] airflow: add missing python dependency
Change #1090400 had a related patch set uploaded (by Brouberol; author: Brouberol):
[operations/deployment-charts@master] airflow: enable yarn log aggregation
Change #1090398 merged by jenkins-bot:
[operations/deployment-charts@master] airflow: fix configuration keys containing the analytics-hadoop cluster name
Change #1090399 merged by jenkins-bot:
[operations/deployment-charts@master] airflow: add missing python dependency
Change #1090400 merged by jenkins-bot:
[operations/deployment-charts@master] airflow: enable yarn log aggregation
We now have a DAG that submits a Spark job to YARN via Skein every day: https://airflow-test-k8s.wikimedia.org/dags/run_cmd_via_skein/grid?dag_run_id=manual__2024-11-12T07%3A22%3A36.026021%2B00%3A00&task_id=list_hdfs_files&tab=logs