
Establish testing procedure for Druid-based endpoints
Closed, ResolvedPublic5 Estimated Story Points

Description

The wikistats2 endpoints, planned to be implemented in T288301: AQS 2.0:Wikistats 2 service, use Druid data in production. We'll need a way to provide testing data. Options include:

  1. a Druid-based Docker Compose environment, maybe based on https://druid.apache.org/docs/latest/tutorials/docker.html (a rough sketch of what this would involve appears after this list)
  2. a "fake druid" similar to the current aqs implementation that just returns hard-coded data for specific hard-coded requests
  3. SSH tunnel to production data. I mention this because the existing system uses this, but we should find a different way. I personally consider this a non-option.
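For reference, option 1 boils down to something like the following (a sketch based on the linked tutorial; the exact file names, service names, and ports can vary by Druid release):

# Sketch of option 1, based on the Druid Docker tutorial linked above.
# The docker-compose.yml and environment file live under distribution/docker in the Druid repo;
# exact contents vary by release.
git clone https://github.com/apache/druid.git
cd druid/distribution/docker
docker compose up -d     # starts zookeeper, postgres, coordinator, broker, router, historical, middlemanager
docker compose ps        # verify everything is running
# the Druid web console should then be reachable at http://localhost:8888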

Relevant comments from @JAllemandou here: https://phabricator.wikimedia.org/T288301#7997714

Event Timeline

BPirkle triaged this task as Medium priority.Jun 23 2022, 1:23 AM
BPirkle set the point value for this task to 5.
BPirkle moved this task from Incoming to Backlog on the API Platform board.

Status update:

Fiddled around quite a bit with the Docker Compose solution.

Found this to be functional but a bit unreliable. To get all the way through the tutorial, from image download to ingestion to querying data, I ended up taking the containers up and down several times and updating them from Docker Hub. Some containers (notably the "broker") seemed to stop on their own, and I had to run additional "docker compose up" commands to get them going again. I also hit obscure Java errors until I updated the images from Docker Hub. Some of that may have been unnecessary and my fault - I'm not familiar with Druid yet - but I clearly wasn't successful with a simple "download a docker-compose.yml file and run docker compose up" approach.
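Roughly, the repeated fiddling looked like this (a sketch, not an exact transcript; "broker" is one of the service names from the tutorial's compose file):

# Not a recipe, just approximately what I ended up doing when containers stopped.
docker compose ps            # see which services (e.g. the broker) had exited
docker compose logs broker   # look at why it stopped
docker compose pull          # update images from Docker Hub (this cleared the obscure Java errors)
docker compose up -d         # bring the stopped containers back up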

It is still possible that this tutorial environment could be the base of a testing environment for us, but I'm going to look at alternatives before exploring this further.

Hi @BPirkle - I'll gladly spend some time with you (and anyone interested) to explain more about Druid if needed :)

Thank you @JAllemandou , we may need to take you up on that offer.

For now, can you check for any major flaws in my thinking?

  1. I'm still pursuing the possibility of testing against a local docker-based druid
  2. I found and tweaked this person's docker+druid repository. I can now successfully "docker build" a local installation, and ingest canned data into it at docker build time
  3. I'm planning to modify the ingestion to instead ingest a smallish canned dataset representative of our production data (a rough sketch of the kind of ingestion spec I have in mind appears after this list)
  4. I think I need a "spec" and matching data to ingest
  5. I found the mediawiki_history_reduced wikitech page, which seems to provide a lot of info for the spec
  6. I tried to use beeline to query some sample data, but failed (either error code 2, or query syntax error, but never happy data)
  7. I think I can ssh tunnel into prod druid and manually execute native queries via curl (rather than the druid sql I was trying to do via beeline) to get some data
  8. assuming I get the data and spec sorted out, I can then publish my image with the ingested data and devs can run it to test against without having to do the more time-consuming install/ingest steps
  9. (non) profit :)
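To make item 3 concrete, here's the general shape of the Druid native-batch ingestion spec I'm imagining (a sketch only - the datasource name, file path, dimensions, and metrics are placeholders until I have a real spec and extract):

# Sketch only: datasource name, path, dimensions, and metrics below are placeholders.
cat > ingest-sample-spec.json <<'EOF'
{
  "type": "index_parallel",
  "spec": {
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": { "type": "local", "baseDir": "/opt/data", "filter": "mediawiki_history_reduced_sample.json" },
      "inputFormat": { "type": "json" }
    },
    "dataSchema": {
      "dataSource": "mediawiki_history_reduced_test",
      "timestampSpec": { "column": "event_timestamp", "format": "auto" },
      "dimensionsSpec": { "dimensions": [ "project", "event_entity", "event_type", "user_type", "other_tags" ] },
      "granularitySpec": { "segmentGranularity": "month", "queryGranularity": "day", "rollup": true },
      "metricsSpec": [ { "type": "longSum", "name": "events", "fieldName": "events" } ]
    },
    "tuningConfig": { "type": "index_parallel" }
  }
}
EOF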

Please let me know if you see anything in there that is incorrect and/or if you see a better way to accomplish this.

Worked a bit more on this today.

I was able to execute a canned example Druid query via the instructions on wikitech, but was not able to query the mediawiki_history_reduced data that I was really hoping to.

This worked:

  1. ssh stat1005.eqiad.wmnet
  2. curl -L -X POST "http://an-druid1001.eqiad.wmnet:8082/druid/v2/?pretty" -H "content-type: application/json" -d @example-wikitech-query.json

Where example-wikitech-query.json contained this:

{
     "queryType": "timeseries",
     "dataSource": "wmf_netflow",
     "intervals": "2019-09-05T19:11Z/2019-09-06T19:11Z",
     "granularity": "all",
     "aggregations": [
       {
         "name": "__VALUE__",
         "type": "longSum",
         "fieldName": "bytes"
       }
     ]
}

I received these results:

[ {
  "timestamp" : "2019-09-05T19:11:00.000Z",
  "result" : {
    "__VALUE__" : 32009715037000
  }
} ]

However, doing the same thing with this query, which I hand-constructed based on code in the prod aqs fixtures.js file:

{
  "queryType": "timeseries",
  "dataSource": "mediawiki_history_reduced",
  "granularity": "day",
  "filter": {
    "type": "and",
    "fields": [
      { "type": "selector", "dimension": "event_entity", "value": "page" },
      { "type": "selector", "dimension": "event_type", "value": "daily_digest" },
      { "type": "selector", "dimension": "project", "value": "en.wikipedia" }
    ]
  },
  "aggregations": [ { "type": "longSum", "name": "edited_pages", "fieldName": "events" } ],
  "postAggregations": [],
  "intervals": [ "2022-04-01/2022-04-03" ]
}

Just gave an empty result set: []

Not sure what I'm doing wrong, any advice is welcome.

  4. I think I need a "spec" and matching data to ingest

Yes! We have examples of spec as well as data for you.
The hadoop-ingestion spec (templated) of the mediawiki_history_reduced dataset is here: https://github.com/wikimedia/analytics-refinery/blob/master/oozie/mediawiki/history/reduced/load_mediawiki_history_reduced.json.template

  5. I found the mediawiki_history_reduced wikitech page, which seems to provide a lot of info for the spec

With the spec provided above, you should be good - the page you linked provides info about the data itself and how to query it. The data is available on Hadoop; you can query it using Hive (hive or beeline, see https://wikitech.wikimedia.org/wiki/Analytics/Systems/Cluster/Hive) or Spark (the preferred way, see https://wikitech.wikimedia.org/wiki/Analytics/Systems/Cluster/Spark). Querying the data should allow you to generate a small extract that you'll ingest into your test Druid. Spark is good at writing JSON :)
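For instance, something along these lines from a spark-sql session should write a small JSON extract you can then feed to your test Druid (a sketch - the output path, snapshot, and filters are placeholders, and it assumes the Spark version supports INSERT OVERWRITE DIRECTORY ... USING JSON):

-- Sketch only: output path, snapshot, and filters are placeholders.
INSERT OVERWRITE DIRECTORY 'hdfs:///tmp/mediawiki_history_reduced_sample'
USING JSON
SELECT *
FROM wmf.mediawiki_history_reduced
WHERE snapshot = '2022-05'
  AND project = 'simple.wikipedia'
  AND event_entity = 'revision';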

  6. I tried to use beeline to query some sample data, but failed (either error code 2, or query syntax error, but never happy data)

Let's talk about that, I probably can help.

  7. I think I can ssh tunnel into prod druid and manually execute native queries via curl (rather than the druid sql I was trying to do via beeline) to get some data

Beeline is an interface to Hive, not Druid, so it's expected that you can't query Druid from there. You should, however, be able to query Druid with SQL using curl, via the SQL API: https://druid.apache.org/docs/latest/querying/sql-api.html
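For example, something like this (a sketch - the broker host and datasource name are placeholders):

# Druid SQL over HTTP: POST a JSON body with a "query" field to /druid/v2/sql
# (broker host and datasource name below are placeholders)
cat > sql-query.json <<'EOF'
{ "query": "SELECT COUNT(*) AS events FROM \"mediawiki_history_reduced_2022_05\" WHERE event_entity = 'page'" }
EOF
curl -L -X POST "http://BROKER_HOST:8082/druid/v2/sql" -H "content-type: application/json" -d @sql-query.json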

  8. assuming I get the data and spec sorted out, I can then publish my image with the ingested data and devs can run it to test against without having to do the more time-consuming install/ingest steps

I think that's right :)

  9. (non) profit :)

\o/

I forgot to add on this:

I was able to execute a canned example Druid query via the instructions on wikitech, but was not able to query the mediawiki_history_reduced data that I was really hoping to.

This is because we have 2 Druid clusters, one for private/internal data, one for public data! The wmf_netflow datasource is on the private cluster, while the mediawiki_history_reduced_YYYY_MM datasources (latest is mediawiki_history_reduced_2022_05) are on the public one.
Hosts of the public cluster are an-druid100[12345].eqiad.wmnet, and hosts of the private one are druid100[45678].eqiad.wmnet.
Hopefully with that info you should be able to query the data!

Thank you!

For now, I managed to grab some data via the Hue interface. This probably isn't the best way long term, and it probably isn't the right dataset, but it was enough to move forward.

I now have a local docker container that happily ingests that data. Next up, I'll publish both the repo and the image and see if @FGoodwin or @codebug can successfully pull and run it locally, as proof of concept.

Then I can circle back, clean up rough edges, and figure out what test data we really want to export/ingest to include in the image.

Edit: here's what I have so far: https://github.com/bpirkle/aqs-docker-druid-test-env
My personal github is, of course, a terrible place for this, and we'll find a better one. For now, it was just making me nervous to have it exist only on my laptop.
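If anyone wants to poke at it, the flow is roughly this (a sketch - the image tag and ports are placeholders; see the repo README for the real details):

# Rough sketch; image tag and ports are placeholders - see the repo README for specifics.
git clone https://github.com/bpirkle/aqs-docker-druid-test-env.git
cd aqs-docker-druid-test-env
docker build -t aqs-druid-test-env .                        # ingests the canned data at build time
docker run -d -p 8888:8888 -p 8082:8082 aqs-druid-test-env
# then query the broker at http://localhost:8082/druid/v2/ much like the curl examples above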

I have a question that may be better discussed somewhere else, but I thought I'd start here as it is at least somewhat related.

I have the Docker+Druid env working well enough to think about what actual test data we want to ingest, so I started looking for which AQS endpoints use Druid data. At first, I didn't find any in what we plan to implement for T263489: AQS 2.0, and I started getting nervous that all our endpoints were Cassandra-backed and I was wasting my time with the Druid env.

Then I looked in the old analytics-aqs code and found a few things that appear to use Druid. However, I don't see these endpoints under any of the AQS docs pages. I see only one of them on the Wikimedia REST API doc page.

The one I see is the registered-users endpoint, which appears to use Druid, and which I'm guessing we need to add a Phab task for. But I'm not sure about the /digests/ or the /revisions/ endpoints. Do we need to implement those as well?

What else might we be missing?

@BPirkle: good question, multi-part answer:

First, everything in the mediawiki-history-metrics.yaml endpoint definition is hitting Druid. To connect that with the REST API doc page, if you expand the following sections you see they're all defined in the mediawiki-history-metrics.yaml:

  • Edited pages data
  • Editors data
  • Edits data
  • Registered users data
  • Bytes difference data

In some cases, the link is obvious but in other cases it's indirect. Like, for example, the Bytes difference endpoints connect through the revisions endpoint as defined in the yaml. This is not at all clear unless you look at the more specific endpoint mapping here: bytes-difference.yaml. I think the list of things to implement in AQS 2.0 is whatever makes it out to the public REST API docs (see next part).

Second, I think it is true that some of the endpoints, as defined by AQS yaml files here do not have a corresponding public version. It may be a waste of time to look at it this way. It's fine to look at what's available in the public REST API docs and only implement those endpoints with the backend as defined in these more detailed AQS yaml files. That way we keep AQS 2.0 to a tight scope and we can always add more endpoints as we need them.

Third, @JAllemandou is on vacation for a bit, so I have to consider what would happen if he wants to keep some of the endpoints that don't have public counterparts. I think it would be easy enough to add them later, so unless I'm wrong about that, let's keep it simple for now.

In some cases, the link is obvious but in other cases it's indirect. Like, for example, the Bytes difference endpoints connect through the revisions endpoint as defined in the yaml. This is not at all clear unless you look at the more specific endpoint mapping here: bytes-difference.yaml.

Thank you, @Milimetric ! That was the piece I was not understanding.

We'll proceed with the public endpoints as you suggest, and can always adjust plans later if needed.

I forgot to add on this:

I was able to execute a canned example Druid query via the instructions on wikitech, but was not able to query the mediawiki_history_reduced data that I was really hoping to.

This is because we have 2 Druid clusters, one for private/internal data, one for public data! The wmf_netflow datasource is on the private cluster, while the mediawiki_history_reduced_YYYY_MM datasources (latest is mediawiki_history_reduced_2022_05) are on the public one.
Hosts of the public cluster are an-druid100[12345].eqiad.wmnet, and hosts of the private one are druid100[45678].eqiad.wmnet.
Hopefully with that info you should be able to query the data!

Looks like I need more help.

tl;dr: I'm doing something wrong, but I have no idea what :(

Lots of details follow. Feel free to skip them if you like, I mostly included them for completeness.

I tried three ways to query the mediawiki_history_reduced data, and failed at all of them:

  1. Hue interface: I can execute queries like this and get data:

SELECT * FROM mediawiki_history_reduced WHERE snapshot='2022-05';
But if I try to do anything more targeted, I get an error. For example, this query:
SELECT * FROM mediawiki_history_reduced WHERE project='ar.wikipedia' AND snapshot='2022-05';
gives me this error, which I do not know how to interpret:

org.apache.hive.service.cli.HiveSQLException: Error while processing statement: FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
	at org.apache.hive.service.cli.operation.Operation.toSQLException(Operation.java:380)
	at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:257)
	at org.apache.hive.service.cli.operation.SQLOperation.access$800(SQLOperation.java:91)
	at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork$1.run(SQLOperation.java:348)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1938)
	at org.apache.hive.service.cli.operation.SQLOperation$BackgroundWork.run(SQLOperation.java:362)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
  2. Spark SQL: okay, fine, I'd rather use the command line anyway. So I tried this:
ssh stat1005.eqiad.wmnet
kinit
spark2-sql
SELECT page_title FROM wmf.mediawiki_history_reduced WHERE snapshot='2022-05';
SELECT page_title FROM wmf.mediawiki_history_reduced_2022_05;

Got a Java out-of-memory error for the first query (along with a bunch of other queries I tried that added various WHERE clauses, to see if a more selective query would help). The second query gave: Error in query: Table or view not found: wmf.mediawiki_history_reduced_2022_05; line 1 pos 23

  3. Bummer, but I like the idea of posting a query to the Druid server better anyway. So I tried that, per this Wikitech page, and was only ever able to get empty brackets as a response. Specifically, I was executing queries from stat1005.eqiad.wmnet like this:

curl -L -X POST "http://an-druid1001.eqiad.wmnet:8082/druid/v2/?pretty" -H "content-type: application/json" -d @filename.json

Where "filename.json" was various queries I tried. Some examples:

{
	"queryType": "timeseries",
	"dataSource": "mediawiki_history_reduced",
	"granularity": "day",
	"filter": {
		"type": "and",
		"fields": [{
				"type": "selector",
				"dimension": "event_entity",
				"value": "page"
			},
			{
				"type": "selector",
				"dimension": "event_type",
				"value": "create"
			},
			{
				"type": "not",
				"field": {
					"type": "selector",
					"dimension": "other_tags",
					"value": "redirect"
				}
			}
		]
	},
	"aggregations": [{
		"type": "longSum",
		"name": "new_pages",
		"fieldName": "events"
	}],
	"intervals": "2022-01-01/2022-02-01"
}

And

{
  "queryType": "timeseries",
  "dataSource": "mediawiki_history_reduced_2022_07",
  "granularity": "month",
  "filter": {
    "type": "and",
    "fields": [
      { "type": "selector", "dimension": "project", "value": "en.wikipedia" },
      { "type": "selector", "dimension": "user_type", "value": "user" },
      { "type": "selector", "dimension": "event_entity", "value": "revision" },
      { "type": "selector", "dimension": "event_type", "value": "create" },
      { "type": "not", "field": { "type": "selector", "dimension": "other_tags", "value": "deleted" } }
    ]
  },
  "aggregations": [
    { "type": "longSum", "name": "edits", "fieldName": "events" }
  ],
  "intervals": [ "2022-07-01T00:00:00.000/2022-08-01T00:00:00.000" ]
}

I stole that second one from this Wikitech page, which suggests ssh'ing into druid1004.eqiad.wmnet and executing the query against localhost:8082. However, ssh druid1004.eqiad.wmnet gets me "ssh: connect to host druid1004.eqiad.wmnet port 22: Connection timed out", and per the previous reply, it sounds like an-druid is the public cluster that I need. I was able to execute this against the wmf_netflow datasource:

{
     "queryType": "timeseries",
     "dataSource": "wmf_netflow",
     "intervals": "2019-09-05T19:11Z/2019-09-06T19:11Z",
     "granularity": "all",
     "aggregations": [
       {
         "name": "__VALUE__",
         "type": "longSum",
         "fieldName": "bytes"
       }
     ]
 }

Via this command:

curl -L -X POST "http://an-druid1001.eqiad.wmnet:8082/druid/v2/?pretty" -H "content-type: application/json" -d @wmf_netflow.query.json

And get this response:

[ {
  "timestamp" : "2019-09-05T19:11:00.000Z",
  "result" : {
    "__VALUE__" : 32009715037000
  }
} ]

I found that confusing, because I understood the previous response to say that wmf_netflow was on the private (druid100[45678].eqiad.wmnet) cluster, but I was hitting the public (an-druid100[12345].eqiad.wmnet) cluster.

Clearly, I don't have any idea what I'm doing, and randomly cutting-and-pasting from wiki pages isn't helping. I'd appreciate any direction you can provide.

Hi @BPirkle - I'm sorry we left you blind in all this.
Let me give you my understanding of the context and the various trials you did.

First on systems:

  • Hue / Hive - When using the Hue UI you're using the Hive query engine behind the scenes. Hive runs SQL queries by translating them to Hadoop MapReduce tasks, reading the data from HDFS.

I have no clue as to why your queries using hue failed - I tried them myself and they succeeded :( However, I'd advise not to use that tool, as we'll hopefully soon deprecate it in favor of using Spark with notebooks.

  • Spark - Spark is a query engine that replaces Hadoop MapReduce but works mostly in similar ways: you can give it SQL, it will translate it into spark subtasks to be run in parallel and read the data from HDFS.

One big difference between Hive and Spark is that Spark needs to be configured at start time in terms of the resources it will use on the cluster. You'll find examples of how to start a spark-sql session (or any other Spark job) with resource configuration here: https://wikitech.wikimedia.org/wiki/Analytics/Systems/Cluster/Spark#Start_a_spark_shell_in_yarn, and more on how much resource to use here: https://wikitech.wikimedia.org/wiki/Analytics/Systems/Cluster/Spark#Spark_Resource_Settings.
When you launched your spark2-sql job without resource config, the job used 2 workers and that was not enough to handle the data size.
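Concretely, launching it with explicit resource settings looks something like this (the numbers here are illustrative only - see the wikitech pages above for recommended values):

# Illustrative resource settings only; see the Spark wikitech page for recommended values.
spark2-sql --master yarn \
  --executor-memory 8G \
  --executor-cores 4 \
  --driver-memory 4G \
  --conf spark.dynamicAllocation.maxExecutors=64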

  • Druid - We have two Druid clusters, one public and one private. The hosts of the private cluster are an-druid100[12345].eqiad.wmnet, and the hosts of the public one are druid100[45678].eqiad.wmnet.

This is the opposite of what I told you a few weeks ago:

Hosts of the public cluster are an-druid100[12345].eqiad.wmnet, and hosts of the private one are druid100[45678].eqiad.wmnet

I am very sorry I messed up :S And while writing this I realize the naming of the hosts should have been done better :)
The private cluster handles datasets that are not queryable by AQS but are visible through Superset and Turnilo, while the public cluster only serves the datasets queryable by AQS.

Then on datasets:

  • mediawiki_history_reduced_YYYY_MM datasources on the Druid public cluster - Those are the datasources AQS queries (only one at a time, updated monthly through configuration). Each of them contains all the data needed for the AQS endpoints up to the end of the month it references in its name.
  • wmf.mediawiki_history_reduced hive/spark table on Hadoop - This contains the data used to load the Druid datasources. The snapshot field defines up to which month the data is defined (the snapshot month is included).

With all that, some examples:
In hue:

SELECT
  project,
  COUNT(1) as revisions_created
FROM wmf.mediawiki_history_reduced
WHERE snapshot = '2022-08'
  AND YEAR(event_timestamp) = 2022
  AND MONTH(event_timestamp) = 7
  AND event_entity = 'revision'
  AND event_type = 'create'
GROUP BY project
ORDER BY revisions_created DESC
LIMIT 10;

1	wikidata	23914911
2	commons.wikimedia	9116716
3	en.wikipedia	5649871
4	en.wiktionary	915244
5	fr.wikipedia	760184
6	de.wikipedia	739002
7	ru.wikipedia	736428
8	es.wikipedia	582137
9	zh.wikipedia	550489
10	ja.wikipedia	480457

In spark2-sql, started with the default config from the link above (note that the query is the same in Hive and Spark):

-- Number of revisions created in July 2022 by project (top 10 descending)
SELECT
  project,
  COUNT(1) as revisions_created
FROM wmf.mediawiki_history_reduced
WHERE snapshot = '2022-08'
  AND YEAR(event_timestamp) = 2022
  AND MONTH(event_timestamp) = 7
  AND event_entity = 'revision'
  AND event_type = 'create'
GROUP BY project
ORDER BY revisions_created DESC
LIMIT 10;

wikidata	23914911
commons.wikimedia	9116716
en.wikipedia	5649871
en.wiktionary	915244
fr.wikipedia	760184
de.wikipedia	739002
ru.wikipedia	736428
es.wikipedia	582137
zh.wikipedia	550489
ja.wikipedia	480457

In druid:

vi top_revisions_per_project_2022_07.json

{
  "queryType": "topN",
  "dataSource": "mediawiki_history_reduced_2022_08",
  "dimension": "project",
  "granularity": "month",
  "metric": "revisions_created",
  "threshold": 10,
  "filter": {
    "type": "and",
    "fields": [{
        "type": "selector",
        "dimension": "event_entity",
        "value": "revision"
      },
      {
        "type": "selector",
        "dimension": "event_type",
        "value": "create"
      }
    ]
  },
  "aggregations": [{
    "type": "longSum",
    "name": "revisions_created",
    "fieldName": "events"
  }],
  "intervals": "2022-07-01/2022-08-01"
}


curl -L -X POST "http://druid1004.eqiad.wmnet:8082/druid/v2/?pretty" -H "content-type: application/json" -d @top_revisions_per_project_2022_07.json

[ {
  "timestamp" : "2022-07-01T00:00:00.000Z",
  "result" : [ {
    "revisions_created" : 23914911,
    "project" : "wikidata"
  }, {
    "revisions_created" : 9116716,
    "project" : "commons.wikimedia"
  }, {
    "revisions_created" : 5649871,
    "project" : "en.wikipedia"
  }, {
    "revisions_created" : 915244,
    "project" : "en.wiktionary"
  }, {
    "revisions_created" : 760184,
    "project" : "fr.wikipedia"
  }, {
    "revisions_created" : 739002,
    "project" : "de.wikipedia"
  }, {
    "revisions_created" : 736428,
    "project" : "ru.wikipedia"
  }, {
    "revisions_created" : 582137,
    "project" : "es.wikipedia"
  }, {
    "revisions_created" : 550489,
    "project" : "zh.wikipedia"
  }, {
    "revisions_created" : 480457,
    "project" : "ja.wikipedia"
  } ]
} ]

Phew - we got the same results :)
The other interesting thing to notice is the difference in response time!

Please feel free to ping me on Slack or IRC @BPirkle - working through all this in writing is not easy :)

Hi @BPirkle - I'm sorry we left you blind in all this.
Let me give you my understanding of the context and the various trials you did.

No worries. As usual, I learned more by beating my head against it for a while.

Based on your reply above, I've been able to successfully query some proof-of-concept data, which is a big step forward. Now I'm constructing queries for the actual data that I'll need for testing. I'll be tracking this work under T317803: AQS 2.0: Extract production testing data for Druid-based endpoints. If I run into trouble there, I may hit you up for more advice.

If I run into trouble there, I may hit you up for more advice.

Please don't hesitate :)

I think it could be interesting for us to spend some time talking about the coding strategy for Druid querying. I found it useful to build a code API on top of the JSON query structure when building the first version in JavaScript. Happy to share more on this as you move forward.

Regarding the image size of the test env (which is currently on the large side at a little over a gig): I manually downloaded and extracted Druid and took a look at the source. We're already removing most of the fluff in the Dockerfile, so most of Druid's contribution to the image size comes from its lib directory, which is required. So no gains to be had there.

I should look into using a smaller base image.

(Oh, and I'll move all the repository to our GitLab installation eventually.)

Compressed image size reduced from 1.14G to 203M, via a combination of using a smaller base image, combining layers, and being a little more efficient about what the image includes.
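For the curious, the general shape of those changes looks something like this (a sketch of the techniques, not the actual Dockerfile - the base image and download URL are placeholders; see the repo for the real thing):

# Sketch of the techniques only; base image and URL are placeholders - see the repo for the real Dockerfile.
FROM eclipse-temurin:11-jre-focal
# Download, extract, and clean up in a single RUN so the tarball never lands in a committed layer:
RUN curl -fsSL "https://downloads.apache.org/druid/<version>/apache-druid-<version>-bin.tar.gz" -o /tmp/druid.tar.gz \
    && tar -xzf /tmp/druid.tar.gz -C /opt \
    && rm /tmp/druid.tar.gz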