
Make wmfdata work with Kerberos
Closed, ResolvedPublic

Description

Now that Analytics has rolled Kerberos out to the Hadoop cluster, wmfdata seems to have stopped working (even after the kinit command is run in the Jupyter terminal). It's not yet clear why.

Event Timeline

nshahquinn-wmf created this task.
kzimmerman subscribed.

Re-opening because we're running into new issues; @nettrom_WMF is digging into this today

From our discussions, the best option moving forward appears to be to switch the backend from Hive to Spark. With thanks to @Ottomata, there's now a pull request that does this. Before merging it, we'll need some testing. From @Neil_P._Quinn_WMF, the following things need to be tested:

  • Does the non-Pandas format produced by resultDf.collect match the previous format?
  • What happens if you try to run DDL queries through it (CREATE TABLE, INSERT … SELECT, etc.)?
  • Related: does hive.load_csv still work?
  • Does the function need to do any closing of the Spark session to avoid the sessions living on and taking up cluster resources indefinitely?

I ran some tests on notebook1004 and notebook1003. From what I can tell, the behavior of the library is the same in both cases. I did run into a couple of issues during installation/use that were not related to queries or data formats. I'll document those below.

  • Does the non-Pandas format produced by resultDf.collect match the previous format?

Yes and no. The old format was a list of tuples; the new format is a list of named tuples of type Row. That means code expecting the old format should continue to work, and going forward, column values can also be accessed as attributes (e.g. result[0].column_name).
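For illustration, a minimal sketch of the difference (the column names and values are made up):

    from pyspark.sql import Row

    # Result rows are now named tuples of type Row rather than plain tuples.
    result = [Row(edit_count=7, wiki='enwiki')]

    # Old-style positional access and unpacking still work:
    edit_count, wiki = result[0]

    # New-style access by column name is also available:
    assert result[0].edit_count == 7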

  • What happens if you try to run DDL queries through it (CREATE TABLE, INSERT … SELECT, etc.)?

They have the expected side-effect of creating/populating tables, but will now return an empty data frame due to the call to collect()/toPandas(). We can later decide if we want it to return nothing in those cases.
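For example (the table name is made up, and this assumes the hive module is imported from wmfdata and that the patched hive.run behaves as described above):

    from wmfdata import hive

    # The statement takes effect on the cluster, but the return value is an
    # empty data frame because of the trailing collect()/toPandas() call.
    result = hive.run("CREATE TABLE nettrom_test.ddl_check (id BIGINT, note STRING)")
    print(len(result))  # 0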

  • Related: does hive.load_csv still work?

Yes, appears to work just fine.

  • Does the function need to do any closing of the Spark session to avoid the sessions living on and taking up cluster resources indefinitely?

As far as I understand, no, but I'm hoping @Ottomata (or someone else from Analytics) will pitch in here. The code runs with a "local" master, which from what I've been able to find means it runs locally on whatever machine you're using. Unless you switch to having yarn as a master, it'll only use resources while the query is running.

As mentioned above, I ran into a couple of issues when trying to install the patched code with a fresh virtual environment:

  1. wmfdata tries to install matplotlib-3.2.0rc1, which fails with an error due to jquery-ui. The way I solved this was to install matplotlib itself first (using the latest version), which bundles jquery-ui in a different way and installed just fine. Then I could install the patched wmfdata library without a problem.
  2. We'll need to add findspark to the list of requirements, as otherwise it won't be installed and the import fails.

I'll merge the pull request so we can test more broadly among the team and see if there are outstanding issues (apart from the one we already know about: the current code doesn't support multiple queries).

I'll be following up on this after the holidays. While switching to the Spark backend does appear to work, it also has its own quirks. Some of the queries used in Homepage reports are slow, and querying mediawiki_history was also problematic. Here, "slow" means not finishing within several hours, which is a significant performance decrease compared to what we had with Hive. There might be configuration parameters that can alleviate this, and I've had success with some, but I want to discuss them with Analytics folks to understand the tradeoffs in a shared environment. Now that we also have @mpopov's Hive CLI solution, we should consider the benefits and drawbacks of all these approaches and decide what to do moving forward.
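To give a sense of what I mean by configuration parameters, a sketch: the values below are purely illustrative (not recommendations), homepage_query is a placeholder, and I'm assuming hive.run will accept extra Spark settings via a spark_config-style argument alongside spark_master.

    from wmfdata import hive

    homepage_query = "SELECT ..."  # placeholder for one of the slow report queries

    homepage_data = hive.run(
        homepage_query,
        spark_master='yarn',
        spark_config={
            # Illustrative values only; the right settings for a shared
            # cluster are exactly what I want to discuss with Analytics.
            "spark.executor.memory": "4g",
            "spark.dynamicAllocation.maxExecutors": 64,
            "spark.sql.shuffle.partitions": 256,
        },
    )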

I would just like to add that my reporting is blocked on issues that I think are in this task. I've been working with @nettrom_WMF on this, but thought I'd also add my voice. This is of pretty high urgency for me, as our team needs reporting for upcoming meetings with leadership.

I run reporting for Growth team features in a Python notebook that uses the wmfdata package. Since the Kerberos changes, my queries take at least an hour to run, whereas they used to take about 5 minutes each. In fact, I have not been able to get results out of the notebook because the queries have not finished before the notebook loses its connection or my laptop goes to sleep.

So in summary, it seems like there have been prohibitive speed decreases with the Kerberos change.

Please let me know if I'm posting to the right place or how to get this solved.

The Kerberos change on its own shouldn't cause query speed decreases like that. Something else must be going wrong.
Can you give us some more info? Which notebook file on which notebook host? Or paste the code here?

Are others experiencing similar problems?

The Kerberos change on its own shouldn't cause query speed decreases like that. Something else must be going wrong.
Can you give us some more info? Which notebook file on which notebook host? Or paste the code here?

Are others experiencing similar problems?

I believe the issue is that the new code runs queries locally by default rather than by using YARN. I'm inclined to change it to use YARN by default, but to do that we will have to figure out how to prevent those sessions from being left open when they're no longer needed. Any suggestions, @Ottomata?

New code meaning with Spark by default instead of Hive client?

I believe that the Spark job will quit when the script exits. This is different in Notebooks: Jupyter maintains the Spark session as long as the notebook kernel is running (even if the browser window is closed).

New code meaning with Spark by default instead of Hive client?

Yes, by new code I mean the stuff that's using Spark in place of Impyla :)

I believe that the Spark job will quit when the script exits. This is different in Notebooks: Jupyter maintains the Spark session as long as the notebook kernel is running (even if the browser window is closed).

We mainly use this package in Jupyter notebooks, actually. So, since the spark.get_spark_session code you added uses builder.getOrCreate, I assume that after the first run of hive.run in a notebook, the code will just pick up the existing Spark session. So it seems like those sessions will stay open at least as long as the notebook is open (which for me can be weeks with local notebooks) and maybe even longer. Do the Spark kernels have logic that kills the Spark session when the notebook is closed? If so, there's nothing currently in wmfdata that replicates that behavior.
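To make the getOrCreate behavior concrete, a minimal sketch in plain PySpark (outside wmfdata):

    from pyspark.sql import SparkSession

    # The first builder call creates the session...
    s1 = SparkSession.builder.master("local[2]").getOrCreate()

    # ...and later getOrCreate() calls return that very same session, even if
    # a different master is requested: static settings like the master can't change.
    s2 = SparkSession.builder.master("yarn").getOrCreate()
    assert s1 is s2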

Notebooks use 'YARN client mode', which means the Spark master process is local, but the executors run in YARN. When the notebook kernel is closed (not just the browser; the kernel has to actually be closed), the master process is lost and YARN deallocates any executors it currently has. If the Spark master process (in a notebook or anywhere) runs for a long time without using any executors, the executor resources will eventually be reclaimed by YARN, but the Spark master process will still be running and registered with YARN (even if not actually running there).

Anyway, in that case it would be good to close the Spark master process too. I haven't tried this myself, but you should be able to just call the SparkSession stop method (spark.stop()) to stop it. I haven't tested how this would work in wmfdata. If your code plans to issue a series of SQL queries, it'd be best not to call stop after each one. Perhaps wmfdata could call stop in a timeout? If no query has been issued in 10? minutes, call spark.stop()?

@Ottomata @Neil_P._Quinn_WMF -- your conversation is over my head, but I'll tell you what I know. I'm using a notebook written for me by @nettrom_WMF, and I just do things like copy/paste code and modify the SQL. It is located at notebook1004.eqiad.wmnet:/user/mmiller/notebooks/homepage_reporting-2019-12-23.ipynb.

The first slow-running cell is the 8th code cell, which starts with

homepage_data = hive.run(
    homepage_data_query.format(

In chat I just advised Marshall to run his hive.run queries with spark_master='yarn'. This works, but a caveat is that if you've already done hive.run in a running notebook kernel, each subsequent hive.run will re-use the spark session. He has to close and halt the notebook, re-open it, and then re-issue the hive.run statements with spark_master='yarn'.

This is a gotcha for the spark_master (and any other spark configs) that hive.run will take. It's good because it prevents each hive.run statement from re-instantiating a new SparkSession every time it is called, but bad in that you can't vary the SparkSession configuration between queries unless it is explicitly stopped (spark.stop()) between them.
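To spell out the workaround: after restarting the notebook kernel (so no SparkSession exists yet), the queries look roughly like this; the query strings are placeholders.

    from wmfdata import hive

    homepage_data_query = "SELECT ..."  # placeholder for the actual report query

    # With a fresh kernel, the first hive.run call creates the SparkSession,
    # so this is the call where the YARN master has to be requested.
    homepage_data = hive.run(homepage_data_query, spark_master='yarn')

    # Later hive.run calls in the same kernel reuse that YARN-backed session.
    more_data = hive.run("SELECT ...", spark_master='yarn')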

Notebooks use 'YARN client mode', which means the Spark master process is local, but the executors run in YARN. When the notebook kernel is closed (not just the browser; the kernel has to actually be closed), the master process is lost and YARN deallocates any executors it currently has. If the Spark master process (in a notebook or anywhere) runs for a long time without using any executors, the executor resources will eventually be reclaimed by YARN, but the Spark master process will still be running and registered with YARN (even if not actually running there).

Ahh, so it sounds like leaving an idle Spark session running on one of the notebook servers doesn't actually use up any significant cluster resources? So, even though we don't want to have tons of idle sessions built up, the impact isn't any worse than leaving notebook kernels running for long periods, right?

Anyway, in that case it would be good to close the Spark master process too. I haven't tried this myself, but you should be able to just call the SparkSession stop method (spark.stop()) to stop it. I haven't tested how this would work in wmfdata. If your code plans to issue a series of SQL queries, it'd be best not to call stop after each one. Perhaps wmfdata could call stop in a timeout? If no query has been issued in 10? minutes, call spark.stop()?

Hmm, yeah, a timeout would be good, although I'm not sure how to implement it yet. I may just start without one in the interest of unblocking people and then work on that later.

IIUC, YARN will still hold open an application slot while your master is connected. (see all the pyspark-shells at https://yarn.wikimedia.org/cluster/scheduler). It'd be nice if these were closed when not in use.

IIUC, YARN will still hold open an application slot while your master is connected. (see all the pyspark-shells at https://yarn.wikimedia.org/cluster/scheduler). It'd be nice if these were closed when not in use.

I've been reading up on this, so tell me if this summary is correct. By default, our Spark applications use dynamic resource allocation, so executors are released when applications are idle. This reduces the cost of idle applications significantly, but it doesn't eliminate it because each application has an Application Master (I think this is what you mean by an application slot) which occupies a single container on the cluster for the application's entire lifetime.
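For reference, these are the standard Spark properties behind that behavior (the values here are illustrative, not our cluster defaults):

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .master("yarn")
        # Executors are requested and released based on load (this also requires
        # the external shuffle service, spark.shuffle.service.enabled, on YARN)...
        .config("spark.dynamicAllocation.enabled", "true")
        # ...and executors idle for longer than this are handed back to YARN,
        # while the application (and its Application Master container) lives on.
        .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
        .getOrCreate()
    )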

The Application Master dies whenever the driver program does; on SWAP, since we use yarn-client mode, the driver lives in the notebook kernel, so all resources will eventually be released when the kernel is shut down. However, kernels can and do stay open for weeks or more, and since we aren't relying on the Spark application to hold any context between calls to hive.run, we should impose a much shorter lifetime on these applications.

Correct! :)

Cool! In that case, I'm thinking about changing hive.run so that, just before returning, it starts a threading.Timer that will stop the Spark session after some time (perhaps 30 minutes), and that, just after starting, it will cancel the running timer if it exists. I will make the timer a global variable so that if we add any other functions that use the Spark session, they can interact with the same timer.

Can you think of any better approach?
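Here's a rough sketch of what I mean (hypothetical code, not what's in wmfdata today; _get_session stands in for the existing session helper):

    import threading

    from pyspark.sql import SparkSession

    SESSION_TIMEOUT = 30 * 60  # seconds of inactivity before the session is stopped
    _session_timer = None      # global so any Spark-using function can share it


    def _get_session():
        # Stand-in for wmfdata's session helper; plain getOrCreate for this sketch.
        return SparkSession.builder.master("yarn").getOrCreate()


    def run(query):
        """Run a SQL query, resetting the idle-shutdown timer around it."""
        global _session_timer
        if _session_timer is not None:
            _session_timer.cancel()  # the session is in use again
        session = _get_session()
        result = session.sql(query).toPandas()
        # Schedule the session to be stopped if no further queries arrive in time.
        _session_timer = threading.Timer(SESSION_TIMEOUT, session.stop)
        _session_timer.daemon = True  # don't keep the interpreter alive just for this
        _session_timer.start()
        return result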

Given the above suggestion, I propose a longer timer.
I stop the spark session at the end of the day or when I finish the task...but I have had periods where I need to spend, say, 30 minutes reading documentation to address an issue before proceeding with the task at hand on the spark session...so I could use a longer timer.

Sounds good! Got 2 improvements to the idea:

  • Reset the timer every time hive.run is called.
  • Keep a cache of distinct spark_config to SparkSession instances, and time them out individually (a rough sketch follows this list). This would avoid confusion in case hive.run is called at two different times with different spark_config values. Right now, only the first instantiated SparkSession (with the first provided spark_config) will be used for every hive.run call.
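A rough sketch of the cache idea (hypothetical names; note that only one SparkContext can be active per Python process, so in practice switching configs means stopping the old session and starting a new one rather than keeping several alive):

    import threading

    from pyspark.sql import SparkSession

    SESSION_TIMEOUT = 30 * 60  # seconds of inactivity before a session is stopped

    # Hypothetical module-level state: the config key of the live session,
    # the session itself, and its idle-shutdown timer.
    _current = {"key": None, "session": None, "timer": None}


    def get_session(spark_config):
        key = frozenset(spark_config.items())
        if _current["timer"] is not None:
            _current["timer"].cancel()
        if _current["key"] != key:
            # A different config was requested: stop the old session and build
            # a new one with the requested settings.
            if _current["session"] is not None:
                _current["session"].stop()
            builder = SparkSession.builder.master(spark_config.get("master", "yarn"))
            for name, value in spark_config.items():
                if name != "master":
                    builder = builder.config(name, value)
            _current["key"], _current["session"] = key, builder.getOrCreate()

        def _expire():
            _current["session"].stop()
            _current["key"] = None

        timer = threading.Timer(SESSION_TIMEOUT, _expire)
        timer.daemon = True
        timer.start()
        _current["timer"] = timer
        return _current["session"]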

@Iflorez the hive.run call will still work after 30 minutes; it will just have to launch a new SparkSession. This is already better than when hive.run used the Hive API: with direct Hive queries, every hive.run() call results in a distinct Hadoop MapReduce Hive job. The 30-minute timeout just avoids having to re-launch the Spark job in Hadoop for every single query.

I see, thank you. Your clarification is helpful.

Okay, I've put up a pull request that:

  • stops Spark sessions after an hour of inactivity
  • improves the pre-query Kerberos credentials check
  • restores the ability to pass multiple commands to hive.run

I previously switched the default spark_master to yarn, so I think this takes care of all the high-priority issues caused by the Kerberos migration.

@Ottomata, @mpopov please review it and merge if appropriate! (@Ottomata, I've invited you to the repo as a collaborator)

Once we get this merged, we can close this!

Added 2 comments but looks good to me!

Okay, I've merged the new code and emailed out an announcement. Thank you for the review, @Ottomata!