
[M] Stop unbounded image suggestions dataset growth and clean up legacy results
Open, MediumPublic

Description

The image suggestions dataset models a one-to-many relationship between a page and suggestion data (keyed by a value that identifies the corresponding compute job). This was done so that new suggestion data could atomically replace legacy results. Unbounded growth was meant to be prevented using TTLs and/or batched range DELETEs during insertion. Neither of those happened, and the dataset has grown to 159GB (it was projected to be 15GB)¹.

We need to clean up legacy results to reclaim the space (and reduce response size and latency at the gateway service), and put in place some mechanism for managing retention going forward.


¹ The suggestions table only, and not accounting for replication

Event Timeline

Restricted Application added a subscriber: Aklapper.
Eevans triaged this task as Medium priority.Sep 9 2022, 1:00 AM
Eevans added a subscriber: mark.

@Cparle let's schedule some time to talk about this next week. This feels rather urgent.
Also, ping @EChetty for this to be on his radar.

From a Slack conversation with @Cparle:

[image.png: screenshot of the Slack conversation]

I have applied the following to the production cluster:

ALTER TABLE image_suggestions.suggestions WITH default_time_to_live = 1209600;
ALTER TABLE image_suggestions.feedback WITH default_time_to_live = 1209600;
ALTER TABLE image_suggestions.title_cache WITH default_time_to_live = 1209600;
ALTER TABLE image_suggestions.instanceof_cache WITH default_time_to_live = 1209600;
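-- default_time_to_live is expressed in seconds; 1209600 s = 14 days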

This will limit ongoing growth (new writes will start expiring two weeks from now, capping further growth), but does not address the existing legacy results.

To remove the existing legacy results we have 2 options:

Option 1:
We still have the data used to populate the Cassandra tables in Hive. We can write a script to read the Hive tables and send batch DELETE queries to Cassandra

Option 2:
We could write the next iteration of data to a different keyspace, and once it's written change the production api to read from that. After the change we could delete the old keyspace. This would be quicker, but afaics the keyspace is just read from the request, and the api gateway is so thin that there is no simple way to map it to a different one

Any thoughts @Eevans ?

To remove the existing legacy results we have 2 options:

Option 1:
We still have the data used to populate the Cassandra tables in Hive. We can write a script to read the Hive tables and send batch DELETE queries to Cassandra

Out of curiosity, why a batch? If this is about efficiency, there isn't much to be had from doing so (this isn't what batches are intended to solve), and it creates the added complexity of ensuring that BATCH statements aren't sized over the limit.

A similar approach would be to create a script that iterates the rows/partitions in Cassandra, and issues a DELETE FROM <table> WHERE wiki = <wiki> AND page_id = <page ID> AND id < maxTimeuuid(<date>) for each partition (where <date> is something strictly less than the most recent).

We could either obtain the partitions ((wiki, page_id)) via Hive, or a SELECT DISTINCT wiki, page_id FROM <table> would probably work as well.

This needs to take reasonable care not to overwhelm the cluster; the easiest safeguard would be to simply use no concurrency (it doesn't need to be performant anyway).
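As a rough illustration of that approach, a minimal sketch in Python using the cassandra-driver and the same host and account that appear in the scripts later in this task; the cut-off date is only a placeholder:

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

auth_provider = PlainTextAuthProvider(username='aqsloader', password='********')
cluster = Cluster(['aqs1010-a.eqiad.wmnet'], auth_provider=auth_provider)
session = cluster.connect('image_suggestions')

# Enumerate the partition keys; these could equally be taken from Hive.
partitions = session.execute('SELECT DISTINCT wiki, page_id FROM suggestions')

# One range DELETE per partition, issued serially (no concurrency) so the cluster
# is not overwhelmed; '2022-09-06' is a placeholder cut-off date.
for p in partitions:
    session.execute(
        "DELETE FROM suggestions WHERE wiki = %s AND page_id = %s "
        "AND id < maxTimeuuid('2022-09-06')",
        (p.wiki, p.page_id))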

Option 2:
We could write the next iteration of data to a different keyspace, and once it's written change the production api to read from that. After the change we could delete the old keyspace. This would be quicker, but afaics the keyspace is just read from the request, and the api gateway is so thin that there is no simple way to map it to a different one

Any thoughts @Eevans ?

This has a tad too many moving parts for my sensibilities.

If it's not too disruptive on your end though, we could TRUNCATE the tables and then reimport (not sure how long that takes). This is quick, easy, and safe, but of course as the import runs, users will see no suggestions.

kostajh renamed this task from Image suggestions dataset is growing unbounded to Stop unbounded image suggestions dataset growth and clean up legacy results.Sep 16 2022, 9:00 AM

If it's not too disruptive on your end though, we could TRUNCATE the tables and then reimport (not sure how long that takes). This is quick, easy, and safe, but of course as the import runs, users will see no suggestions.

We (Growth-Team) would like to avoid this, if possible.

I had guessed we'd want a batch to ensure data consistency (in case the Option 1 script fails) but seeing as the most recent data is really all we care about, then I guess it doesn't matter and we can do a one-off script with no batching.

If there are no objections then we can go ahead and do this

CBogen renamed this task from Stop unbounded image suggestions dataset growth and clean up legacy results to [M] Stop unbounded image suggestions dataset growth and clean up legacy results.Sep 21 2022, 4:23 PM

I had guessed we'd want a batch to ensure data consistency (in case the Option 1 script fails) but seeing as the most recent data is really all we care about, then I guess it doesn't matter and we can do a one-off script with no batching.

If there are no objections then we can go ahead and do this

No objections here; if you need help and/or review, please let me know.

Deletion has been done this morning using the following actions:

# On stat machine, in a screen, launch a spark-shell
spark3-shell  --master yarn --executor-memory 8G   --executor-cores 2   --driver-memory 8G   --driver-cores 2  --conf spark.dynamicAllocation.maxExecutors=90 --jars hdfs:///wmf/cache/artifacts/airflow/analytics/spark-cassandra-connector-assembly-3.2.0-WMF-1.jar --conf spark.cassandra.connection.host=aqs1010-a.eqiad.wmnet:9042 --conf spark.cassandra.auth.username=aqsloader --conf spark.cassandra.auth.password=cassandra

# Then execute scala code

import com.datastax.spark.connector._

val r = spark.sql("""
SELECT DISTINCT
  -- Selecting C* primary key columns (except for image, not needed)
  wiki, page_id, id
FROM analytics_platform_eng.image_suggestions_suggestions
WHERE
  -- Snapshot list extracted from  /user/hive/warehouse/analytics_platform_eng.db/image_suggestions_suggestions
  -- Keeping snapshots that are before Sept 9th, as after that date the data normally has a TTL
  snapshot IN ('2022-05-02', '2022-05-16', '2022-06-13', '2022-06-20', '2022-06-27', '2022-07-04', '2022-07-11',
               '2022-07-18', '2022-07-25', '2022-08-01', '2022-08-08', '2022-08-15', '2022-08-22', '2022-08-29', '2022-09-05')
""").repartition(6).rdd

val f = new RDDFunctions(r)

f.deleteFromCassandra("image_suggestions", "suggestions", keyColumns = SomeColumns("wiki", "page_id", "id"))

The job ran in about 1h, with about 15 minutes of data preparation and about 45 minutes of Cassandra deletion. The number of deletions sent was about 71 million (more rows should have been affected, as for every (wiki, page_id, id) there can be multiple images).

As for disk space, we should see changes in at least 10 days (probably more): Cassandra keeps tombstoned rows for a grace period of 10 days (the default value, which we use), and only removes them at compaction, when data is rewritten, which happens when there are data changes (for us, when we load a new version of the dataset).
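A quick way to check the settings mentioned above, assuming the same host and account as the scripts in this task (system_schema is Cassandra's schema keyspace):

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

auth_provider = PlainTextAuthProvider(username='aqsloader', password='********')
cluster = Cluster(['aqs1010-a.eqiad.wmnet'], auth_provider=auth_provider)
session = cluster.connect()

# gc_grace_seconds controls how long tombstones are retained before compaction may purge them.
row = session.execute(
    "SELECT gc_grace_seconds, default_time_to_live FROM system_schema.tables "
    "WHERE keyspace_name = 'image_suggestions' AND table_name = 'suggestions'").one()
print(row.gc_grace_seconds / 86400, 'days of tombstone grace;',
      row.default_time_to_live, 'seconds default TTL')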

So I think we're probably ready to close this now if that's ok with @kostajh

I'm not sure this worked...

In randomly sampling records, I'm seeing a lot of results that predate Sept. 9. For example:

SELECT wiki, page_id, id, dateOf(id) FROM image_suggestions.suggestions WHERE wiki = 'dewiki' AND page_id = 7580886;

This returns records from 2022-04-21, 2022-05-31, 2022-06-22, 2022-06-27, 2022-07-29, 2022-08-19, 2022-08-29, and 2022-09-29 (the last one is to be expected). Most of those are from dates that are not part of your IN list, but at least one (2022-06-27) is.

I think the deletion has worked for the data that was present in the Hive analytics_platform_eng.image_suggestions_suggestions table for the given snapshots. I have checked the 2022-06-27 and 2022-08-29 snapshot data in Hive, and the ids don't match the Cassandra ones (there must have been two loading jobs on those days).

# In cassandra: 

dewiki | 7580886 | 89b8395c-f5b2-11ec-b0b9-bc97e15833ae | 2022-06-27 00:46:10.517000+0000

dewiki | 7580886 | 0ffef42c-27b0-11ed-b6fc-bc97e15a9f12 | 2022-08-29 15:34:25.444000+0000


# in spark 

spark.sql("""
SELECT
  wiki,
  page_id,
  id,
  COUNT(1) as nb_images
FROM analytics_platform_eng.image_suggestions_suggestions
WHERE snapshot = '2022-06-27'
  AND wiki = 'dewiki'
  AND page_id = 7580886
GROUP BY
  wiki,
  page_id,
  id
""").show(10, false)

+------+-------+------------------------------------+---------+                 
|wiki  |page_id|id                                  |nb_images|
+------+-------+------------------------------------+---------+
|dewiki|7580886|b8d0bfee-013c-11ed-80a7-f4e9d4db8760|41       |
+------+-------+------------------------------------+---------+


spark.sql("""
SELECT
  wiki,
  page_id,
  id,
  COUNT(1) as nb_images
FROM analytics_platform_eng.image_suggestions_suggestions
WHERE snapshot = '2022-08-29'
  AND wiki = 'dewiki'
  AND page_id = 7580886
GROUP BY
  wiki,
  page_id,
  id
""").show(10, false)

+------+-------+------------------------------------+---------+                 
|wiki  |page_id|id                                  |nb_images|
+------+-------+------------------------------------+---------+
|dewiki|7580886|261a03e2-2f2a-11ed-9e77-bc97e1541488|41       |
+------+-------+------------------------------------+---------+

That being said, it means the original assumption that Hive contained all the loaded data was wrong.

Aha, yes indeed you are right @JAllemandou ... we've had bugs in the data pipeline at various times, and deleted data from Hive before re-running it, without considering that doing so would orphan data in Cassandra.

Uurgh this complicates things now. I'm pretty sure we haven't done this since August, but I guess we'll need to consider how to delete old Cassandra data from before then. Is it possible to do a range-based delete with deleteFromCassandra()? Would something like this work?

f.deleteFromCassandra("image_suggestions", "suggestions", keyColumns = SomeColumns("wiki", "page_id", "dateOf(id)<'2022-09-06'"))

edit: I realise that the dateOf() part above isn't correct for a delete, it's just pseudocode (and I'm not sure if it's even possible to do something like this)

Is it possible to do a range-based delete with deleteFromCassandra()?

Nope, not possible - the connector doesn't support range-deletion.

hmmm ok ... so I guess that means we'll have to do a row-by-row thing like select id where wiki='x' and page_id='y' and dateOf(id)<'z' then a corresponding delete for each result? Is that possible?

I'd go for a solution like:

  • Extract all (wiki, page_id) pairs ever written to Cassandra from the Hive data we have - my assumption is that it'll cover most primary keys in Cassandra:
SELECT DISTINCT wiki, page_id FROM analytics_platform_eng.image_suggestions_suggestions
  • Loop over the pairs, sending a delete statement to Cassandra with a fixed range query:
DELETE FROM image_suggestions.suggestions WHERE wiki = '${wiki}' AND page_id = ${page_id} and id < minTimeuuid('2022-09-10')

Ok! Done at last!

Unillustrated articles with recent suggestions still have suggestions, e.g.

Articles with older suggestions do not, e.g.

Cparle reopened this task as Open.

... actually reopening to allow @kostajh to confirm

Thanks a lot @Cparle for pushing this to the end :)

... actually reopening to allow @kostajh to confirm

Seems OK to me! Thanks @Cparle.

@Cparle are you able to post the code you used for this? Just spot-checking, I found some older entries.

cassandra@cqlsh> SELECT wiki,page_id,dateof(id) FROM image_suggestions.suggestions WHERE wiki = 'bgwiki' AND page_id = 506962;

 wiki   | page_id | system.dateof(id)
--------+---------+---------------------------------
 bgwiki |  506962 | 2022-04-12 09:10:26.505000+0000
 bgwiki |  506962 | 2022-04-12 09:10:26.505000+0000
 bgwiki |  506962 | 2022-04-21 10:26:53.793000+0000
 bgwiki |  506962 | 2022-04-21 10:26:53.793000+0000

(4 rows)
cassandra@cqlsh>

Sure. First I did this in a notebook

all = spark.sql('SELECT wiki, page_id FROM analytics_platform_eng.image_suggestions_suggestions').distinct().coalesce(1)
all.write.csv('all_page_with_suggestions_20221027.csv')

... then copied the output to the local filesystem on stat1008 and ran this in python

import csv
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

auth_provider = PlainTextAuthProvider(username='aqsloader', password='*******')
cluster = Cluster(['aqs1010-a.eqiad.wmnet'], auth_provider=auth_provider)
session = cluster.connect('image_suggestions')


with open('all_page_with_suggestions_20221027.csv') as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    for row in csv_reader:
        cql = "DELETE FROM image_suggestions.suggestions WHERE wiki = '{}' AND page_id = {} and id < minTimeuuid('2022-10-24')".format(row[0], row[1])
        session.execute(cql)

It looks like we don't have any data in Hive for wiki=bgwiki, page_id=506962. When we were developing the data pipeline and running it for snapshot=X, we deleted "wrong" data from Hive when we spotted something wrong and re-ran the pipeline for the same snapshot, which means we have orphaned data in Cassandra (obvs I realise now that we shouldn't have done this, or at least we should have cleaned up Cassandra first). I don't think we can do a query in Cassandra based solely on id, so I don't really have any ideas for how we might find this data and get rid of it. Any ideas welcome @Eevans

... and I've realised now that we also have old data in the instanceof_cache and title_cache tables that we ought to get rid of. Will need to think about how we might do that

Actually I might create another ticket for that if it's ok with everyone? I don't *think* it's causing an actual problem atm, so it'd be lower priority

New ticket for cleaning up the other tables: T323561

I'll close this if there are no objections @Eevans @kostajh @JAllemandou

Actually, if we could press pause on this @Cparle ...

I've been trying to do a SELECT DISTINCT ... on image_suggestions.suggestions, to get a full list of partition keys in use. My thought was that we could do a set diff against the keys you used on your cleanup run, and so find the orphaned results. My attempts have thus far been failing on various (unknown) partitions as a result of timeouts. I'm not sure why, but I suspect that these are the partitions in question, and that they are very wide (so wide that the coordinator times out before it can collate the result). I'd like to a) find these and clean them up, and b) understand what happened to get us to this state.

I assume that whatever process loaded data into the suggestions table also loaded into instanceof_cache and title_cache; is this assumption safe? If so, a SELECT DISTINCT ... on them should succeed and would contain the same results that I've failed to get from suggestions.
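For illustration, a rough sketch of that set-difference idea in Python; it assumes the (wiki, page_id) pairs from the earlier cleanup run are available as the CSV produced by the script above, that title_cache shares the same partition keys, and that an orphaned partition holds no current data:

import csv
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

auth_provider = PlainTextAuthProvider(username='aqsloader', password='********')
cluster = Cluster(['aqs1010-a.eqiad.wmnet'], auth_provider=auth_provider)
session = cluster.connect('image_suggestions')

# (wiki, page_id) pairs covered by the earlier Hive-driven cleanup run.
with open('all_page_with_suggestions_20221027.csv') as csv_file:
    cleaned = {(row[0], int(row[1])) for row in csv.reader(csv_file)}

# Full list of partition keys, taken from a table where SELECT DISTINCT does not time out
# (title_cache here, on the assumption that it was loaded by the same process).
all_keys = {(r.wiki, r.page_id)
            for r in session.execute('SELECT DISTINCT wiki, page_id FROM title_cache')}

# Partitions present in Cassandra but never seen in Hive are the orphan candidates.
for wiki, page_id in sorted(all_keys - cleaned):
    session.execute('DELETE FROM suggestions WHERE wiki = %s AND page_id = %s',
                    (wiki, page_id))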

I assume that whatever process loaded data into the suggestions table also loaded into instanceof_cache and title_cache; is this assumption safe?

Yes, I think so ... can't be 100% sure because that process might not have always been working from complete or consistent datasets when we were developing the thing, but it definitely seems like an approach that has a good chance of working

Ok, I have output for both tables (wiki & page_id); where can I get a copy of all_page_with_suggestions_20221027.csv?

It's in /home/cparle on stat1008, also in hdfs:///user/cparle/all_page_with_suggestions_20221027.csv

@Eevans ... I won't close this if you're still working on it, but I might take it off our board if that's ok?

@Eevans ... I won't close this if you're still working on it, but I might take it off our board if that's ok?

Sure.

Eevans edited subscribers, added: KOfori; removed: mark, WDoranWMF.

Reopening.

We still have an (unknown) quantity of legacy data stored here, with no TTL (i.e. it isn't going anywhere until it is manually deleted). Additionally, there are some unresolved issues from my first attempt at querying this data so that it could be deleted; these need to be understood (abnormally wide partitions?).

I don't think we can close this quite yet.

@Eevans - my mistake, sorry. I assumed that the only part left to do was evaluating whether there is any impact on user experience (just in case), which is what I was monitoring/checking.