
Allow parallel consumption on Realtime API
Closed, Resolved · Public · 13 Estimated Story Points

Description

We need to introduce partition-level consumption so that data re-users can consume streams in parallel, which will allow more efficient scaling.

Acceptance criteria
Ability to consume and resume consumption by partition in Realtime API.

To-Do

  • Schema updates
    • date_published - equal to ROWTIME in ksqlDB
    • offset - equal to ROWOFFSET in ksqlDB
    • partition - equal to ROWPARTITION in ksqlDB
{
 "name": "Albert Einstein",
 ...,
 "event": {
  "identifier": "a9f6d391-c216-48d6-986b-d4763f077fbd",
  "type": "update",
  "date_created": "2021-08-31T04:51:39Z",
  "date_published": "2021-08-31T04:52:39Z",
  "offset": 223123,
  "partition": 100
 } 
}
  • the query will support the following arguments:
    • parts - numbers from 0 to 9, representing the parallel connections that can be made per query
    • offsets - a simple map of the latest consumed offset per partition (map[int]int - the key is the partition number taken from the event.partition field and the value is event.offset)
    • since - similar to offsets, but the value will be event.date_published instead of the offset
curl -X 'POST' \
  'https://realtime.enterprise.wikimedia.com/v2/articles' \
  -H 'accept: text/event-stream' \
  -H 'Content-Type: application/json' \
  -d '{
  "parts": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
  "offsets": {
    "55": 200,
    "48": 1232
  },
  "since": {
    "55": 123123123123123,
    "88": 123123213213211
  },
  "fields": [
    "name",
    "identifier"
  ],
  "filters": [
    {
      "field": "in_language.identifier",
      "value": "en"
    }
  ]
}'
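To resume consumption, a client needs to remember the highest offset it has seen on each partition and send that map back in the `offsets` field. Here is a minimal Go sketch of that bookkeeping; the `article` struct and `latestOffsets` helper are illustrative names, not part of the actual Wikimedia Enterprise SDK:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// article mirrors the fields this task adds to the event schema; the struct
// is an illustrative sketch, not the actual Wikimedia Enterprise SDK type.
type article struct {
	Name  string `json:"name"`
	Event struct {
		Partition int   `json:"partition"`
		Offset    int64 `json:"offset"`
	} `json:"event"`
}

// latestOffsets folds consumed messages into the map[int]int shape the
// "offsets" request field expects: the highest offset seen per partition.
func latestOffsets(messages []string) map[int]int64 {
	offsets := map[int]int64{}
	for _, m := range messages {
		var a article
		if err := json.Unmarshal([]byte(m), &a); err != nil {
			panic(err)
		}
		if cur, ok := offsets[a.Event.Partition]; !ok || a.Event.Offset > cur {
			offsets[a.Event.Partition] = a.Event.Offset
		}
	}
	return offsets
}

func main() {
	// Pretend these arrived on the SSE stream (payloads trimmed to the
	// fields we need).
	offsets := latestOffsets([]string{
		`{"name":"A","event":{"partition":55,"offset":150}}`,
		`{"name":"B","event":{"partition":55,"offset":200}}`,
		`{"name":"C","event":{"partition":48,"offset":1232}}`,
	})

	// Marshal into the resume request body shape from the curl example.
	body, _ := json.Marshal(map[string]any{"offsets": offsets})
	fmt.Println(string(body)) // {"offsets":{"48":1232,"55":200}}
}
```

On the next request, this map goes into the `offsets` field of the POST body, and the service resumes each partition from the recorded offset.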

Notes

Here’s an example of the query that’s going to extract the data described above from the stream:

SELECT
  NAME,
  ROWTIME,
  ROWPARTITION,
  ROWOFFSET
FROM rlt_articles_str
WHERE ROWPARTITION IN (0, 1)
  AND CASE
    WHEN ROWPARTITION = 0 THEN ROWOFFSET >= 2135270
    ELSE ROWOFFSET >= 2117757
  END
EMIT CHANGES
LIMIT 10000;

This query starts consuming from the offsets provided, applying a separate starting offset to each partition.

Event Timeline

Protsack.stephan created this task.
prabhat changed the task status from Open to In Progress. Jun 5 2023, 11:45 PM
prabhat moved this task from Estimated/Discussed to In Progress on the Wikimedia Enterprise board.

QA tests: I had "context cancelled" errors from some of my calls. This could be a problem with my ISP's connection to the US AWS servers. It's best to re-run this QA step with someone else to see whether it's a one-off on my side.

QA was done by observing that the messages received come from the correct partitions on the Kafka topics in dev. When an offset is chosen, no messages with offsets less than the chosen one are received. offset and since_per_partition cannot both be used for the same partition. max_parts cannot be exceeded. When using since_per_partition, no messages with a datetime earlier than the one picked are received. The length of offset and since_per_partition cannot exceed the maximum number of partitions. When offsets are picked for some partitions and not others, messages for the partitions without offsets are received normally. The ticket can be marked as done.

prabhat updated the task description.