
Make elasticsearch cluster accessible from analytics hadoop workers
Closed, ResolvedPublic

Description

For tasks like T116055 we need the ability for hadoop workers to send data to the elasticsearch cluster. For that, we need to modify the firewall configuration to allow outgoing connections (a specific port is OK) to the cluster.

We do not think this is a privacy issue: in order to use this connection you must already have access to the hadoop workers/analytics cluster, at which point you can already see (and export) the data contained there anyway.

The hadoop workers all need to communicate with search.svc.eqiad.wmnet and search.svc.codfw.wmnet on port 9200. The servers behind this are firewalled, but port 9200 is open to 10.0.0.0/8.
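
A minimal reachability check from a worker might look like this (a sketch only; it assumes plain HTTP on port 9200, per the description above):

```python
import requests

for host in ("search.svc.eqiad.wmnet", "search.svc.codfw.wmnet"):
    try:
        # The Elasticsearch root endpoint just returns cluster metadata.
        r = requests.get("http://%s:9200/" % host, timeout=5)
        print(host, r.status_code, r.json()["version"]["number"])
    except requests.exceptions.RequestException as exc:
        print(host, "unreachable:", exc)
```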

Event Timeline

Smalyshev created this task. Dec 3 2015, 8:59 PM
Smalyshev raised the priority of this task to Medium.
Smalyshev updated the task description.
Smalyshev added subscribers: Smalyshev, EBernhardson.
Restricted Application added a subscriber: Aklapper. Dec 3 2015, 8:59 PM

I took a look around, and afaict this is managed directly on the routers and not from within operations/puppet or any other repository (at least, previous tickets to adjust analytics firewalling never seem to link to a patch).

The firewall was initially created in T82124. Previous tickets were handled by @BBlack, @akosiaris, and probably others. We might just want to bring this up in SOS and let it be delegated to whoever is appropriate.

@Ottomata indicated this would be reasonable in https://phabricator.wikimedia.org/T113440#1735695

EBernhardson updated the task description. Dec 11 2015, 4:55 PM
EBernhardson set Security to None.

@akosiaris can you help with this?

> @akosiaris can you help with this?

I am not even sure what this is about. Getting data from hadoop into elasticsearch? From elasticsearch into hadoop? And what kind of data? The blocked tickets don't help all that much. Can I have some more info please?

Data would move in both directions.

The two linked tickets are about shipping a page popularity score from hadoop to elasticsearch. This score is generated by aggregating the wmf.pageview_hourly table in hive down to a page_id and a score between 0 and 1, which provides a total-ordering signal of page popularity based on page views.
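
A minimal sketch of that aggregation, assuming a PySpark session with Hive access and that pageview_hourly exposes project, page_id and view_count columns (the normalization below is illustrative, not the actual scoring formula):

```python
from pyspark.sql import functions as F

# Sum views per page for one month of data.
views = (spark.table("wmf.pageview_hourly")
         .where("year = 2015 AND month = 12")
         .groupBy("project", "page_id")
         .agg(F.sum("view_count").alias("views")))

# Normalize by each project's total so every score lands in [0, 1].
totals = views.groupBy("project").agg(F.sum("views").alias("total"))
scores = (views.join(totals, "project")
          .select("project", "page_id",
                  (F.col("views") / F.col("total")).alias("score")))
```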

The next job to use this pipeline will need to move data in both directions. It will read the page_id, namespace, title, incoming redirects and outgoing links from elasticsearch to build a graph of each wiki in hadoop. This graph will be iterated with the PageRank algorithm to again generate a dataset of page_id and a score between 0 and 1, which provides a different total ordering of the Wikipedia pages.
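
For illustration, a toy pure-Python power iteration of PageRank over an adjacency dict (the real job would run distributed over the full wiki graph; this sketch ignores dangling-page redistribution):

```python
def pagerank(links, damping=0.85, iterations=20):
    """links: {page_id: [page_ids it links to]} -> {page_id: score}."""
    pages = list(links)
    rank = {p: 1.0 / len(pages) for p in pages}
    for _ in range(iterations):
        new = {p: (1.0 - damping) / len(pages) for p in pages}
        for page, outgoing in links.items():
            if not outgoing:
                continue  # dangling pages ignored in this sketch
            share = damping * rank[page] / len(outgoing)
            for target in outgoing:
                if target in new:
                    new[target] += share
        rank = new
    return rank
```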

Both of these scores are to be imported into the elasticsearch document model and utilized to improve the quality of search results.

I should add that all connections are opened by the hadoop workers for communication with the elasticsearch cluster. Elasticsearch will never try to open a connection to hadoop.

> Data would move in both directions.

> The two linked tickets are about shipping a page popularity score from hadoop to elasticsearch. This score is generated by aggregating the wmf.pageview_hourly table in hive down to a page_id and a score between 0 and 1, which provides a total-ordering signal of page popularity based on page views.

I am gonna go out on a limb here, but isn't this something that should be done via the pageview API? I see for example

https://wikimedia.org/api/rest_v1/metrics/pageviews/per-article/en.wikipedia/all-access/all-agents/Albert_Einstein/daily/2015100100/2015103100

which sounds very close to what you are describing. Granted, granularity is daily (hourly is not currently supported) and you will have to calculate the score yourself, but it really sounds very close. And it's a stable and, most importantly, documented API. To me it sounds like a way better way to get pageview data into anything, elasticsearch included.

If the API is not sufficient, something tells me it's probably worth more to everyone to try and make it sufficient, instead of devising a new ad-hoc way just because it cannot satisfy your current needs.
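
A sketch of pulling the daily counts for one article from the endpoint cited above:

```python
import requests

url = ("https://wikimedia.org/api/rest_v1/metrics/pageviews/per-article/"
       "en.wikipedia/all-access/all-agents/Albert_Einstein/daily/"
       "2015100100/2015103100")
# The response is a JSON object with an "items" list of per-day counts.
for item in requests.get(url, timeout=10).json()["items"]:
    print(item["timestamp"], item["views"])
```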

> The next job to use this pipeline will need to move data in both directions. It will read the page_id, namespace, title, incoming redirects and outgoing links from elasticsearch to build a graph of each wiki in hadoop. This graph will be iterated with the PageRank algorithm to again generate a dataset of page_id and a score between 0 and 1, which provides a different total ordering of the Wikipedia pages.

Which sounds pretty much like the above, with the extra mile of getting some data out of elasticsearch first and using it to feed the PageRank algorithm. And from what I understand, the role of hadoop here is just to be the "muscle" that runs the crunching on the data.

Which again comes down to the exact same argument I made a paragraph above, with the added fact that this will probably take more time and effort to integrate into the current API (or to create a new one).

> Both of these scores are to be imported into the elasticsearch document model and utilized to improve the quality of search results.

To be honest, it sounds like we are devising an ad-hoc way here of getting the data we need around the cluster because we don't provide it in a clean, documented and transparent way. I get that you are probably racing to meet some goal (we all seem to be these days), but I must say I don't much love this way of making it happen.

mark added a subscriber: mark. Dec 16 2015, 11:59 AM

As I understand it, the Pageview API provides data about one article, or aggregated data about sets of articles. However, we need individual data about all articles, to process it and then load it into ElasticSearch. I don't see how the Pageview API by itself is going to help us here. It may be that Pageview could supply the raw data for it - though with millions of pages in the result I wonder how that would be implemented, since loading it all into memory may be a bit too much, and I see no streaming API - maybe I missed it?

In any case, the idea here is to create infrastructure whose input is analytics data, which is then processed by hadoop (in order to manage the volume of the data more efficiently) to produce some per-page information - which depends on the processing algorithm applied and may change a lot as we experiment with various algorithms - and to load that data into the ES cluster to amend search results, scoring, etc.

So if the Pageview API can be part of this picture, great - then it would be good to describe where it comes into the picture and where the processing would be performed. As of now, we envisioned the processing being performed on the analytics machines, which requires the result then being shipped to the ES cluster - hence the need to open the firewall to ship data from analytics to the ES cluster. If there's another way of doing it, let's discuss it; maybe it would be better.

> To be honest, it sounds like we are devising an ad-hoc way here of getting the data we need around the cluster because we don't provide it in a clean, documented and transparent way.

I think what we're designing here is a way to process analytics data so that it is useful for improving Elasticsearch. Now, it doesn't have to be designed the way it is now, but it has to use analytics data, has to be able to process it at scale, and has to have a way for the data to end up in Elasticsearch. So if we can do it with the Pageview API, that's fine - let's describe how that would work.

Joe added a subscriber: Joe. Dec 17 2015, 7:21 AM
Joe added a comment. Dec 17 2015, 7:33 AM

So, apart from the question of "where should we consume the data from", which is important, I think we should try to keep the communications between the analytics cluster and the main cluster limited to a few well-defined processes.

So here we have two communication channels:

  • Analytics -> production: I don't know the specifics, but I guess the hadoop workers just churn data and want to submit it to the ES cluster. This could be done, for example, by having the hadoop workers send a message over kafka, and then having some process consume the message and submit it to ES (see the sketch after this list). I think the services team is working on something specifically in this direction, btw, for unrelated reasons.
  • Production -> analytics: again, I'd ask @Ottomata for his opinion, but I think we mostly settled on using kafka to communicate data from production to the analytics cluster. I'd ask them directly though.
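
A minimal sketch of the Analytics -> production direction using kafka-python; the topic name and broker host below are hypothetical:

```python
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka1001.eqiad.wmnet:9092",  # hypothetical broker
    value_serializer=lambda v: json.dumps(v).encode("utf-8"))

# One score tuple, sent as a message to a hypothetical topic.
producer.send("discovery.popularity_score",
              {"project": "en.wikipedia", "page_id": 736, "score": 0.0042})
producer.flush()
```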

My general point is: I am strongly against poking holes in firewalls for specially-crafted data flows, for the simple reason that they tend to become a nightmare to maintain and debug in the long run.

Tangential, but I just wanted to point out that this won't be the last time we need to get aggregated hadoop data into 'production' (the article recommender people needed something similar), so having some form of supported, non-ad-hoc workflow for this would be nice.

Joe added a subscriber: mobrovac. Dec 17 2015, 12:26 PM

Actually, I think the component @mobrovac is implementing for services could be an optimal candidate to be the "some process consuming the message and submitting it to ES" I referred to earlier.

Yup, @Joe, indeed it sounds like this could benefit from the Event-Platform system we are working on together with the Analytics team. Here's what could be done (an example event is sketched after the list):

  1. Hadoop sends the data modelled as events to the EventBus
  2. These events end up in a specific Kafka topic
  3. The change-propagation consumer we are building picks them up and emits HTTP requests to ES
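
For illustration, one score-update event as it might sit in the Kafka topic; the field names are hypothetical, not a settled schema:

```python
event = {
    "meta": {
        "topic": "discovery.popularity_score",  # hypothetical topic
        "dt": "2015-12-17T12:00:00Z",
        "domain": "en.wikipedia.org",
    },
    "page_id": 736,
    "score_field": "popularity_score",
    "score": 0.0042,
}
```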

To get us started, we'd need to talk about:

  • Event schema definitions: how will the data coming from Hadoop be represented in the Kafka queue?
  • HTTP request definition: when the consumer gets a message from Kafka, how would it be turned into a valid ES request?

Hmm, interesting.

Using Kafka for batch/aggregate data is a little funky, but I think OK. This would be generated in Hadoop hourly, right? Hourly generation would mean bursts of aggregate data being sent to Kafka. What would an 'event' be in this case? Would it be a single message containing all aggregate popularity scores, or a message per page popularity score? How would ES store this data? Would it update popularity scores one article at a time?

I suppose the volume for this would be low enough to send over HTTP to the event service, but I'm not so sure that is its intended use. It certainly isn't the intended use of the event service instance we are about to deploy; that is meant for ingesting (low volume) 'production' events. Discovery wants to feed aggregate analytics data back into a prod system. Kafka may be good for this, but I don't think we want to push this data through the new 'main' Kafka cluster(s).

> Production -> analytics: again, I'd ask @Ottomata for his opinion, but I think we mostly settled on using kafka to communicate data from production to the analytics cluster. I'd ask them directly though.

This isn't a rule, but it does make it easy to import into Hadoop (and to use for other things). If it makes sense for ES to send to Kafka then we should do it.

If any of this data is going to go into Kafka, then yes, let's make some nice schemas for them and add them to the mediawiki/event-schemas repository.

> My general point is: I am strongly against poking holes in firewalls for specially-crafted data flows, for the simple reason that they tend to become a nightmare to maintain and debug in the long run.

I'm not sure how that would be harder to maintain and debug than a scheme involving two additional processes (with their own points of failure), a new event schema, and an additional data encoding/decoding pipeline. I would say an HTTP request to Elasticsearch is easier to maintain and debug than sending an event to some event queue, for a separate consumer process to eventually pick up and then send the same request to Elasticsearch - simply because there are two more systems involved, each of which can potentially have problems, and at least one more team that will need to be engaged whenever we need to debug or implement something.

As for sending it through Kafka, I'm not sure how that is supposed to work - can the current system handle sending events for each page for which we have logs? Would the event schema support sending batches of updates?

The data itself is very simple right now - it's basically a tuple of (project id, page id, score field, score value). But there are a lot of them, so we need to batch and probably parallelize. ES stores this data as part of the overall index - i.e. each project has an index, each page id has a document, and in that document the field needs to be updated with the score value (right now there's one score, but there may be more in the future).

Do we have an existing Kafka consumer that can process such data? How are batching and parallelization done in that consumer?

> Event schema definitions: how will the data coming from Hadoop be represented in the Kafka queue?

I've described the data above, but I'm not sure how it can be represented as an event, since I don't know what events can do. Are there any docs on that?

> HTTP request definition: when the consumer gets a message from Kafka, how would it be turned into a valid ES request?

We already have code that creates the ES request and handles all the necessary logic around it; it can be seen here:
https://github.com/wikimedia/wikimedia-discovery-analytics/blob/master/oozie/transfer_to_es/transferToES.py#L52
and
https://github.com/wikimedia/wikimedia-discovery-analytics/blob/master/oozie/transfer_to_es/transferToES.py#L81
Basically it is a simple PUT request that sends a set of JSON-encoded objects representing updates to each document.
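
For illustration only (this is not the linked transferToES.py code), a sketch of such a bulk update; the index/type names follow CirrusSearch conventions and the popularity_score field is hypothetical:

```python
import json
import requests

# Hypothetical batch of (page_id, score) tuples.
updates = [(736, 0.0042), (25202, 0.0013)]

# _bulk takes newline-delimited JSON: an action line, then a partial doc.
lines = []
for page_id, score in updates:
    lines.append(json.dumps({"update": {"_id": page_id}}))
    lines.append(json.dumps({"doc": {"popularity_score": score}}))
body = "\n".join(lines) + "\n"

resp = requests.post(
    "http://search.svc.eqiad.wmnet:9200/enwiki_content/page/_bulk",
    data=body)

# The response mirrors the request order, so failures can be retried
# individually.
failed = [item for item in resp.json()["items"]
          if item["update"].get("error")]
```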

Sounds like we need a meeting...:)

Looking at https://www.mediawiki.org/wiki/Extension:EventLogging/Guide#Creating_a_schema, our schemas, unlike full JSON Schema, do not support arrays, which would be a real problem for batching entities... Of course, we could make it all a giant string, but that sounds a bit ridiculous. Any suggestions?
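
For contrast, a sketch (written as a Python dict) of the kind of plain JSON Schema with arrays that batching would call for:

```python
batch_schema = {
    "type": "object",
    "properties": {
        "project": {"type": "string"},
        "updates": {                      # the array construct at issue
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "page_id": {"type": "integer"},
                    "score": {"type": "number"},
                },
            },
        },
    },
}
```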

Ok! We just had a very helpful meeting about this. I'm going to try to summarize and make a recommendation.

There are two parts of the scoring that Discovery is trying to do.

  1. score calculation. This happens in Hadoop.
  2. score updates for ElasticSearch. This could happen in a few places.

Score updates are why we are discussing opening up the firewall between Analytics and ElasticSearch. We could model the updates as individual events, and send those to a Kafka topic. These events would then be batch consumed by workers somewhere (e.g. each worker consumes N messages at a time), and then HTTP POSTs would be sent to the ElasticSearch cluster to update the documents with scores.
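
A sketch of that consumer loop with kafka-python; the topic, group id and send_bulk_update helper are all hypothetical:

```python
import itertools
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "discovery.popularity_score",              # hypothetical topic
    bootstrap_servers="kafka1001.eqiad.wmnet:9092",
    group_id="es-score-updater",
    consumer_timeout_ms=60000,                 # stop iterating when idle
    value_deserializer=lambda v: json.loads(v.decode("utf-8")))

while True:
    # Take up to 500 messages, then issue a single bulk request to ES.
    batch = [msg.value for msg in itertools.islice(consumer, 500)]
    if not batch:
        break
    send_bulk_update(batch)  # hypothetical helper building a _bulk body
```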

Discovery would like for the batch HTTP POSTs to be as parallel as possible, to take advantage of the many ES nodes. Each worker needs to wait for the HTTP response from ElasticSearch to see how each of the updates in a batch went, and react to any failures.

That's the hard part. There is no good place in production now to easily parallelize worker processes. There is the job queue, but Discovery doesn't think it is reliable enough, especially for this volume. The work Marko and the Services team are doing for Change Propagation and a new job queue looks promising, but it is not nearly ready.

In lieu of a good place to parallelize consumer workers, Kafka is not a good fit for this. The good news is that the score update process is separate from the score calculation process, so when we have the <new fancy parallel consumer job queue change propagation bla bla> service, we can revisit this and consider shipping these updates from Hadoop to ElasticSearch via Kafka.

For now though, I think we should open up the firewall as originally requested.

We didn't touch much on how to get data from ElasticSearch back into Hadoop, but it sounds like we can wait on that question. Kafka probably makes sense here. Consuming from Kafka into Hadoop is easy, and can be done either with Camus into files in HDFS, or by Spark jobs that Discovery is currently working with.
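
A sketch of that reverse path with the Spark 1.x streaming API of the time; the topic name, broker and output path are hypothetical:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="es-to-hadoop")
ssc = StreamingContext(sc, batchDuration=60)  # one micro-batch per minute

stream = KafkaUtils.createDirectStream(
    ssc, ["elasticsearch.page_data"],
    {"metadata.broker.list": "kafka1001.eqiad.wmnet:9092"})
# Drop the Kafka key, keep the message body, and land it in HDFS.
stream.map(lambda kv: kv[1]).saveAsTextFiles("/wmf/data/raw/es_pages")

ssc.start()
ssc.awaitTermination()
```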

> when we have the <new fancy parallel consumer job queue change propagation bla bla> service, we can revisit this and consider shipping these updates from Hadoop to ElasticSearch via Kafka.

Just talked to @akosiaris about this in IRC, and he wanted me to highlight this point. If we do open up the firewall for this, it is with the caveat that it isn't Ops' ideal choice. When we have a new job queue where the ES updates can be run, Discovery should prioritize moving those jobs to it, so that the firewall hole can be closed. I think as long as there is agreement about that, then Alex is ok with opening the firewall. (He can comment otherwise if I misunderstood.)

>> when we have the <new fancy parallel consumer job queue change propagation bla bla> service, we can revisit this and consider shipping these updates from Hadoop to ElasticSearch via Kafka.
>
> Just talked to @akosiaris about this in IRC, and he wanted me to highlight this point. If we do open up the firewall for this, it is with the caveat that it isn't Ops' ideal choice. When we have a new job queue where the ES updates can be run, Discovery should prioritize moving those jobs to it, so that the firewall hole can be closed. I think as long as there is agreement about that, then Alex is ok with opening the firewall. (He can comment otherwise if I misunderstood.)

Exactly as Ottomata points out above. This is not an ideal choice, and while doing it now is OK given the needs at hand, we are very wary of it, since it is not a long-term maintainable solution (router ACLs never are). Whatever replacement is more structured and follows a path that other solutions can use too (like the jobqueue, which sounds ideal for this) is preferred.

I've just opened the holes in the router ACLs.

We talked about this in the meeting - once we have a Kafka setup that can:

  • Have parallel consumers (outside of the analytics cluster) that we could code to send things to ES
  • Keep up with a volume of around 400M tuples/day (roughly 4,600 per second on average)

we would be completely fine with moving to that. I understand the Kafka/consumer work is not ready yet but eventually will be - at that point I think we would work on moving to that solution instead of FW holes. So I think we are in agreement about that.

akosiaris closed this task as Resolved. Dec 22 2015, 2:07 PM

Great. Thanks @Smalyshev. Resolving this.

EBernhardson reopened this task as Open. Jan 22 2016, 6:48 PM

@akosiaris I finally got around to testing out the usage of this. The eqiad cluster is available from the analytics network, but the codfw cluster (elastic20{01..24}.codfw.wmnet) is not. Is it possible to open that one as well?

One other issue: the analytics servers can talk to elastic10{01..31}.eqiad.wmnet directly just fine, but I am seeing timeouts trying to talk to search.svc.eqiad.wmnet (the LVS endpoint).

Another option for analytics<->codfw that @Smalyshev and I just talked about would be using an nginx proxy on the elastic1*.eqiad.wmnet servers. There is already another ticket about setting up nginx on those instances (T124444). This nginx instance could inspect the Host header and forward requests to codfw as necessary. It's a bit of a hack and I'm not super thrilled about it, but perhaps it's better than cross-DC ACLs?

> Another option for analytics<->codfw that @Smalyshev and I just talked about would be using an nginx proxy on the elastic1*.eqiad.wmnet servers. There is already another ticket about setting up nginx on those instances (T124444). This nginx instance could inspect the Host header and forward requests to codfw as necessary. It's a bit of a hack and I'm not super thrilled about it, but perhaps it's better than cross-DC ACLs?

That actually sounds worse: one way of doing it for eqiad and a different one for codfw. I've talked about this with @faidon and @Joe. I am starting to think that, given that this entire approach of the analytics cluster talking straight to elasticsearch WILL be scrapped in the future, and given the alternatives, it probably makes sense to just update the ACLs and hope to kill this approach as fast as possible.

I'll update the ACL unless someone objects.

akosiaris closed this task as Resolved. Feb 3 2016, 1:06 PM

I've updated the ACL for codfw as well. I've already tested it and it seems to work just fine. /me now waits on the replacement so this hack can be undone.