Here is an example of events received in Kafka on 2020-06-05 around 11:20am:
{"event_type": "purge", "tag2": 0, "as_src": 13335, "as_dst": 2914, "as_path": "2914", "peer_as_src": 0, "peer_as_dst": 2914, "peer_ip_src": "103.102.166.130", "ip_src": "162.159.64.7", "ip_dst": "116.51.26.210", "port_src": 0, "port_dst": 0, "country_ip_src": "", "country_ip_dst": "SG", "tcp_flags": "0", "ip_proto": "gre", "stamp_inserted": "2020-06-04 22:25:00", "stamp_updated": "2020-06-05 11:22:01", "packets": 126000, "bytes": 12626000, "writer_id": "default_kafka/30598"}
{"event_type": "purge", "tag2": 0, "as_src": 13335, "as_dst": 2914, "as_path": "2914", "peer_as_src": 0, "peer_as_dst": 2914, "peer_ip_src": "103.102.166.130", "ip_src": "162.159.64.7", "ip_dst": "116.51.26.210", "port_src": 0, "port_dst": 0, "country_ip_src": "", "country_ip_dst": "SG", "tcp_flags": "0", "ip_proto": "gre", "stamp_inserted": "2020-06-04 22:25:00", "stamp_updated": "2020-06-05 11:23:01", "packets": 152000, "bytes": 13717000, "writer_id": "default_kafka/30652"}
We currently use the stamp_inserted field as the ingestion date for both Camus and Druid, so in this specific case the data is seen as belonging to day 2020-06-04 hour 22, even though it arrived about 13 hours later.
I can see examples of that pattern in other hours as well, for instance on day 2020-06-03 at hour 21.
These late data are small in volume but happen regularly: I computed the difference in seconds between stamp_inserted and stamp_updated, and aggregated it using FLOOR(LOG10(diff)) per hour for the month of May 2020. This computation shows that hours containing at least one row where the difference is more than 10k seconds (more than two and a half hours) are frequent (~10 per day).
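For reference, the lateness computation described above can be sketched in Python as follows. This is only an illustration, not the query I actually ran: the function names are hypothetical, the hour is assumed to be the hour of stamp_inserted, and rows with a non-positive difference are assumed to get no bucket since LOG10 is undefined for them.

```python
import math
from collections import Counter
from datetime import datetime

STAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def lateness_seconds(event):
    """Seconds between stamp_inserted and stamp_updated for one event."""
    inserted = datetime.strptime(event["stamp_inserted"], STAMP_FORMAT)
    updated = datetime.strptime(event["stamp_updated"], STAMP_FORMAT)
    return int((updated - inserted).total_seconds())


def lateness_bucket(event):
    """FLOOR(LOG10(diff)): 4 means between 10k and 100k seconds late."""
    diff = lateness_seconds(event)
    # Assumption: LOG10 is undefined for diff <= 0, so those rows get no bucket.
    return math.floor(math.log10(diff)) if diff > 0 else None


def buckets_per_hour(events):
    """Count events per (hour of stamp_inserted, lateness bucket)."""
    counts = Counter()
    for event in events:
        hour = event["stamp_inserted"][:13]  # "YYYY-MM-DD HH"
        counts[(hour, lateness_bucket(event))] += 1
    return counts


# Timestamps taken from the two example events above.
events = [
    {"stamp_inserted": "2020-06-04 22:25:00", "stamp_updated": "2020-06-05 11:22:01"},
    {"stamp_inserted": "2020-06-04 22:25:00", "stamp_updated": "2020-06-05 11:23:01"},
]
print(buckets_per_hour(events))
```

Both example events are a bit under 13 hours late, so they fall in bucket 4, i.e. above the 10k seconds threshold.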
Two solutions I can think of:
- Use stamp_updated instead of stamp_inserted as main ingestion timestamp
- Fix a limit for late-data acceptance (for instance, don't accept events whose stamp_inserted is more than X hours old).
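
To make the two options concrete, here is a rough Python sketch of each. It is not tied to how Camus or Druid are actually configured: the function names are hypothetical, the 3-hour limit is just a placeholder for X, and lateness is assumed to be measured against stamp_updated (it could equally be measured against the time the event is read from Kafka).

```python
from datetime import datetime, timedelta

STAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
MAX_LATENESS = timedelta(hours=3)  # placeholder for "X hours", to be decided


def parse_stamp(stamp):
    return datetime.strptime(stamp, STAMP_FORMAT)


def ingestion_time(event):
    """Option 1: bucket the event by stamp_updated instead of stamp_inserted."""
    return parse_stamp(event["stamp_updated"])


def is_accepted(event, max_lateness=MAX_LATENESS):
    """Option 2: keep stamp_inserted, but reject events that are too late."""
    lateness = parse_stamp(event["stamp_updated"]) - parse_stamp(event["stamp_inserted"])
    return lateness <= max_lateness


# Timestamps from the first example event above.
late_event = {"stamp_inserted": "2020-06-04 22:25:00", "stamp_updated": "2020-06-05 11:22:01"}

print(ingestion_time(late_event))  # option 1 files it under 2020-06-05 hour 11
print(is_accepted(late_event))     # option 2 drops it with a 3-hour limit
```

With option 1 the late event lands in the hour where it was actually received; with option 2 it is discarded, so the hour 2020-06-04 22 is no longer modified after the fact but the event's packets and bytes are lost.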