
Camus failing to import eqiad.mediawiki.(api|cirrussearch)-request from partition leaders on kafka-jumbo1006
Closed, Resolved · Public

Description

This is only happening for partitions whose leader is kafka-jumbo1006.

May  8 13:45:11 an-launcher1001 camus-mediawiki_analytics_events[25300]: The current offset is too close to the earliest offset, Camus might be falling behind: eqiad.mediawiki.api-request
uri:tcp://kafka-jumbo1006.eqiad.wmnet:9092
leader:1006
partition:2
earliest_offset:11344403152
offset:11388074562
latest_offset:11755636039
avg_msg_size:723
estimated_size:265746947871

May  8 13:45:11 an-launcher1001 camus-mediawiki_analytics_events[25300]: The current offset is too close to the earliest offset, Camus might be falling behind: eqiad.mediawiki.cirrussearch-request
uri:tcp://kafka-jumbo1006.eqiad.wmnet:9092
leader:1006
partition:6
earliest_offset:4392726283
offset:4411281201
latest_offset:4562931486
avg_msg_size:2389
estimated_size:362292530865


May  8 13:45:11 an-launcher1001 camus-mediawiki_analytics_events[25300]: 20/05/08 13:45:11 INFO kafka.CamusJob: eqiad.mediawiki.api-request
uri:tcp://kafka-jumbo1006.eqiad.wmnet:9092
leader:1006
partition:8
earliest_offset:11343888026
offset:11386686628
latest_offset:11754239979
avg_msg_size:721
estimated_size:265005966071

Event Timeline

Change 595179 had a related patch set uploaded (by Ottomata; owner: Ottomata):
[operations/puppet@production] Increase number of mappers used for Camus mediawiki_analtyics_events

https://gerrit.wikimedia.org/r/595179

Change 595179 merged by Ottomata:
[operations/puppet@production] Increase number of mappers used for Camus mediawiki_analtyics_events

https://gerrit.wikimedia.org/r/595179

Mentioned in SAL (#wikimedia-analytics) [2020-05-08T15:27:06Z] <ottomata> stopping kafka broker on kafka-jumbo1006 to investigate camus import failures - T252203

Mentioned in SAL (#wikimedia-operations) [2020-05-08T15:27:10Z] <ottomata> stopping kafka broker on kafka-jumbo1006 to investigate camus import failures - T252203

Mentioned in SAL (#wikimedia-operations) [2020-05-08T15:36:40Z] <ottomata> starting kafka broker on kafka-jumbo1006, same issue on other brokers when they are leaders of offending partitions - T252203

Mentioned in SAL (#wikimedia-analytics) [2020-05-08T15:36:44Z] <ottomata> starting kafka broker on kafka-jumbo1006, same issue on other brokers when they are leaders of offending partitions - T252203

No idea why this happens.

I did notice that after each run, since no data is written for these partitions, Camus does not save their offsets in any of the offsets-m-* files it creates. I'm not totally sure how it figures out the offsets for the next run, but I think it must fall back to a copy of the offsets-previous file, which does have these offsets in it.
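
(For reference, here's a minimal spark2-shell sketch of how one could dump what the offsets SequenceFiles in a Camus history directory contain. This assumes EtlKey exposes getTopic/getPartition/getOffset accessors; the history directory path is just an example.)

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{NullWritable, SequenceFile}
import com.linkedin.camus.etl.kafka.common.EtlKey

val conf = new Configuration
val fs = FileSystem.get(conf)

// Illustrative history directory; point this at the run you want to inspect.
val historyDir = new Path("/wmf/camus/mediawiki_analytics_events-01/history/2020-05-08-20-00-28")

fs.listStatus(historyDir).map(_.getPath).filter(_.getName.startsWith("offsets")).foreach { p =>
  val reader = new SequenceFile.Reader(fs, p, conf)
  val key = new EtlKey
  // Each record is an EtlKey with a NullWritable value, the same layout as the offsets files written below.
  while (reader.next(key, NullWritable.get)) {
    println(s"${p.getName}\t${key.getTopic}\t${key.getPartition}\t${key.getOffset}")
  }
  reader.close()
}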

This time I was able to work around it slightly less intrusively than in T233718, by writing new Camus offsets-m files into the latest Camus history directory with incremented offsets for the 3 offending topic-partitions. In my tests, incrementing by 1 or 1000 didn't succeed (still no data written), but incrementing by 10000 did. I incremented the offsets for all 3 partitions by 10000 and let Camus run. It worked!

topic:eqiad.mediawiki.cirrussearch-request partition:6 beginOffset:4411291201 estimatedLastOffset:4569752572
2020-05-08 20:01:08,007 INFO [main] com.linkedin.camus.etl.kafka.mapred.EtlRecordReader: Num of records read for this partition = 42908
2020-05-08 20:01:08,148 INFO [main] com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat: Moving hdfs://analytics-hadoop/wmf/camus/mediawiki_analytics_events-01/2020-05-08-20-00-28/_temporary/1/_temporary/attempt_1583418280867_339632_m_000000_0/data.eqiad_mediawiki_cirrussearch-request.1005.6.1588402800000-m-00000 to hdfs://analytics-hadoop/wmf/data/raw/event/eqiad_mediawiki_cirrussearch-request/hourly/2020/05/02/07/eqiad_mediawiki_cirrussearch-request.1005.6.42908.4411334109.1588402800000

topic:eqiad.mediawiki.api-request partition:2 beginOffset:11388084562 estimatedLastOffset:11772610510
2020-05-08 20:29:33,949 INFO [main] com.linkedin.camus.etl.kafka.mapred.EtlRecordReader: Num of records read for this partition = 9476002
2020-05-08 20:29:34,378 INFO [main] com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat: Moving hdfs://analytics-hadoop/wmf/camus/mediawiki_analytics_events-01/2020-05-08-20-15-12/_temporary/1/_temporary/attempt_1583418280867_339678_m_000001_0/data.eqiad_mediawiki_api-request.1004.2.1588399200000-m-00001 to hdfs://analytics-hadoop/wmf/data/raw/event/eqiad_mediawiki_api-request/hourly/2020/05/02/06/eqiad_mediawiki_api-request.1004.2.116431.11388258914.1588399200000

topic:eqiad.mediawiki.api-request partition:8 beginOffset:11386696628 estimatedLastOffset:11771234140
2020-05-08 20:15:36,476 INFO [main] com.linkedin.camus.etl.kafka.mapred.EtlRecordReader: Num of records read for this partition = 1203
2020-05-08 20:15:36,684 INFO [main] com.linkedin.camus.etl.kafka.mapred.EtlMultiOutputFormat: Moving hdfs://analytics-hadoop/wmf/camus/mediawiki_analytics_events-01/2020-05-08-20-15-12/_temporary/1/_temporary/attempt_1583418280867_339678_m_000000_0/data.eqiad_mediawiki_api-request.1003.8.1588399200000-m-00000 to hdfs://analytics-hadoop/wmf/data/raw/event/eqiad_mediawiki_api-request/hourly/2020/05/02/06/eqiad_mediawiki_api-request.1003.8.1203.11386697831.1588399200000

Note that in this log output, each beginOffset is 10000 greater than the corresponding offset in the error messages quoted in this task's description.

Here's how I created the offset files (more or less):

import org.apache.hadoop.fs._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.SequenceFile
import com.linkedin.camus.etl.kafka.common.EtlKey

val configuration = new Configuration
val fs: FileSystem = FileSystem.get(configuration)

/**
 * Helper to construct an EtlKey for a topic partition at a specific offset.
 */
def createEtlKey(topic: String, partition: Int, offset: Long) = {
  var beginOffset = offset - 1L
  if (beginOffset < 0L) {
    beginOffset = offset
  }

  new EtlKey(topic, "", partition, beginOffset, offset)
}

/**
 * Helper to write Camus offsets SequenceFiles.
 */
def writeOffsetsFile(offsetsFilePath: Path, etlKeys: Seq[EtlKey]) = {
  val offsetsWriter = SequenceFile.createWriter(
    fs, configuration, offsetsFilePath,
    classOf[EtlKey], classOf[NullWritable]
  )

  for (etlKey <- etlKeys) {
    offsetsWriter.append(etlKey, NullWritable.get())
  }
  offsetsWriter.close()
}

val cirrusPart6EtlKey = createEtlKey("eqiad.mediawiki.cirrussearch-request", 6, 4411281201L + 10000L)
val apiPart2EtlKey = createEtlKey("eqiad.mediawiki.api-request", 2, 11388074562L + 10000L)
val apiPart8EtlKey = createEtlKey("eqiad.mediawiki.api-request", 8, 11386686628L + 10000L)

writeOffsetsFile(
  new Path("/user/otto/camus-T252203/camus-T252203.offsets.10000"),
  Seq(cirrusPart6EtlKey, apiPart2EtlKey, apiPart8EtlKey)
)

Then I copied that offsets file into the latest history directory for this Camus job:

sudo -u analytics kerberos-run-command analytics hdfs dfs -cp /user/otto/camus-T252203/camus-T252203.offsets.10000 /wmf/camus/mediawiki_analytics_events-01/history/2020-05-08-20-00-28/offsets-m-00099

I'll now replay the 10000 messages I skipped in each of these topic partitions.

To replay:

kafkacat -C -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.mediawiki.api-request -p 2 -o 11388074561 -c 10001 > eqiad.mediawiki.api-request.2.11388074561.10001
kafkacat -C -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.mediawiki.api-request -p 8 -o 11386686627 -c 10001 > eqiad.mediawiki.api-request.8.11386686628.10001
kafkacat -C -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.mediawiki.cirrussearch-request -p 6 -o 4411281200 -c 10001 > eqiad.mediawiki.cirrussearch-request.6.4411281200.10001

kafkacat -P -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.mediawiki.api-request -p 2 < eqiad.mediawiki.api-request.2.11388074561.10001
kafkacat -P -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.mediawiki.api-request -p 8 < eqiad.mediawiki.api-request.8.11386686628.10001
kafkacat -P -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.mediawiki.cirrussearch-request -p 6 < eqiad.mediawiki.cirrussearch-request.6.4411281200.10001

Since the replayed messages are appended at the end of each partition, Camus will eventually consume them into HDFS. Once it is all caught up, I'll re-refine the entire last week of data for these tables.

Mentioned in SAL (#wikimedia-analytics) [2020-05-08T21:06:29Z] <ottomata> running prefered replica election for kafka-jumbo to get preferred leaders back after reboot of broker earlier today - T252203

Mentioned in SAL (#wikimedia-operations) [2020-05-08T21:06:32Z] <ottomata> running prefered replica election for kafka-jumbo to get preferred leaders back after reboot of broker earlier today - T252203

>:(

I spoke too soon. From a recent run (2020-05-08-20-45-15, application_1583418280867_339736):

topic:eqiad.mediawiki.api-request partition:8 beginOffset:11386697831 estimatedLastOffset:11772594249
2020-05-08 20:45:38,239 INFO [main] com.linkedin.camus.etl.kafka.mapred.EtlRecordReader: Records read : 0

topic:eqiad.mediawiki.cirrussearch-request partition:6 beginOffset:4411334109 estimatedLastOffset:4570703624
2020-05-08 20:45:38,482 INFO [main] com.linkedin.camus.etl.kafka.mapred.EtlRecordReader: Records read : 0

But api-request partition 2 is fine?

topic:eqiad.mediawiki.api-request partition:2 beginOffset:11406890542 estimatedLastOffset:11773966037
2020-05-08 20:59:38,387 INFO [main] com.linkedin.camus.etl.kafka.mapred.EtlRecordReader: Num of records read for this partition = 9662677

Wha???

FINE. Let's try to skip at least an hour of data.


// api-request is about 9000 messages/second and has 12 partitions.  Let's try to skip more
// than an hour of data, rounded up to a round number.
// 9000 / 12 * 60 * 60 == about 2700000 messages per partition per hour.  Round up to 3000000.
val increment = 3000000L

val apiPart8EtlKey = createEtlKey("eqiad.mediawiki.api-request", 8, 11386697831L + increment)
val cirrusPart6EtlKey = createEtlKey("eqiad.mediawiki.cirrussearch-request", 6, 4411334109L + increment)

writeOffsetsFile(new Path(s"/user/otto/camus-T252203/camus-T252203.offsets.${increment}"), Seq(cirrusPart6EtlKey,apiPart8EtlKey))

//sudo -u analytics kerberos-run-command analytics hdfs dfs -cp /user/otto/camus-T252203/camus-T252203.offsets.3000000 /wmf/camus/mediawiki_analytics_events-01/history/2020-05-08-21-15-09/offsets-m-00099

Run Camus and look at the application logs for application_1583418280867_339836:

yarn-logs -u analytics application_1583418280867_339836 | grep -A 25 -E 'topic:eqiad.mediawiki.api-request partition:2|topic:eqiad.mediawiki.api-request partition:8|topic:eqiad.mediawiki.cirrussearch-request partition:6'


topic:eqiad.mediawiki.api-request partition:2 beginOffset:11436970152 estimatedLastOffset:11776099563
2020-05-08 21:49:39,612 INFO [main] com.linkedin.camus.etl.kafka.mapred.EtlRecordReader: Num of records read for this partition = 10151568

topic:eqiad.mediawiki.api-request partition:8 beginOffset:11389697831 estimatedLastOffset:11774728941
2020-05-08 21:49:42,674 INFO [main] com.linkedin.camus.etl.kafka.mapred.EtlRecordReader: Num of records read for this partition = 10713368

topic:eqiad.mediawiki.cirrussearch-request partition:6 beginOffset:4414334109 estimatedLastOffset:4571579786
2020-05-08 21:49:39,060 INFO [main] com.linkedin.camus.etl.kafka.mapred.EtlRecordReader: Num of records read for this partition = 5641899

GOOD! Now stay that way! Will watch the next Camus run and make sure.

Gotta replay the skipped messages so they eventually get imported:

kafkacat -C -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.mediawiki.api-request -p 8 -o 11386697830 -c 3000001 > eqiad.mediawiki.api-request.8.11386697830.3000001
kafkacat -C -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.mediawiki.cirrussearch-request -p 6 -o 4411334108 -c 3000001 > eqiad.mediawiki.cirrussearch-request.6.4411334108.3000001

kafkacat -P -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.mediawiki.api-request -p 8 < eqiad.mediawiki.api-request.8.11386697830.3000001
kafkacat -P -b kafka-jumbo1001.eqiad.wmnet:9092 -t eqiad.mediawiki.cirrussearch-request -p 6 < eqiad.mediawiki.cirrussearch-request.6.4411334108.3000001

Next run was still good:

yarn-logs -u analytics application_1583418280867_339909 | grep -A 25 -E 'topic:eqiad.mediawiki.api-request partition:2|topic:eqiad.mediawiki.api-request partition:8|topic:eqiad.mediawiki.cirrussearch-request partition:6'

topic:eqiad.mediawiki.api-request partition:2 beginOffset:11447121720 estimatedLastOffset:11777137042
2020-05-08 22:14:54,340 INFO [main] com.linkedin.camus.etl.kafka.mapred.EtlRecordReader: Num of records read for this partition = 10055757

topic:eqiad.mediawiki.api-request partition:8 beginOffset:11400411199 estimatedLastOffset:11778766539
2020-05-08 22:14:54,320 INFO [main] com.linkedin.camus.etl.kafka.mapred.EtlRecordReader: Num of records read for this partition = 10141274

topic:eqiad.mediawiki.cirrussearch-request partition:6 beginOffset:4419976008 estimatedLastOffset:4574997529
2020-05-08 22:14:56,683 INFO [main] com.linkedin.camus.etl.kafka.mapred.EtlRecordReader: Num of records read for this partition = 5347405

I hope it stays this way!

I'll launch a Refine backfill either this weekend or Monday at the latest.

I still have no idea why this happens. Maybe there is some bug in the ancient Kafka client that Camus uses that causes it to get stuck or return an empty iterator sometimes? Luca wants to upgrade the client anyway to get TLS support. Perhaps we should just try that and cross our fingers.

Just launched a backfill for mediawiki_cirrussearch_request and mediawiki_api_request for any failed or modified raw data hours since May 1.

sudo -u analytics spark2-submit \
  --name refine_event_backfill_T252203 \
  --class org.wikimedia.analytics.refinery.job.refine.Refine \
  --files /etc/hive/conf/hive-site.xml,/etc/refinery/refine/refine_event.properties,/srv/deployment/analytics/refinery/artifacts/hive-jdbc-1.1.0-cdh5.10.0.jar,/srv/deployment/analytics/refinery/artifacts/hive-service-1.1.0-cdh5.10.0.jar \
  --master yarn \
  --deploy-mode cluster \
  --queue production \
  --driver-memory 8G \
  --executor-memory 4G \
  --conf spark.driver.extraClassPath=/usr/lib/hadoop-mapreduce/hadoop-mapreduce-client-common.jar:hive-jdbc-1.1.0-cdh5.10.0.jar:hive-service-1.1.0-cdh5.10.0.jar \
  --conf spark.dynamicAllocation.maxExecutors=64 \
  --principal analytics/an-launcher1001.eqiad.wmnet@WIKIMEDIA \
  --keytab /etc/security/keytabs/analytics/analytics.keytab \
  /srv/deployment/analytics/refinery/artifacts/org/wikimedia/analytics/refinery/refinery-job-0.0.124.jar \
  --config_file refine_event.properties --table_whitelist_regex='mediawiki_(cirrussearch|api)_request' --since='2020-05-01T00:00:00' --ignore_failure_flag=true
20/05/12 08:58:04 INFO Refine: Successfully refined 142 of 142 dataset partitions into table `event`.`mediawiki_cirrussearch_request` (total # refined records: 1737424574)
20/05/12 08:58:04 INFO Refine: Successfully refined 144 of 144 dataset partitions into table `event`.`mediawiki_api_request` (total # refined records: 4226747122)

In https://phabricator.wikimedia.org/T249261#6146372 @EBernhardson reported that he was missing event.mediawiki_cirrussearch_request/datacenter=eqiad/year=2020/month=5/day=8/hour=10. I just looked for this data in Camus raw, and indeed hour 10 is missing in raw data for that day. Once again, we've lost some data. I see the same hour missing for mediawiki_api_request too.

Erik, I'm really sorry about that. This is the second time that Camus has bitten us hard here. I dug as deep as I could to figure out what is wrong, and the best I could do was the workaround above. We've wanted to replace Camus for years, but have been blocked by priorities and some licensing issues with Confluent's Kafka Connect HDFS. We've recently discussed expediting the replacement, but it still can't happen overnight.

The data loss here is Camus' fault, but it is really my fault that I didn't backfill this data properly. I got most of it, but clearly not all. I'm not sure why this hour was missed; I was fairly sure I had backfilled everything. I guess I failed at keeping all the partitions, offsets, sequence files, and data straight on a late Friday afternoon while I was struggling with this.

We've got monitoring in place for:

  • Noticing if Camus is stuck (this is what made me file this ticket in the first place)
  • Noticing if Refine fails on raw Camus imported data
  • Noticing if Refine doesn't run on raw Camus imported data

However, noticing that a given hour of raw Camus imported data is missing is hard, mainly because many topics legitimately have no data for a given hour. We have some thoughts on how to alert on missing hours in T250844: MEP: canary events so we know events are flowing through pipeline, but we haven't done that yet.
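
As a rough sketch of the kind of check that could catch this, here's what scanning for absent raw hour directories might look like in spark2-shell. The topic path and date range are just examples, and this can't tell a lost hour apart from a topic that legitimately had no traffic that hour, which is exactly why we want canary events.

import java.time.LocalDateTime
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration)

// Example raw data root for one topic; hour directories are laid out as yyyy/MM/dd/HH.
val base = "/wmf/data/raw/event/eqiad_mediawiki_cirrussearch-request/hourly"

val start = LocalDateTime.of(2020, 5, 1, 0, 0)
val end   = LocalDateTime.of(2020, 5, 9, 0, 0)

Iterator.iterate(start)(_.plusHours(1)).takeWhile(_.isBefore(end)).foreach { dt =>
  val dir = new Path(f"$base/${dt.getYear}%04d/${dt.getMonthValue}%02d/${dt.getDayOfMonth}%02d/${dt.getHour}%02d")
  // An absent or empty hour directory is either data loss or a topic with no traffic that hour.
  if (!fs.exists(dir) || fs.listStatus(dir).isEmpty) {
    println(s"missing or empty hour: $dir")
  }
}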

I still have those backups on HDFS. The cirrussearch one starts at 2020-05-02T06:58:54Z and ends at 2020-05-08T18:18:06Z, so theoretically it should have that hour in it. It's at:

/user/milimetric/camus-stuck-on-P-6-O-4392726283.kafka

OH RIGHT! Great. Will check that out and try to backfill it.