Create a generic gRPC endpoint, PreExport, that makes a single pass over the compacted topic for a given project-namespace and:
- determines a map from partition to the list of excluded offsets that the export must skip while creating the snapshot
- determines a map from partition to the high offset (the offset at which the export should stop reading)
- saves the list of unique articles, and their total number, included for the project-namespace on a given day, for future debugging
Model this gRPC endpoint on the existing Export endpoint, in the same Snapshots service:
service Snapshots {
  rpc PreExport(PreExportRequest) returns (PreExportResponse);
  rpc Export(ExportRequest) returns (ExportResponse);
}
Input
message PreExportRequest {
  int32 namespace = 1;
  string project = 3;
  string prefix = 4;
  int64 since = 6;
}
Output
The PreExport response should contain three things:
- a map from partition to the list of excluded offsets
- a map from partition to the high offset
- the number of unique articles
Besides returning these, the service also saves the unique article names, and the total number of unique articles, that will be included in the snapshot for the project-namespace on a given day. They are written to the data bucket in S3 at dag-metadata/snapshots/yyyy-mm-dd/<snapshot identifier>.json, e.g. dag-metadata/snapshots/2025-05-28/enwiki_namespace_0.json.
Example:
The endpoint is called with project enwiki, namespace 0 and since 0. The relevant topic has 4 partitions and contains the following events:
(article_A, version: 100081, event.type: "update", partition: 1, offset: 100),
(article_A, version: 100089, event.type: "update", partition: 1, offset: 110),
(article_A, version: 100099, event.type: "update", partition: 1, offset: 105),
(article_B, version: 100099, event.type: "update", partition: 2, offset: 205),
(article_B, version: 100099, event.type: "update", partition: 2, offset: 105),
(article_B, version: 100099, event.type: "update", partition: 2, offset: 107),
(article_C, version: 100049, event.type: "update", partition: 3, offset: 105),
(article_C, version: 100079, event.type: "update", partition: 3, offset: 106),
(article_C, version: 100099, event.type: "delete", partition: 3, offset: 108),
(article_D, version: 100049, event.type: "update", partition: 4, offset: 105),
(article_E, version: 100049, event.type: "delete", partition: 4, offset: 205),
(article_E, version: 100069, event.type: "update", partition: 4, offset: 210)
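Here, article_A and article_B have duplicate events, and only the highest-version event of each is kept; all of article_B's events share version 100099, and the expected output keeps the one at offset 205. article_C is eventually deleted (its highest-version event is a delete), so it is left out of the snapshot. article_E is not deleted, because its update (version 100069) is newer than its delete (version 100049). Note that a higher version is not necessarily at a higher offset: article_A's newest event (version 100099) is at offset 105, before offset 110.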
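For this example, the gRPC service must return:
- a map from partition to the list of excluded offsets (partition 4 has none, and the delete events at offsets 108 and 205 are not listed):
{1: [100, 110], 2: [105, 107], 3: [105, 106]}
- a map from partition to the high offset:
{1: 110, 2: 205, 3: 108, 4: 210}
- the number of unique articles: 4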
Additionally, the service saves the following at dag-metadata/snapshots/yyyy-mm-dd/enwiki_namespace_0.json:
{
  "total": 4,
  "article_names": ["article_A", "article_B", "article_D", "article_E"]
}
To do
- Create the gRPC proto and implement the endpoint handler (in the snapshot repo) according to the specification above
- Test the gRPC endpoint locally
Acceptance criteria
- The gRPC endpoint returns the output and saves the metadata to S3 (MinIO) as specified above