spark streaming direct receiver offset initialization


In the class CachedKafkaConsumer (CachedKafkaConsumer.scala), what is the purpose of the following check in the method get(offset: Long, timeout: Long): ConsumerRecord[K, V]?

    assert(record.offset == offset,
      s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")


I have a production Spark Streaming job that worked for a while: it consumed Kafka messages and committed its offsets back to Kafka using rdd.asInstanceOf[HasOffsetRanges].offsetRanges and dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges). On restart, during the first attempt to resume message consumption, it seems to be hitting the above assertion.


What is the purpose of that assertion, i.e. what system conditions (for example in the interaction between message brokers and message consumers) is it supposed to detect? The assertion appears only in the Spark Streaming direct consumer library class. It seems to compare the offset passed to Kafka as the position to start reading from with the offset of the record that Kafka returns (i.e. the offset available as a field in the record itself).
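To make the question concrete, here is a minimal sketch of the kind of check the assertion seems to perform. It is a toy model, not Spark's or Kafka's code: `OffsetCheckSketch`, `Record`, `fetchFrom` and the "first record at or after the requested offset" behaviour are all assumptions made for illustration.

```scala
// Toy model only: a partition log is reduced to the records still present in it.
object OffsetCheckSketch {
  final case class Record(offset: Long, value: String)

  // Hypothetical stand-in for "seek to `offset`, then poll": returns the first
  // record whose offset is >= the requested one, if there is any.
  def fetchFrom(log: Seq[Record], offset: Long): Option[Record] =
    log.sortBy(_.offset).find(_.offset >= offset)

  // Same shape as the quoted check: the returned record must carry exactly
  // the offset that was requested, otherwise the read is rejected.
  def get(log: Seq[Record], offset: Long): Either[String, Record] =
    fetchFrom(log, offset) match {
      case Some(r) if r.offset == offset => Right(r)
      case Some(r) => Left(s"Got wrong record: asked for $offset, received ${r.offset}")
      case None    => Left(s"No record at or after offset $offset")
    }
}
```

In this model the mismatch appears whenever the requested offset is missing from the log while a later record exists. Whether the production failure has a comparable cause is exactly what I am trying to find out.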


For example, is it something like the scenario described in the blog post excerpt below, i.e. consumer offset misalignment after a leader failure and the subsequent leader election?

The last important Kafka cluster configuration property is unclean.leader.election.enable. It should be disabled (it is enabled by default in Kafka versions before 0.11) to avoid unrecoverable exceptions from the Kafka consumer. Consider the situation where the latest committed offset is N, but after a leader failure the latest offset on the new leader is M < N. M < N because the new leader was elected from a lagging follower (not an in-sync replica). When the streaming engine asks for data from offset N using the Kafka consumer, it will get an exception because offset N does not exist yet. Someone will have to fix the offsets manually.
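The M < N scenario in the excerpt can be sketched as a small model. Everything here is hypothetical and only mirrors the description above: `Replica`, `electLeader` and `committedOffsetLost` are illustrative names, and real leader election in Kafka involves far more than this.

```scala
// Toy model of the scenario above; not Kafka's election logic.
object UncleanElectionSketch {
  // Each replica is reduced to its log end offset, i.e. the offset that the
  // next appended record would receive.
  final case class Replica(brokerId: Int, logEndOffset: Long, inSync: Boolean)

  // Prefer an in-sync survivor. Only when unclean election is allowed may a
  // lagging (out-of-sync) survivor become leader instead.
  def electLeader(survivors: Seq[Replica], uncleanAllowed: Boolean): Option[Replica] =
    survivors.find(_.inSync)
      .orElse(if (uncleanAllowed) survivors.headOption else None)

  // True when the committed offset N lies beyond the new leader's log (M < N),
  // so a consumer resuming at N asks for data the leader does not have.
  def committedOffsetLost(committed: Long, leader: Replica): Boolean =
    committed > leader.logEndOffset
}
```

With a committed offset of 100 and a single surviving follower whose log ends at 80, the model elects that follower only when unclean election is allowed, and the committed offset then points past the end of its log. With unclean election disabled no leader is elected, so the partition stays unavailable instead of silently losing offsets.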

So the minimal recommended Kafka setup for reliable message processing is:

  • 4 nodes in the cluster
  • unclean.leader.election.enable=false in the brokers configuration
  • replication factor for the topics – 3
  • min.insync.replicas=2 property in topic configuration
  • acks=all property in the producer configuration
  • block.on.buffer.full=true property in the producer configuration

With the above setup your configuration should be resistant to single broker failure, and Kafka consumers will survive new leader election.
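The single-broker claim follows from simple arithmetic on the replication settings in the list. A minimal sketch, assuming the producer waits for all in-sync replicas to acknowledge; `DurabilitySketch` and `tolerableReplicaFailures` are illustrative names, not part of any Kafka API.

```scala
object DurabilitySketch {
  // When the producer waits for all in-sync replicas, a write is accepted only
  // while at least `minInsyncReplicas` replicas of the partition are in sync.
  // The difference is how many replicas can be down before writes are rejected.
  def tolerableReplicaFailures(replicationFactor: Int, minInsyncReplicas: Int): Int = {
    require(minInsyncReplicas >= 1 && minInsyncReplicas <= replicationFactor,
      "min.insync.replicas must be between 1 and the replication factor")
    replicationFactor - minInsyncReplicas
  }
}
```

For the values in the list (replication factor 3, min.insync.replicas=2) the function returns 1, which matches the "single broker failure" statement.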

You could also take a look at replica.lag.max.messages and the other properties that control when a follower is removed from the ISR by the leader, but this is out of scope for this blog post.