[SPIKE] Build simple stateless service using Flink SQL
Closed, Resolved · Public · Spike

Description

User Story
As a platform engineer, I need to try building a simple stateless service that takes an input stream, transforms and enriches it, and produces an output, using Flink SQL.

The service should:

  • Listen to mediawiki.revision-create or another existing Kafka topic
  • Make a call to the MW Action API
  • Produce some output that combines the data
Why?
  • We need to assess whether this is a good abstraction that lets event-driven data producers create similar services easily
Done is:
  • Ticket contains a write-up of the process (links to repos)
  • Ticket contains the pros and cons of using Flink SQL so that the team can make a decision on how to proceed

Event Timeline

Restricted Application added a subscriber: Aklapper.
Restricted Application changed the subtype of this task from "Task" to "Spike". (Sep 28 2022, 7:10 PM)

Flink has an interface that implements Lookup Join semantics. I managed to implement it in a connector that asynchronously queries HTTP endpoints.
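
Conceptually, a lookup join enriches each incoming row by querying an external table on the join key at processing time, and an async lookup lets several of those queries be in flight at once. The Python sketch below only illustrates those semantics under stated assumptions; it is not the connector's code. `fetch_revision` and `FAKE_API` are hypothetical stand-ins for the HTTP call, and rows without a lookup result are dropped, as in an inner lookup join.

```python
import asyncio
from typing import Optional

# Hypothetical stand-in for the external HTTP endpoint: maps revids -> raw payload.
FAKE_API = {1000: '{"ok": 1}', 1002: '{"ok": 2}'}


async def fetch_revision(revid: int) -> Optional[str]:
    """Hypothetical async lookup; a real connector would issue an HTTP request here."""
    await asyncio.sleep(0)  # yield control, as a network call would
    return FAKE_API.get(revid)


async def lookup_join(events: list[dict]) -> list[dict]:
    """Inner lookup join: enrich each event by key, drop events with no match."""
    # All lookups run concurrently; gather returns results in input order.
    responses = await asyncio.gather(
        *(fetch_revision(e["rev_id"]) for e in events)
    )
    joined = []
    for event, response in zip(events, responses):
        if response is not None:  # inner join: no match, no output row
            joined.append({"revids": event["rev_id"], "response": response})
    return joined


if __name__ == "__main__":
    events = [{"rev_id": 1000}, {"rev_id": 1001}, {"rev_id": 1002}]
    print(asyncio.run(lookup_join(events)))
```

This sketch keeps input order for readability; whether output order is preserved in a real deployment depends on how the async lookup is configured.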

Some response payloads (e.g. from the Action API) might violate join semantics, because they may not contain a required key. To work around this, I introduced a required `response` field in the HTTP table that stores the raw JSON payload. It can then be parsed with Flink's JSON functions.
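
One way to read the workaround: if the row schema is derived from the response body, a payload that lacks the key yields no row at all, whereas keeping the key from the request and storing the body as an opaque string always yields one. The Python sketch below contrasts the two mappings; `map_strict`, `map_raw` and the sample payload are hypothetical and only illustrate the idea.

```python
import json
from typing import Optional


def map_strict(payload: str, key: str) -> Optional[dict]:
    """Build a row from the response body; fails (None) when `key` is absent."""
    data = json.loads(payload)
    if not isinstance(data, dict) or key not in data:
        return None  # no row can be built, so there is nothing to join on
    return {key: data[key]}


def map_raw(revid: int, payload: str) -> dict:
    """Workaround: take the key from the request and keep the body as a string."""
    return {"revids": revid, "response": payload}


if __name__ == "__main__":
    payload = '{"query": {"pages": []}}'  # hypothetical body without a `revids` key
    print(map_strict(payload, "revids"))
    print(map_raw(1001, payload))
```

The raw string can then be parsed downstream, which is where JSON functions such as `JSON_VALUE` come in.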

A table can be defined as:

CREATE TABLE MwAction (
    revids integer, -- Part of the query string (revids=<val>). Its name must match the Action API param.
    domain string,  -- Special field (not an Action API param), set as the Host request header when hitting internal endpoints.
    response string -- Special field (not an Action API param) that stores the Action API response content.
) WITH (
    'connector' = 'mediawiki-http',
    'format' = 'json', -- TODO: we don't really need this, since internally we don't serialize data as JSON.
    -- The base URL to query. It can contain default query string params. Query parameters could also
    -- be declared as table columns and used in WHERE clauses (... WHERE rvslots = 'main').
    'url' = 'https://en.wikipedia.org/w/api.php?action=query&format=json&prop=revisions&formatversion=2&rvprop=content&rvslots=main',
    -- 'fields.domain' = 'domain', -- Optional field that stores the domain (default: `domain`).
    --                                When present, it is used to set the Host request header.
    'fields.response' = 'response', -- Field that stores the response payload (default: `response`).
    'http.client.header.user-agent' = 'wmf-mediawiki-stream-enrichment/1.0-SNAPSHOT bot'
);
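
To make the table options concrete, here is a guess at the HTTP request a lookup for one row might produce: non-special columns appended to the base URL's query string, `domain` sent as the Host header, and the configured User-Agent attached. This is a Python sketch under those assumptions only; `build_request` and `SPECIAL_FIELDS` are hypothetical names, the connector's actual request building may differ, and nothing is sent over the network.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
from urllib.request import Request

BASE_URL = ("https://en.wikipedia.org/w/api.php?action=query&format=json"
            "&prop=revisions&formatversion=2&rvprop=content&rvslots=main")
USER_AGENT = "wmf-mediawiki-stream-enrichment/1.0-SNAPSHOT bot"
SPECIAL_FIELDS = {"domain", "response"}  # table columns that are not Action API params


def build_request(row: dict) -> Request:
    """Hypothetical: merge lookup columns into the base URL's default query params."""
    scheme, netloc, path, query, fragment = urlsplit(BASE_URL)
    params = parse_qsl(query)  # defaults declared in the base URL
    params += [(k, str(v)) for k, v in row.items()
               if k not in SPECIAL_FIELDS and v is not None]
    url = urlunsplit((scheme, netloc, path, urlencode(params), fragment))
    headers = {"User-Agent": USER_AGENT}
    if row.get("domain"):
        headers["Host"] = row["domain"]  # selects the wiki on internal endpoints
    return Request(url, headers=headers)  # built only, not sent


if __name__ == "__main__":
    req = build_request({"revids": 1000, "domain": "en.wikipedia.org"})
    print(req.full_url)
    print(req.header_items())
```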

An enrichment query that needs to fetch wikitext from the Action API would look like this:

SELECT
    mw_action.revids,
    mw_action.domain,
    JSON_VALUE(mw_action.response, '$.query.pages[0].revisions[0].slots.main.content') AS wikitext -- parse response content and extract wikitext.
FROM MockRevisionCreate AS revision_create JOIN MwAction FOR SYSTEM_TIME AS OF revision_create.proc_time AS mw_action
ON revision_create.rev_id = mw_action.revids
WHERE mw_action.domain = 'en.wikipedia.org'; -- this should be a field we join on, extracted from `meta`.
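
The JSON path above walks a `formatversion=2` response down to the main slot's content. The Python sketch below mirrors that path step by step and, like `JSON_VALUE` with its default behaviour, yields NULL (here `None`) when the path is missing or the payload is not valid JSON. It is an approximation for illustration: `json_value` and `WIKITEXT_PATH` are hypothetical names, and the sample payload is a trimmed, made-up response.

```python
import json
from typing import Any, Optional

# Steps equivalent to '$.query.pages[0].revisions[0].slots.main.content'.
WIKITEXT_PATH = ["query", "pages", 0, "revisions", 0, "slots", "main", "content"]


def json_value(payload: str, path: list) -> Optional[Any]:
    """Follow `path` through parsed JSON; return None if any step is missing."""
    try:
        node: Any = json.loads(payload)
    except json.JSONDecodeError:
        return None  # roughly NULL ON ERROR
    for step in path:
        if isinstance(step, int):
            if not isinstance(node, list) or not 0 <= step < len(node):
                return None  # roughly NULL ON EMPTY
        elif not isinstance(node, dict) or step not in node:
            return None
        node = node[step]
    # JSON_VALUE extracts scalars; treat objects and arrays as no value.
    return None if isinstance(node, (dict, list)) else node


if __name__ == "__main__":
    sample = json.dumps({"query": {"pages": [{"revisions": [
        {"slots": {"main": {"content": "'''Apoptosis''' describes ..."}}}]}]}})
    print(json_value(sample, WIKITEXT_PATH))
    print(json_value('{"query": {"pages": []}}', WIKITEXT_PATH))
```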

Sample output:

+----+-------------+--------------------------------+--------------------------------+
| op |      revids |                         domain |                       wikitext |
+----+-------------+--------------------------------+--------------------------------+
| +I |        1000 |               en.wikipedia.org | <table align=center><tr val... |
| +I |        1001 |               en.wikipedia.org |                         <NULL> |
| +I |        1002 |               en.wikipedia.org | I teach math at Metropolita... |
| +I |        1003 |               en.wikipedia.org | '''Richard Nixon''' was the... |
| +I |        1004 |               en.wikipedia.org | '''Apoptosis''' describes t... |
| +I |        1005 |               en.wikipedia.org | The city of [[Tallinn]] is ... |
+----+-------------+--------------------------------+--------------------------------+

A worked-out example can be found at https://gitlab.wikimedia.org/-/snippets/37. I'll follow up with an evaluation of this approach on wiki.

Apologies, I wrote the docs before actually pushing the code. It's now available in the linked repo. Happy to give you a walkthrough if you'd like.

Looks great, so cool that it works! Today in standup we moved this to Done; hope that's okay with you.