
Import data from hdfs to commonswiki_file
Closed, Resolved · Public · 3 Estimated Story Points

Description

The SD team is working on a data pipeline to gather data from Wikidata that's relevant to files on Commons (for the image-suggestions project) and write it to HDFS.

This ticket is for the search team to import that data (weekly) into the commonswiki_file search index.

Event Timeline

Cparle updated the task description.

We have example data files on HDFS:

Initial dataset for Jan /user/cparle/commonswiki_file.full.1648826537
Delta for Feb /user/cparle/commonswiki_file.delta.1648829751

We don't have the airflow job up and running yet, so those filenames and that directory aren't final, but the format is finalised and we're pretty sure the data is good.

File not found for both, but similarly formatted paths such as commonswiki_file.full.1648829751 are pretty close. These don't look to be readable by the analytics-search user; we might need to ask the Analytics team what they suggest for file permissions:

 :) (ebernhardson@stat1005)-~$ sudo -u analytics-search kerberos-run-command analytics-search hdfs dfs -ls -d ///user/cparle/commonswiki_file.full.1648829751/*                                                                                 
ls: Permission denied: user=analytics-search, access=READ_EXECUTE, inode="/user/cparle/commonswiki_file.full.1648829751"
:cparle:cparle:drwxr-x---

One limitation of the current import scripts is that they expect everything to be sourced from partitioned Hive tables. Typically we partition by a date column holding the airflow execution date. Would it take much to arrange these into a partitioned table? Saving to Hive partitions might also resolve the permissions issue by way of different defaults, although I'm not entirely sure.
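For illustration, a minimal PySpark sketch of the sort of thing the import scripts expect (the table name, columns and snapshot value here are placeholders, not the pipeline's actual code):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# toy stand-in for the image-suggestions dataframe produced upstream
df = spark.createDataFrame(
    [("commonswiki", 6, 115136657, "image.linked.from.wikidata.p373", ["Q64365|192"])],
    ["wikiid", "page_namespace", "page_id", "tag", "values"],
)

# partition by the snapshot/execution date so the import job can select one partition
(df.withColumn("snapshot", F.lit("2022-05-02"))
   .write
   .mode("overwrite")
   .partitionBy("snapshot")
   .saveAsTable("analytics_platform_eng.image_suggestions_search_index"))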

https://phabricator.wikimedia.org/T307983

Ok, so now the data is being written to the tables image_suggestions_search_index_full and image_suggestions_search_index_delta in the Hive database analytics_platform_eng, partitioned by a snapshot column in the format yyyy-mm-dd.

search_index_full is all the data for a particular snapshot; search_index_delta is the changes since the last run. For the initial run (snapshot=2022-05-02) they're identical.
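For reference, pulling a single snapshot out of those tables in PySpark looks something like this (just a sketch, assuming a Hive-enabled Spark session):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
snapshot = "2022-05-02"

full = (spark.table("analytics_platform_eng.image_suggestions_search_index_full")
             .where(F.col("snapshot") == snapshot))
delta = (spark.table("analytics_platform_eng.image_suggestions_search_index_delta")
              .where(F.col("snapshot") == snapshot))

# for the initial 2022-05-02 run the two should hold the same rows
print(full.count(), delta.count())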

Is this the same as T305851? Essentially, is there only a single dataset to be imported?

Also not particularly important, but typically if I was going to have two versions of the same dataset (full/delta) I would use an additional partitioning column in a single table instead of creating a second table. I suppose in practice it doesn't really matter much, but there seem to be stronger guarantees that they have the same shape if they go into the same table.
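Concretely, that alternative would just be an extra partition column on the write, continuing the earlier sketch (df and F as defined there; the combined table name is hypothetical):

(df.withColumn("snapshot", F.lit("2022-05-02"))
   .withColumn("dataset", F.lit("full"))   # or "delta"
   .write
   .mode("overwrite")
   .partitionBy("snapshot", "dataset")
   .saveAsTable("analytics_platform_eng.image_suggestions_search_index"))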

Yeah, basically it's one dataset. We didn't think of it that way at the start, but it turns out the data is the same shape for both, so it's all in the same table.

... as for _full versus _delta - are we ok to go with it the way it is for now?

It doesn't hurt anything, can stay as is.

One other problem I'm noticing (it's also not really a blocker, we can make things work): the output in hdfs://analytics-hadoop/user/hive/warehouse/analytics_platform_eng.db/image_suggestions_search_index_full/snapshot=2022-05-02/ appears to be 70k files, many of which are only 2kB. Within Hadoop there is significant per-file overhead, and spreading the dataset out into 70k files makes various operations that should take a few seconds instead take multiple minutes. The entire operation over this dataset ends up taking significant time as it works through these tiny files. The first attempts to process this dataset resulted in YARN killing the driver for running over its memory allocation; hard to say for sure, but it seems reasonable to guess that's also related to the high number of individual files/tasks to process. For the moment I'm trying to run it with some higher memory limits to see if it can push through.

As an example, taking a single record from this table through Spark ends up taking three minutes as it (I'm guessing) reads metadata from all the different parquet files:

In [6]: now = datetime.datetime.now(); spark.sql('select * from analytics_platform_eng.image_suggestions_search_index_full where snapshot="2022-05-02" limit 1').collect(); took = datetime.datetime.now() - now
Out[6]: [Row(wikiid='commonswiki', page_namespace=6, page_id=115136657, tag='image.linked.from.wikidata.p373', values=['Q64365|192', 'Q6949869|192'], snapshot='2022-05-02')]

In [7]: took
Out[7]: datetime.timedelta(seconds=179, microseconds=720595)

I have no idea why that is ... we're just using df.write.saveAsTable(). Is there any config we can do to improve this?

Hey @EBernhardson, I've also noticed a dramatic increase in execution times when writing output to Hive tables instead of parquet files (df.write.parquet).
For instance, the data pipeline task that provides input for this ticket takes roughly 45 mins with parquet vs 1 hour 50 mins with a Hive table. It would be ideal if you could adapt your ingestion script to read from parquet files.

Ideal, except that we'd have to rewrite a bunch of code ...

@mfossati I highly doubt the difference in write time is due to Hive; Hive is only a metadata store, and the same parquet files are still written to disk. With the job emitting 70k individual files, that seems the thing worth investigating.
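If it helps on the writer side, the usual remedy is to collapse the dataframe into a modest, fixed number of partitions right before the write, since each write task emits its own parquet file per table partition. A rough sketch (df standing in for whatever dataframe the pipeline writes, and 64 being an arbitrary number):

# write ~64 reasonably sized parquet files per snapshot instead of tens of
# thousands of tiny ones; coalesce(64) would avoid the shuffle that
# repartition(64) triggers, at the cost of less evenly sized files
(df.repartition(64)
   .write
   .mode("overwrite")
   .partitionBy("snapshot")
   .saveAsTable("analytics_platform_eng.image_suggestions_search_index_full"))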

I don't know what's under the df.write.saveAsTable hood: perhaps Spark writes those small files with that call? As far as I can tell, the execution times were radically different when we switched to it.

Sure, but what actually changed about the execution times? I suppose my intuition here is that we should be trying to understand how Spark works in its various contexts. From monitoring other Spark applications, an extra hour in Spark could be something as silly as the entire application sitting idle while one worker sequentially processes thousands of NODE_LOCAL tasks just fast enough that spark.locality.wait doesn't kick in and spread them to other nodes. It could be any number of things, and I usually think it's worth understanding.

Sadly we don't run the Spark history server, which would allow us to get more insight into how this application is running. Could you put the output of yarn logs -applicationId <appId> for a good and a bad run somewhere, and I can at least pull some stats out about what Spark is actually doing?

Change 798996 had a related patch set uploaded (by Ebernhardson; author: Ebernhardson):

[wikimedia/discovery/analytics@master] Support reading parquet paths directly in HivePartition specs

https://gerrit.wikimedia.org/r/798996

Data is currently importing; it will take a dozen or so hours to process through the bulk ingestion daemon. Getting it through primarily required increasing executor memory, executor memory overhead, and driver memory until things stopped failing. Once the consumer group lag comes back to 0, everything should have finished importing.
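For reference, those are the standard Spark memory settings; the values below are illustrative rather than the exact ones used, and shown here as PySpark session config (the same keys can be passed to spark-submit):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.executor.memory", "8g")          # per-executor heap
         .config("spark.executor.memoryOverhead", "2g")  # off-heap headroom that YARN enforces
         .config("spark.driver.memory", "6g")            # driver heap
         .enableHiveSupport()
         .getOrCreate())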

Ok looks like the data has imported correctly, hooray!

I suspect some data is missing from the source tables though, investigating ...

Change 798996 abandoned by Ebernhardson:

[wikimedia/discovery/analytics@master] Support reading parquet paths directly in HivePartition specs

Reason:

seems unnecessary after fixing the execution graph in the downstream app. Can restore if needed later.

https://gerrit.wikimedia.org/r/798996

This can probably be closed now?

CBogen subscribed.

Moving back to in progress since this is currently happening manually on demand; once working properly it should be automated to run weekly.

Regarding scheduling of the job once it lands in airflow: our weekly job always runs with a Sunday execution date, so if the image-suggestions job could also be run on Sunday that would greatly simplify things (otherwise various date math will be involved and it will delay the import until the following Sunday; I suppose running Saturday wouldn't be a big deal). We don't strictly have to be sending the imports on Sunday, but the weekly data load takes some time to process through the elastic servers, and Sunday is a generally less-busy time.

We match our snapshot dates with the snapshots in the wmf_raw and structured_data hive tables, and those are generated on Mondays, so I don't think there's any way to avoid date math unless our snapshots had different dates to the source tables, which I think would be confusing.

The weekly snapshots typically don't finish generating until the Friday, which means our data is normally ready on the Friday, so I think importing the Sunday after the snapshot generation would be ok? Or maybe if you made a Hive sensor for tomorrow's date (something like "{{ execution_date.add(days=1).format('YYYY-MM-DD') }}") the import could happen as soon as the data was ready ... but then it'd happen on a Friday rather than a Sunday, which mightn't suit.
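Sketched with Airflow's stock NamedHivePartitionSensor, that suggestion would look something like this (the task id and poke interval are made up, and the discovery-analytics DAGs may well use their own sensor helpers instead):

from airflow.providers.apache.hive.sensors.named_hive_partition import NamedHivePartitionSensor

# would live inside the import DAG; waits until the delta partition for
# "tomorrow's" date (relative to the execution date) exists in the metastore
wait_for_delta = NamedHivePartitionSensor(
    task_id="wait_for_image_suggestions_delta",
    partition_names=[
        "analytics_platform_eng.image_suggestions_search_index_delta/"
        "snapshot={{ execution_date.add(days=1).format('YYYY-MM-DD') }}"
    ],
    poke_interval=60 * 60,  # re-check hourly until the partition shows up
)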

Change 820763 had a related patch set uploaded (by DCausse; author: DCausse):

[wikimedia/discovery/analytics@master] Automate imagesuggestion imports

https://gerrit.wikimedia.org/r/820763

Change 820763 merged by jenkins-bot:

[wikimedia/discovery/analytics@master] Automate imagesuggestion imports

https://gerrit.wikimedia.org/r/820763

Mentioned in SAL (#wikimedia-operations) [2022-08-05T16:10:28Z] <dcausse@deploy1002> Started deploy [wikimedia/discovery/analytics@8489923]: T304954: Automate imagesuggestion imports

Mentioned in SAL (#wikimedia-operations) [2022-08-05T16:12:32Z] <dcausse@deploy1002> Finished deploy [wikimedia/discovery/analytics@8489923]: T304954: Automate imagesuggestion imports (duration: 02m 03s)

Gehel subscribed.

Could someone from the Structured Data Engineering team check if the data is available as expected?

From the Airflow Web interface, I can confirm the DAG was successfully executed.

NOTE: the DAG was manually triggered on Friday August 5, and picked up the 2022-07-25 snapshot. I'll monitor scheduled runs as well

@EBernhardson, @dcausse, heads up: I've been checking the Airflow Web interface several times and it seems that the scheduled DAG runs have never started, despite the DAG being active and the schedule expression being well formed.
Can you please look into that?

I'm not sure why airflow decided not to create the dag runs as expected. To get things moving again I deleted the dag state from the instance (delete button in the UI) and let airflow re-initialize. I manually marked the 2022-07-25 run as complete and it proceeded to start the 2022-08-01 instance and run it through to completion. The 2022-08-08 instance has also started but is currently waiting for data to arrive in analytics_platform_eng.image_suggestions_search_index_delta/snapshot=2022-08-08.

Updates are currently loading into the search indexes.

To follow up, it looks like airflow really didn't like having a manually created task with the same timestamp as a task that was expecting to be scheduled. The airflow scheduler logs for the last week repeat this error for all dags related to image suggestions (so, also the weekly/hourly transfers):

_mysql_exceptions.IntegrityError: (1062, "Duplicate entry 'image_suggestions_weekly-2022-07-25 00:00:00.000000' for key 'dag_id'")

Deleting the dag state removed the manually invoked instance and allowed this all to progress. The downside is that the system needs to catch up a week's worth of updates; those are running but will take a day or so to process. In the future we should likely avoid manually triggering dags that are scheduled; alternatively, if we had deleted the dag state when changing the schedule, that would also have worked.

The likely root cause of all this is that when we first deployed the dag it didn't schedule the expected tasks. We manually triggered the task to get it going, and then realized on review that the problem was that we had used the @weekly cron alias, which runs on Sunday, while upstream was using an explicit cron string to execute on Monday. The schedule was updated to match upstream, but the manually invoked TaskInstance still existed.
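In other words, roughly the difference between the two schedule definitions below (a minimal illustration; the start date and exact cron string are assumptions, not copied from the real dag):

import pendulum
from airflow import DAG

# '@weekly' expands to '0 0 * * 0', i.e. midnight Sunday, which is what we
# deployed first; upstream generates its snapshots on a Monday schedule, so
# the dag was switched to an explicit Monday cron string along these lines
dag = DAG(
    dag_id="image_suggestions_weekly",
    schedule_interval="0 0 * * 1",
    start_date=pendulum.datetime(2022, 7, 25, tz="UTC"),
)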

Thanks for digging into that and for the extensive notes! I think we can resolve this ticket as soon as T300024: [M] Schedule image suggestions notifications is automated, too (just to make sure the loop is closed).