[spark streaming kafka 0-10] Spark 2.1 Kafka 0-10 createDirectStream API can only read one partition's messages when a topic has multiple partitions


ffbin
With Spark 2.1 and the Kafka 0-10 integration, the createDirectStream API can only read messages from one partition when the topic has multiple partitions. The relevant method is latestOffsets() in DirectKafkaInputDStream:

  /**
   * Returns the latest (highest) available offsets, taking new partitions into account.
   */
  protected def latestOffsets(): Map[TopicPartition, Long] = {
    val c = consumer
    paranoidPoll(c)
    val parts = c.assignment().asScala

    // make sure new partitions are reflected in currentOffsets
    val newPartitions = parts.diff(currentOffsets.keySet)
    // position for new partitions determined by auto.offset.reset if no commit
    currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
    // don't want to consume messages, so pause
    c.pause(newPartitions.asJava)
    // find latest available offsets

    /* suggested fix: add c.resume(currentOffsets.keySet.asJava) here */
    c.seekToEnd(currentOffsets.keySet.asJava)
    parts.map(tp => tp -> c.position(tp)).toMap
  }
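
For reference, the same pause-then-seekToEnd sequence can be exercised directly against the KafkaConsumer API the method above uses. In the sketch below, the broker address, group id, and topic name ("test") are placeholders, and the comment on the final loop describes the behavior reported here, not guaranteed behavior of every client version:

  import java.util.Properties
  import scala.collection.JavaConverters._
  import org.apache.kafka.clients.consumer.KafkaConsumer
  import org.apache.kafka.common.TopicPartition

  object PausedSeekToEndRepro {
    def main(args: Array[String]): Unit = {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092") // placeholder broker
      props.put("group.id", "repro")                   // placeholder group id
      props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer")
      props.put("enable.auto.commit", "false")

      val c = new KafkaConsumer[String, String](props)
      // "test" stands in for any topic with several partitions
      val parts = c.partitionsFor("test").asScala
        .map(pi => new TopicPartition(pi.topic, pi.partition))
      c.assign(parts.asJava)
      c.poll(0) // establish positions without blocking, like paranoidPoll

      c.pause(parts.asJava)     // pause, as latestOffsets() does
      c.seekToEnd(parts.asJava) // then ask for the end offsets
      // per the report, only the last partition prints its true end offset
      parts.foreach(tp => println(s"$tp -> ${c.position(tp)}"))
      c.close()
    }
  }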

When the consumer has paused partitions, calling seekToEnd() and then position() returns the correct latest offset only for the last partition. We should call resume() before using seekToEnd() and position().
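
For clarity, here is the whole method with the suggested call in place (a sketch of the proposal above, not a committed upstream patch):

  protected def latestOffsets(): Map[TopicPartition, Long] = {
    val c = consumer
    paranoidPoll(c)
    val parts = c.assignment().asScala

    // make sure new partitions are reflected in currentOffsets
    val newPartitions = parts.diff(currentOffsets.keySet)
    // position for new partitions determined by auto.offset.reset if no commit
    currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
    // don't want to consume messages, so pause
    c.pause(newPartitions.asJava)

    // suggested fix: resume all tracked partitions so that seekToEnd() and
    // position() report the real end offset for every partition
    c.resume(currentOffsets.keySet.asJava)
    // find latest available offsets
    c.seekToEnd(currentOffsets.keySet.asJava)
    parts.map(tp => tp -> c.position(tp)).toMap
  }

One caveat with this sketch: resume() makes the new partitions consumable again, so the original pause-to-avoid-consuming intent may require pausing them once more after the positions have been read.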