SPARK-19547

SPARK-19547

Rastogi, Pankaj
Hi,
 I have been trying to distribute Kafka topics among different instances of the same consumer group. I am using the KafkaDirectStream API to create DStreams. After the second consumer group comes up, Kafka does a partition rebalance, and then the Spark driver of the first consumer dies with the following exception:

java.lang.IllegalStateException: No current assignment for partition myTopic_5-0
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:264)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:336)
at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1236)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
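
The failing call near the top of the trace is the driver-side KafkaConsumer.seekToEnd() on a partition the consumer no longer owns. That state can be reproduced with the raw 0.10 client alone (a minimal sketch; broker address, group id, and topic are placeholders):

  import java.util.{Collections, Properties}
  import org.apache.kafka.clients.consumer.KafkaConsumer
  import org.apache.kafka.common.TopicPartition

  val props = new Properties()
  props.put("bootstrap.servers", "broker:9092")  // placeholder broker
  props.put("group.id", "myGroup")               // placeholder group id
  props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList("myTopic_5"))
  // Nothing has been polled yet, so no partitions are assigned --
  // the same state a rebalance leaves behind once it revokes a partition.
  consumer.seekToEnd(Collections.singletonList(new TopicPartition("myTopic_5", 0)))
  // => java.lang.IllegalStateException: No current assignment for partition myTopic_5-0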

I see that there is a Spark ticket open with the same issue (https://issues.apache.org/jira/browse/SPARK-19547), but it has been marked as INVALID. Can someone explain why this ticket was marked INVALID?

Thanks,
Pankaj

Re: SPARK-19547

Cody Koeninger-2
Can you explain in more detail what you mean by "distribute Kafka
topics among different instances of same consumer group"?

If you're trying to run multiple streams using the same consumer
group, it's already documented that you shouldn't do that.
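
A minimal sketch of the documented pattern, one consumer group per stream, using the spark-streaming-kafka-0-10 integration (broker, topic, and group names are placeholders):

  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.streaming.StreamingContext
  import org.apache.spark.streaming.kafka010._

  // Each stream gets its own group.id, so Kafka never rebalances
  // partitions away from another running driver's consumer.
  def makeStream(ssc: StreamingContext, groupId: String) = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId)  // unique per stream
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Array("myTopic_5"), kafkaParams))
  }
  // e.g. makeStream(ssc, "group-app-1") in one application,
  //      makeStream(ssc, "group-app-2") in the other.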




Re: [E] Re: SPARK-19547

Rastogi, Pankaj
Hi,
 Thank you for your reply!
 You got it right: I am trying to run multiple streams using the same consumer group, so that I can distribute different partitions among different instances of the group. I don't want to provide the list of partitions in the createDirectStream API. If I do that, it becomes difficult to handle consumer failure, since those partitions won't be read by any consumer, and I would also have to handle the addition of new partitions.

I wanted to see if I could use the partition rebalance feature.
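
For reference, the alternative being avoided here, pinning partitions explicitly with ConsumerStrategies.Assign, looks roughly like this (a sketch assuming spark-streaming-kafka-0-10, an existing StreamingContext ssc, and the usual kafkaParams map; topic and partition numbers are placeholders):

  import org.apache.kafka.common.TopicPartition
  import org.apache.spark.streaming.kafka010._

  // Pin this instance to a fixed set of partitions. If the instance
  // dies, no other consumer takes these partitions over, and partitions
  // added to the topic later are never read.
  val partitions = List(
    new TopicPartition("myTopic_5", 0),
    new TopicPartition("myTopic_5", 1))
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Assign[String, String](partitions, kafkaParams))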

Pankaj



Re: [E] Re: SPARK-19547

Sree V
Hi Pankaj,

>> After the second consumer group comes up
Do you mean that a second consumer starts with the same consumer group as the first?

createDirectStream is overloaded. One of its overloads doesn't require you to specify the partitions of a topic.
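
That overload takes topic names rather than explicit TopicPartitions, so the driver's consumer receives its assignment from Kafka, and partitions added later are picked up automatically. A sketch, again assuming spark-streaming-kafka-0-10 with an existing ssc and kafkaParams:

  import org.apache.spark.streaming.kafka010._

  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](Array("myTopic_5"), kafkaParams))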

Cheers
- Sree

