
New database request: image_matching
Closed, Resolved · Public

Description

Hi,

With this task I'd like to start a discussion around creating a new database to support the Image Suggestion API service.

Image Matching

The context is a cross-team project to suggest images to illustrate Wikipedia articles.

Platform Engineering developed an API and a data pipeline that coordinates the generation of a heuristic (expert-based) model, and publishes datasets (lookup tables) to analytics.wikimedia.org/published. We are considering Swift for production use. Currently the API is hosted on Toolforge VMs, and loads data locally into an in-memory sqlite database.

The project is currently in a PoC phase, but PET will develop and support it once it matures to production. For production, we would like to ingest these datasets into a database, so that the API will be able to consume recommendations from WMF's prod kube cluster.

Data Model

We expect a read-heavy load, with periodic (monthly) high-throughput writes. The data pipeline will publish a static dataset that represents state at model training time. The database will not be updated during its lifetime (e.g. in between training schedules).
Keeping track of state changes will be addressed by a dedicated component and caching mechanism.

  • Number of records: 15,000,000 (PoC) to TBD (prod).
  • Read access pattern: a lookup of (wiki, page_id). The result set can contain k records (initially k = 3).
  • Write access pattern: periodic (monthly) high throughput write load. Old records can be discarded, and the dataset refreshed.
  • Updates: data can be discarded and replaced once a new dataset is available (monthly)
  • QPS (reads): the range is from millions down to thousands of daily requests. The primary clients are the Android app and bot writers. We expect bots to have bursts of activity.
  • Data volumes: 1.5GB (PoC) to the order of 10s of GBs (prod - estimate)
  • Data model assumptions:
    • Data is not a time series (conceptually it’s a single datapoint of a time series).
    • Data does not depend on previous state / no need to track history.
    • The dataset is denormalized and stored in a single schema.
    • We’ll only perform lookups. No range or analytical queries, joins, etc.
    • We look up by (wiki, page_id), and the lookup should return multiple recommendations.
Dataset updates

To meet SLAs, we want to keep providing recommendations while the datasets are refreshed. Currently we are considering
a snapshot strategy: loading the new snapshot should be done while serving data from the existing one,
and once loading is finished and vetted, then the serving layer swaps storage snapshot,
and the old one can be deleted (after some time possibly, allowing for rollback). This allows for easy rollback, at the cost of datasize.
We need to validate the implications of this approach on replication strategies applied on the db hosts.
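
As a very rough sketch of the swap mechanics described above (not the actual implementation: the table names, the "active_snapshot" pointer table, and the use of SQLite are all assumptions for illustration):

```python
# Sketch of the snapshot-swap idea, using SQLite purely for illustration.
# Table names and the "active_snapshot" pointer table are assumptions.
import sqlite3

conn = sqlite3.connect("image_matching.db")
conn.execute("CREATE TABLE IF NOT EXISTS active_snapshot (snapshot_id TEXT)")

def load_snapshot(snapshot_id, rows):
    """Load a new snapshot into its own table while the current one keeps serving."""
    table = f"suggestions_{snapshot_id}"
    conn.execute(f"CREATE TABLE {table} (wiki TEXT, page_id INTEGER, image_id TEXT)")
    conn.executemany(f"INSERT INTO {table} VALUES (?, ?, ?)", rows)
    conn.commit()

def activate_snapshot(snapshot_id):
    """Once the new snapshot is loaded and vetted, repoint the serving layer at it."""
    conn.execute("DELETE FROM active_snapshot")
    conn.execute("INSERT INTO active_snapshot VALUES (?)", (snapshot_id,))
    conn.commit()

def drop_snapshot(snapshot_id):
    """Delete an old snapshot after the rollback window has passed."""
    conn.execute(f"DROP TABLE IF EXISTS suggestions_{snapshot_id}")
    conn.commit()
```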

Example

A sample sqlite DDL we use for development and testing can be found at https://gist.github.com/gmodena/8ffd0ecc2051703bb0d6391d85c44879
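
For readers without access to the gist, a hypothetical sketch of the general shape implied by the access pattern above (this is not the DDL from the gist; the confidence column and exact names are guesses):

```python
# Hypothetical sketch only; see the gist above for the DDL actually used in
# development. Columns beyond wiki/page_id/image_id/found_on are guesses.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE image_suggestions (
        wiki        TEXT    NOT NULL,
        page_id     INTEGER NOT NULL,
        image_id    TEXT    NOT NULL,
        found_on    TEXT,
        confidence  REAL
    );
    CREATE INDEX idx_wiki_page ON image_suggestions (wiki, page_id);
""")

def lookup(wiki, page_id, k=3):
    # Read access pattern: one (wiki, page_id) lookup returning up to k records.
    return conn.execute(
        "SELECT image_id, found_on, confidence FROM image_suggestions "
        "WHERE wiki = ? AND page_id = ? LIMIT ?",
        (wiki, page_id, k),
    ).fetchall()
```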

Access from service

The API should access the database from kubernetes*.<eqiad|codfw>.wmnet and kubestage*.<eqiad|codfw>.wmnet.
We will need access from a host outside Hadoop VLANs that coordinates ingestion (TBD).

Backup policy

We probably won't need backups. Data can be re-generated from Hadoop if needed.

Privacy

The data we wish to store has been approved by the Privacy Review Process.
Related phab: https://phabricator.wikimedia.org/T273434#6825118

SLO

Work In Progress.

Desires

Currently we identified the following desirable properties:

  • We would like dedicated hosts outside the "misc" pool.
  • We would like the ingestion process to be automated.
  • We would like to support high throughput writes without aggressive throttling and batch sizes.
  • We would like no API downtime during writes.
  • We would like to (re) index the dataset schemas according to the desired read access pattern (index by wiki, page_id).
  • We would like to store copies (possibly variants) of the datasets.
  • (Maybe) we would like to store support tables for the API.
  • (Maybe) we would like to consolidate data pipelines, and migrate similarusers to this database (hosts).

Event Timeline

LSobanski triaged this task as Medium priority.
LSobanski moved this task from Triage to Pending comment on the DBA board.
Marostegui moved this task from Pending comment to Refine on the DBA board.
Marostegui subscribed.

Thanks for the detailed submission @gmodena - some questions:

  • Is MySQL the best place to store this materialized data? Have you considered hadoop perhaps?
  • How many wikis will this be enabled on if the PoC is ok?
  • What wiki will have the PoC?

Data volumes: 1.5GB (PoC) to the order of 10s of GBs (prod - estimate)

Do you have a way to narrow this a bit more? I am especially interested in the production estimates if the PoC is successful.

We would like dedicated hosts outside the "misc" pool.

Why do you want a specific set of hosts for this? You need to consider that the absolute minimum of hosts we'd need to buy for this would be 4 (2 per DC, so we can have HA). We'd need to allocate budget for this during the next FY.
I would like to understand the reasoning for requesting specific HW, as we have some misc hosts that can accept more load, and having at least 4 more hosts in our infra is, of course, another SPOF, and 4 more hosts we'd need to manage just for a single feature.

We would like no API downtime during writes

That's impossible to ensure, and the same goes for the DB status: it is impossible to guarantee 100% uptime. In case the host goes down, what are the implications? How would the feature handle the database going down at any time, during the write process or outside of it?
From what you've described, it looks like reads and writes would be done from the same host, or would the feature have the concept of master/slave (writes going to the master and reads to the slave)? How would this scale if we find that a single host cannot cope with the load of writes and reads? Would the feature be able to split reads and writes?

This comes because of this comment:

We would like to support high throughput writes without aggressive throttling and batch sizes.

If the feature is going to be reading from the slaves, throttling will definitely be needed to avoid lag on the slaves.

QPS (reads): the range is from millions down to thousands of daily requests. The primary clients are the Android app and bot writers. We expect bots to have bursts of activity.

Is this millions of rows or millions of statements (with presumably a bunch of rows per read)?

Again, if we are talking about a single host doing writes and reads, millions of reads per day need to be carefully thought through.

Let's keep chatting here - thanks again for the detailed submission, this is a good start to polish the requirements!

Hey @Marostegui,

Thanks for the detailed reply and constructive feedback.

  • Is MySQL the best place to store this materialized data? Have you considered hadoop perhaps?

I would very much appreciate your input here.
We considered Hadoop - and the AQS Cassandra cluster that lives in that realm - but, after consulting with stakeholders, it was not deemed ready yet to onboard this use case.

I don't know if the MySQL cluster is the best place, but it has been recommended as a canonical storage solution for production services.

We do have an internal (PET) Cassandra cluster (RESTBase), which might be a fit for the ETL workload and access pattern.

If the outcome of this task is that other systems are more suitable to our needs than MySQL, that'd be absolutely fine. Right now we
don't have many assumptions / deps on db technology, and we are very much looking for input and best practices.

  • How many wikis will this be enabled on if the PoC is ok?

Client teams would like to support all 300+ wikis. This should be the comprehensive list: https://github.com/mirrys/ImageMatching/blob/0d5aa1c99905ea2f3a3fa830e0a66c3079377a89/conf/wiki.conf#L2

  • What wiki will have the PoC?

The initial 24 wikis are the following: https://github.com/mirrys/ImageMatching/blob/0d5aa1c99905ea2f3a3fa830e0a66c3079377a89/conf/wiki.conf#L5

Data volumes: 1.5GB (PoC) to the order of 10s of GBs (prod - estimate)

Do you have a way to narrow this a bit more? I am especially interested in the production estimates if the PoC is successful.

We are in the process of running the pipeline on all wikis and getting some stats. I'll update the task once that info is available.

We would like dedicated hosts outside the "misc" pool.

Why do you want a specific set of hosts for this? You need to consider that the absolute minimum of hosts we'd need to buy for this would be 4 (2 per DC, so we can have HA). We'd need to allocate budget for this during the next FY.
I would like to understand the reasoning for requesting specific HW, as we have some misc hosts that can accept more load, and having at least 4 more hosts in our infra is, of course, another SPOF, and 4 more hosts we'd need to manage just for a single feature.

We have a number of services that follow a common design: periodic high-throughput writes plus lookups on generated datasets. Similarusers is one such service we already deployed in "misc"; IIRC that was a temporary solution and we'd like to move it out. We expect more of these services to come to our backlog in the next FY.

We would like no API downtime during writes

That's impossible to ensure, and the same goes for the DB status: it is impossible to guarantee 100% uptime.

My apologies, I misphrased that statement. What I meant is "minimal downtime"; it is understood that 100% uptime is not realistic (the service respects the laws of physics :)). We don't have an SLA yet for the desired value of "minimal", and we hope to collect metrics during the PoC to help us reason about constraints.

In case the host goes down, what are the implications? How would the feature handle the database going down at any time, during the write process or outside of it?
From what you've described, it looks like reads and writes would be done from the same host, or would the feature have the concept of master/slave (writes going to the master and reads to the slave)? How would this scale if we find that a single host cannot cope with the load of writes and reads? Would the feature be able to split reads and writes?

Functionally we would like to be able to read and write concurrently, with the assumption that writes happen in bulk once a month. We have not been explicit about how this requirement would be implemented, but a master/slave setup would be ideal.
I understand this would impact hw requirements as you describe above. Would the misc pool resources be able to support this load/pattern?

In the feature, read and write logic are separate. A service hosted on k8s will read, and an external process will write.
For writes, we can (gracefully) deal with dbs (or the pipeline machinery) going down and recover from it. It's something we'll capture in the service SLO.

cc / @BPirkle @nnikkhoui for service & API considerations.
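
To illustrate the read/write split being discussed (purely a sketch: hostnames, credentials, and the use of pymysql are placeholders, not the real service wiring):

```python
# Sketch of the read/write split: the k8s-hosted API only ever opens a read
# connection to a replica, while the separate ingestion process writes to the
# master. Hostnames and credentials below are placeholders.
import pymysql

def reader_connection():
    # Used by the API service: reads go to a replica (behind a dbproxy).
    return pymysql.connect(host="image-matching-replica.example.wmnet",
                           user="api_ro", password="...", database="image_matching")

def writer_connection():
    # Used by the ingestion process: bulk writes go to the master.
    return pymysql.connect(host="image-matching-master.example.wmnet",
                           user="ingest_rw", password="...", database="image_matching")
```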

This comes because of this comment:

We would like to support high throughput writes without aggressive throttling and batch sizes.

If the feature is going to be reading from the slaves, throttling will definitely be needed to avoid lag on the slaves.

To clarify: some degree of throttling is fine/expected, but hopefully we could find a threshold that does not delay data delivery too much if volumes grow.

QPS (reads): the range is from millions down to thousands of daily requests. The primary clients are the Android app and bot writers. We expect bots to have bursts of activity.

Is this millions of rows or millions of statements (with presumably a bunch of rows per read)?

The QPS figure refers to millions of rows, which could be bunched into statements.

Again, if we are talking about a single host doing writes and reads, millions of reads per day need to be carefully thought through.

In a single host setting, R/W contention would only happen when a new dataset is ingested (monthly), but it would be challenging nonetheless.

Let's keep chatting here - thanks again for the detailed submission, this is a good start to polish the requirements!

Happy to discuss further!

Hey @Marostegui,

Thanks for the detailed reply and constructive feedback.

  • Is MySQL the best place to store this materialized data? Have you considered hadoop perhaps?

I would very much appreciate your input here.
We considered Hadoop - and the AQS Cassandra cluster that lives in that realm - but, after consulting with stakeholders, it was not deemed ready yet to onboard this use case.

I don't know if the MySQL cluster is the best place, but it has been recommended as a canonical storage solution for production services.

We do have an internal (PET) Cassandra cluster (RESTBase), which might be a fit for the ETL workload and access pattern.

If the outcome of this task is that other systems are more suitable to our needs than MySQL, that'd be absolutely fine. Right now we
don't have many assumptions / deps on db technology, and we are very much looking for input and best practices.

Yeah, my point is more that storing materialized data might be better suited to other distributed storage, especially if the idea is to write it as fast as possible without throttling things too much. MySQL lag will always be there, unfortunately.

  • How many wikis will this be enabled on if the PoC is ok?

Client teams would like to support all 300+ wikis. This should be the comprehensive list: https://github.com/mirrys/ImageMatching/blob/0d5aa1c99905ea2f3a3fa830e0a66c3079377a89/conf/wiki.conf#L2

Thanks - that's quite a list :)

  • What wiki will have the PoC?

The initial 24 wikis are the following: https://github.com/mirrys/ImageMatching/blob/0d5aa1c99905ea2f3a3fa830e0a66c3079377a89/conf/wiki.conf#L5

Thanks, that list already includes big wikis like enwiki, eswiki, itwiki... so I would like to look at the final picture (load-wise) with those 300 wikis.

Data volumes: 1.5GB (PoC) to the order of 10s of GBs (prod - estimate)

Do you have a way to narrow this a bit more? I am especially interested in the production estimates if the PoC is successful.

We are in the process of running the pipeline on all wikis and getting some stats. I'll update the task once that info is available.

Thank you!

We would like dedicated hosts outside the "misc" pool.

Why do you want a specific set of hosts for this? You need to consider that the absolute minimum of hosts we'd need to buy for this would be 4 (2 per DC, so we can have HA). We'd need to allocate budget for this during the next FY.
I would like to understand the reasoning for requesting specific HW, as we have some misc hosts that can accept more load, and having at least 4 more hosts in our infra is, of course, another SPOF, and 4 more hosts we'd need to manage just for a single feature.

We have a number of services that follow a common design: periodic high-throughput writes plus lookups on generated datasets. Similarusers is one such service we already deployed in "misc"; IIRC that was a temporary solution and we'd like to move it out. We expect more of these services to come to our backlog in the next FY.

I am still trying to see how this would be deployed. We have sort of two deployments:

  • MW core (i.e. s1, s2...), which uses the MW load balancer and is accessed via etcd
  • Misc hosts (m1, m2, m3...), which host internal services and are accessed via proxies (we'd need to buy at least 2 dbproxy hosts for this new deployment). Most of the applications we run there do not support a write/read split, but if your application can do it, that's great: you'd be writing to the master and reading (and sharing the read load) between two slaves.

So if, in the end, we need to go for new misc hosts, we'd definitely need the application logic to be able to split reads and writes, as otherwise writing and reading from a single host isn't ideal, especially with 300 wikis.
The HW requirement would be:

  • dbproxies (2 in eqiad, 1 in codfw)
  • DBs (2 per DC or even 3 per DC)

Functionally we would like to be able to read and write concurrently, with the assumption that writes happen in bulk once a month. We have not been explicit about how this requirement would be implemented, but a master/slave setup would be ideal.
I understand this would impact hw requirements as you describe above. Would the misc pool resources be able to support this load/pattern?

We'd need some rough estimates of the expected IOPS (especially reads).

In the feature, read and write logic are separate. A service hosted on k8s will read, and an external process will write.

So to confirm, you can write to an IP and read from a different one?

For writes, we can (gracefully) deal with dbs (or the pipeline machinery) going down and recover from it. It's something we'll capture in the service SLO.

Excellent

To clarify: some degree of throttling is fine/expected, but hopefully we could find a threshold that does not delay data delivery too much if volumes grow.

We can always fine-tune things, but when dealing with replication, lag is always to be expected and needs to be accounted for in the application logic.
What is the impact of the application reading from a lagged slave? Maybe not a huge lag, but 5-10 seconds?

Thanks for all the detailed answers!

Hey @Marostegui,

Thanks for the detailed reply and constructive feedback.

  • Is MySQL the best place to store this materialized data? Have you considered hadoop perhaps?

I would very much appreciate your input here.
We considered Hadoop - and the AQS Cassandra cluster that lives in that realm - but, after consulting with stakeholders, it was not deemed ready yet to onboard this use case.

This is disappointing, because what you're proposing to do here follows the same pattern as AQS (extract, transform, and persist a generated dataset) -- and since we have to do something -- I would have hoped closing the gap so that we could onboard this use case would have been up for discussion.

I do think Cassandra is a better fit for this. Cassandra's write path is log-structured, which is ideal for high throughput writes, and it's fully distributed and linearly scalable, so if ingestion does become a bottleneck, we have the option to scale horizontally. The dataset is denormalized, and there is no indexing other than primary, so I see no challenges (at least none unique to Cassandra).

Since this seems to be a recurring theme (datasets generated on the analytics cluster), we should probably have a cluster dedicated to these use cases (including AQS), but until we figure that part out, perhaps we could deploy to the RESTBase cluster. Based on the 10s of GBs estimate above, we probably have enough space for this, but it would be great if we can narrow that down.
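
As an illustration of the kind of denormalized, primary-key-only table being described (keyspace, table, and column names, the contact point, and the existence of the keyspace are all assumptions, not a committed design):

```python
# Illustrative only: one denormalized Cassandra table, partitioned by
# (wiki, page_id) so a single partition read returns the k suggestions for a
# page. Assumes the "image_suggestions" keyspace already exists; names and
# the contact point are placeholders.
from cassandra.cluster import Cluster

session = Cluster(["cassandra.example.wmnet"]).connect()
session.execute("""
    CREATE TABLE IF NOT EXISTS image_suggestions.suggestions (
        wiki     text,
        page_id  int,
        image_id text,
        found_on text,
        PRIMARY KEY ((wiki, page_id), image_id)
    )
""")

# Read path: one (wiki, page_id) lookup, no secondary indexes involved.
rows = session.execute(
    "SELECT image_id, found_on FROM image_suggestions.suggestions "
    "WHERE wiki = %s AND page_id = %s",
    ("enwiki", 12345),
)
```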

We'll also need to think about how to cycle the monthly datasets through. I have a couple of ideas for this; I'll think about it a bit more and try to write them up as alternatives.

Thoughts?

@Marostegui @Eevans thanks for the input!
I should have stats re dataset sizes of the 300+ wikis towards the end of this week. Crunching is still in progress; it takes a while to cycle through all languages.

I'll follow up to the points you both raised once I have these figures & a more complete picture of what needs to be stored.

Hey @Eevans, @Marostegui,

The full dataset for ImageMatching, generated on 321 wikis, is 2.6GB. It contains 23585365 records. In prod we might want to store multiple snapshots (prev/current months), and possibly variants (to satisfy ad-hoc clients or A/B testing).

Re open questions:

  1. The current application design supports reading and writing from/to different IPs.
  2. IMHO the application should be fine with 5-10sec of read lag (on a monthly schedule). It's great if we know how much lag to expect, so we can capture it in the SLO. cc @BPirkle @nnikkhoui on this one.
  3. Datasets are published out of hadoop to analytics.wikimedia.org. For "prod", I'd rather use swift. We'll need an ingestion process that loads data from analytics.wikimedia.org (or swift) and ingests it into cassandra/mariadb.

Thoughts on cassandra or mariadb:
My takeaway is that Cassandra is a good fit for storing this type of materialised view. Ideally we'd be closer to Hadoop, but using the RESTBase cluster could be a stepping stone towards resources dedicated to these use cases.
I'd be keen on evaluating the RESTBase route before committing to adding HW (and colleagues time) to the mariadb pool.

Maybe premature optimisation, but this dataset stores text fields (part of a potential primary key) that can be relatively long (page titles, image names). Do we have guidelines for hashing/storing long keys?

Below are some summary stats and percentiles on three fields with long text. @Eevans a few degenerate records are longer than Cassandra's max_key_size. I need to do some more validation here; a page title 755993 chars long might indicate something wrong with the export process.

statistic   page_title   image_id   found_on
count       23585365     23585365   23585365
mean        17           11         8
std         160          384        24
min         1            3          3
10%         7            3          3
20%         10           3          3
30%         12           3          3
40%         14           3          3
50%         16           3          3
60%         18           3          3
70%         20           3          3
80%         23           20         3
90%         27           32         7
95%         33           44         27
99%         50           77         127
max         755993       476747     2029
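
For reference, a sketch of how length percentiles like the ones above might be computed with pandas (the export path and TSV format are assumptions):

```python
# Sketch: compute length summary stats for the three text fields from an
# exported dataset. The file path and tab-separated format are assumptions.
import pandas as pd

df = pd.read_csv("imagematching_full_export.tsv", sep="\t")
for col in ("page_title", "image_id", "found_on"):
    stats = df[col].astype(str).str.len().describe(
        percentiles=[.1, .2, .3, .4, .5, .6, .7, .8, .9, .95, .99])
    print(col, stats, sep="\n")
```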

Hey @Eevans, @Marostegui,

The full dataset for ImageMatching, generated on 321 wikis, is 2.6GB. It contains 23585365 records.

To be clear, a record as it is referred to here is one globally unique primary key, and the corresponding columns, yes?

In prod we might want to store multiple snapshots (prev/current months), and possibly variants (to satisfy ad-hoc clients or A/B testing).

Can you expound on this? Are you talking about what would amount to a fixed retention policy (like keeping the N most recent months), or being able to arbitrarily take some sort of snapshot that is readable with a client? Something else?

Re open questions:

  1. The current application design supports reading and writing from/to different IPs.

AIUI, you're planning to deploy the service (the reader) to k8s, and the RESTBase Cassandra cluster is available from there already (the CQL native port, 9042 anyway). Ingestion on the other hand...

  2. IMHO the application should be fine with 5-10sec of read lag (on a monthly schedule). It's great if we know how much lag to expect, so we can capture it in the SLO. cc @BPirkle @nnikkhoui on this one.

For Cassandra at least, I don't think we need to worry about this.

  3. Datasets are published out of hadoop to analytics.wikimedia.org. For "prod", I'd rather use swift. We'll need an ingestion process that loads data from analytics.wikimedia.org (or swift) and ingests it into cassandra/mariadb.

How are they published to analytics.wikimedia.org?

Thoughts on cassandra or mariadb:
My takeaway is that Cassandra is a good fit for storing this type of materialised view. Ideally we'd be closer to Hadoop, but using the RESTBase cluster could be a stepping stone towards resources dedicated to these use cases.
I'd be keen on evaluating the RESTBase route before committing to adding HW (and colleagues time) to the mariadb pool.

Maybe premature optimisation, but this dataset stores text fields (part of a potential primary key) that can be relatively long (page titles, image names). Do we have guidelines for hashing/storing long keys?

Below are some summary stats and percentiles on three fields with long text. @Eevans a few degenerate records are longer than Cassandra's max_key_size. I need to do some more validation here; a page title 755993 chars long might indicate something wrong with the export process.

The limit on key names (and clustering column values) is 64KB, and for something that is meant to be an identifier... that is quite large. :) I can't see how either page titles or image names would need to be part of the primary key, BUT, I also think there must be a problem with the reporting; it looks like page names are limited to 255 chars in MediaWiki (https://www.mediawiki.org/wiki/Manual:Page_table#page_title)

Maybe premature optimisation, but this dataset stores text fields (part of a potential primary key) that can be relatively long (page titles, image names). Do we have guidelines for hashing/storing long keys?

Below are some summary stats and percentiles on three fields with long text. @Eevans a few degenerate records are longer than Cassandra's max_key_size. I need to do some more validation here; a page title 755993 chars long might indicate something wrong with the export process.

The limit on key names (and clustering column values) is 64KB, and for something that is meant to be an identifier... that is quite large. :) I can't see how either page titles or image names would need to be part of the primary key, BUT, I also think there must be a problem with the reporting; it looks like page names are limited to 255 chars in MediaWiki (https://www.mediawiki.org/wiki/Manual:Page_table#page_title)

Thanks for the pointer. It was indeed a problem with records containing unescaped chars getting mangled together by cqlsh. I re-ran the analysis, and page names / image ids are both under 255 chars.

The full dataset for ImageMatching, generated on 321 wikis, is 2.6GB. It contains 23585365 records.

To be clear, a record as it is referred to here is one globally unique primary key, and the corresponding columns, yes?

Yes.

In prod we might want to store multiple snapshots (prev/current months), and possibly variants (to satisfy ad-hoc clients or A/B testing).

Can you expound on this? Are you talking about what would amount to a fixed retention policy (like keeping the N most recent months), or being able to arbitrarily take some sort of snapshot that is readable with a client? Something else?

Two use cases:

  1. Keeping a retention policy of two months (current, prev), to allow "snapshot" based updates. Only the current month would be readable by clients.
  2. For the current month, I'd like to be able to load multiple variants (same schema, different content) of the datasets. Variants are generated by different parametrisations of the ImageMatching algorithm. All variants of the current month should be readable by clients.

Re 1: I know you have a better approach in mind, and would love to learn more about it.
Re 2: The main reason for keeping "variants" would be (a/b) testing. Right now, we won't need to store variants. It is not something we have in scope for the near future, but eventually it will come.

Re open questions:

  1. The current application design supports reading and writing from/to different IPs.

AIUI, you're planning to deploy the service (the reader) to k8s, and the RESTBase Cassandra cluster is available from there already (the CQL native port, 9042 anyway). Ingestion on the other hand...

How do we perform ingestion for the current RESTBase services?

Following a pattern similar to https://wikitech.wikimedia.org/wiki/Add_Link#Dataset_pipeline (it ingests into MySQL), would it be possible to:

  1. poll for the new dataset from an (ephemeral) k8s pod
  2. insert the dataset from the k8s pod into Cassandra

?
It would be great if we could develop a re-usable (and scalable) ingestion tool, ideally something Spark/Flink connector based.
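
Purely as a sketch of what such an ephemeral ingestion job could look like (the dataset URL, keyspace/table names, contact point, and batching details are all assumptions; the real tool might well be Spark/Flink based instead):

```python
# Hypothetical ingestion sketch for an ephemeral k8s pod: stream a published
# TSV and batch-insert it into Cassandra. The URL, keyspace/table names and
# batch size are placeholders, not the real pipeline.
import csv
import requests
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement, BatchType

DATASET_URL = "https://analytics.wikimedia.org/published/datasets/imagematching/latest.tsv"  # placeholder

session = Cluster(["cassandra.example.wmnet"]).connect("image_suggestions")
insert = session.prepare(
    "INSERT INTO suggestions (wiki, page_id, image_id, found_on) VALUES (?, ?, ?, ?)")

with requests.get(DATASET_URL, stream=True) as resp:
    resp.raise_for_status()
    reader = csv.DictReader(resp.iter_lines(decode_unicode=True), delimiter="\t")
    # Unlogged batches: rows span many partitions, so keep batches small.
    batch = BatchStatement(batch_type=BatchType.UNLOGGED)
    for i, row in enumerate(reader, start=1):
        batch.add(insert, (row["wiki"], int(row["page_id"]),
                           row["image_id"], row["found_on"]))
        if i % 100 == 0:
            session.execute(batch)
            batch = BatchStatement(batch_type=BatchType.UNLOGGED)
    session.execute(batch)
```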

Life would be easier if we could reach RESTBase Cassandra from the Hadoop network.

  3. Datasets are published out of hadoop to analytics.wikimedia.org. For "prod", I'd rather use swift. We'll need an ingestion process that loads data from analytics.wikimedia.org (or swift) and ingests it into cassandra/mariadb.

How are they published to analytics.wikimedia.org?

Data is rsync-ed from stats hosts to analytics.wikimedia.org. That's an established path to publish data from Hadoop to the outside world (https://analytics.wikimedia.org/published/README). However, my understanding is that it is meant for manual copies / one-offs. I'd rather use a more appropriate channel, if available.

Life would be easier if we could reach RESTBase Cassandra from the Hadoop network.

For the right use case I imagine access could be authorised - we currently have firewall rules in place that allow access from Analytics->AQS, which are on the prod network outside of the analytics cluster. As has been mentioned, this pattern is quite similar to that of AQS in general.

Life would be easier if we could reach RESTBase Cassandra from the Hadoop network.

For the right use case I imagine access could be authorised - we currently have firewall rules in place that allow access from Analytics->AQS, which are on the prod network outside of the analytics cluster. As has been mentioned, this pattern is quite similar to that of AQS in general.

Hey @hnowlan, thanks for the pointer.
Do I read correctly that those AQS rules match all outgoing analytics vlan traffic? Does that include hadoop workers?

Life would be easier if we could reach RESTBase Cassandra from the Hadoop network.

For the right use case I imagine access could be authorised - we currently have firewall rules in place that allow access from Analytics->AQS, which are on the prod network outside of the analytics cluster. As has been mentioned, this pattern is quite similar to that of AQS in general.

Hey @hnowlan, thanks for the pointer.
Do I read correctly that those AQS rules match all outgoing analytics vlan traffic? Does that include hadoop workers?

Yep, aiui that's a requirement given that hadoop workers access AQS to load data via the oozie job.

LSobanski subscribed.

Looks like it's safe to remove DBA from this task given the direction it took. @gmodena what would be a good project to assign this to?

You may want to change the title as well.

@gmodena @Eevans What should happen to this task now? Should it close?

@gmodena @Eevans What should happen to this task now? Should it close?

That would be my vote, yes.