Implicit ClassTag in KafkaUtils


invkrh
Hi, 

I am reading the Spark Streaming Kafka code.

In the file org.apache.spark.streaming.kafka.KafkaUtils,
the function "createDirectStream" takes the key class, value class, etc. and creates a ClassTag from each.
However, these ClassTags are all declared implicit, and I don't understand why.

In fact, I cannot find any other overload of "createDirectStream" that takes implicit parameters.

So what are these implicit ClassTags used for? Thank you.

def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
    jssc: JavaStreamingContext,
    keyClass: Class[K],
    valueClass: Class[V],
    keyDecoderClass: Class[KD],
    valueDecoderClass: Class[VD],
    recordClass: Class[R],
    kafkaParams: JMap[String, String],
    fromOffsets: JMap[TopicAndPartition, JLong],
    messageHandler: JFunction[MessageAndMetadata[K, V], R]
  ): JavaInputDStream[R] = {
  implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
  implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
  implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
  implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
  implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
  val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
  createDirectStream[K, V, KD, VD, R](
    jssc.ssc,
    Map(kafkaParams.toSeq: _*),
    Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
    cleanedHandler
  )
}

--
Hao Ren

Data Engineer @ leboncoin

Paris, France

Re: Implicit ClassTag in KafkaUtils

Saisai Shao
Actually, this is a Scala matter. The Scala version of createDirectStream requires implicit values, which are declared as context bounds on its type parameters. Java has no equivalent, so the Java-facing overload converts each Java Class into a ClassTag and declares it as an implicit value, which the Scala createDirectStream then picks up.


Thanks
Saisai
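
The mechanism described in this reply can be sketched in isolation. A context bound such as [T: ClassTag] is shorthand for an extra implicit parameter of type ClassTag[T], and a Java-facing wrapper can satisfy it by building the ClassTag from a Class object and marking it implicit. The names fill and fillFromJava below are hypothetical and are not part of Spark; this is only a minimal sketch of the pattern.

```scala
import scala.reflect.ClassTag

object ContextBoundSketch {
  // Scala-facing method (hypothetical). The context bound [T: ClassTag] is
  // shorthand for a second parameter list: (implicit tag: ClassTag[T]).
  // Array.fill needs that ClassTag to allocate an array of the right type.
  def fill[T: ClassTag](n: Int, value: T): Array[T] =
    Array.fill(n)(value)

  // Java-facing method (hypothetical). Java callers cannot supply a ClassTag,
  // so they pass a Class object instead. Declaring the resulting ClassTag as
  // an implicit val puts it in scope for the call to fill below.
  def fillFromJava[T](clazz: Class[T], n: Int, value: T): Array[T] = {
    implicit val tag: ClassTag[T] = ClassTag(clazz)
    fill[T](n, value) // the compiler passes `tag` here automatically
  }

  def main(args: Array[String]): Unit = {
    val arr = fillFromJava(classOf[String], 3, "k")
    println(arr.mkString(","))
    println(arr.getClass.getComponentType)
  }
}
```

If the implicit val line is removed, the call to fill no longer compiles, because no ClassTag[T] can be found for the abstract type T.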




Re: Implicit ClassTag in KafkaUtils

invkrh
Thank you for your quick answer.
It helped me find an implicit conversion for JavaInputDStream that takes an implicit ClassTag.

Cheers.
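
The implicit conversion mentioned above follows the same pattern: a conversion declared with a context bound only applies when a matching ClassTag is in implicit scope. The sketch below uses hypothetical stand-in classes (ScalaStream, JavaStream) rather than Spark's real InputDStream and JavaInputDStream, and assumes the conversion lives in the companion object of the target type.

```scala
import scala.language.implicitConversions
import scala.reflect.ClassTag

object ImplicitConversionSketch {
  // Hypothetical stand-ins for a Scala stream type and its Java-friendly wrapper.
  class ScalaStream[T](val items: Seq[T])
  class JavaStream[T](val underlying: ScalaStream[T])(implicit val tag: ClassTag[T])

  object JavaStream {
    // Implicit conversion with a context bound: it is applicable only when
    // a ClassTag[T] can be found in implicit scope at the conversion site.
    implicit def fromScalaStream[T: ClassTag](s: ScalaStream[T]): JavaStream[T] =
      new JavaStream[T](s)
  }

  // Java-facing factory (hypothetical). The body produces a ScalaStream, but
  // the declared return type is JavaStream, so the compiler inserts
  // fromScalaStream and feeds it the implicit ClassTag built from `clazz`.
  def create[T](clazz: Class[T], items: Seq[T]): JavaStream[T] = {
    implicit val tag: ClassTag[T] = ClassTag(clazz)
    new ScalaStream[T](items)
  }

  def main(args: Array[String]): Unit = {
    val js = create(classOf[String], Seq("a", "b"))
    println(js.tag.runtimeClass)
  }
}
```

This suggests why a ClassTag for the record type is useful in the Java-facing method even when it is never named explicitly in the body: the return value is converted implicitly, and that conversion consumes the tag.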
