spark-user mailing list archives

From "Shao, Saisai" <>
Subject RE: Setting Kafka parameters in Spark Streaming
Date Tue, 09 Sep 2014 02:51:34 GMT
Hi Hemanth,

I think there is a bug in this API in Spark 0.8.1, so you will hit this exception when calling it from Java code. The bug is fixed in the latest version, as you can see in the patch (
However, that fix is only for Kafka 0.8+; since you are still using Kafka 0.7, you can modify the Spark code according to this patch and rebuild. I would still highly recommend moving to the latest versions of Spark and Kafka, as there have been lots of improvements on the streaming side.


From: Hemanth Yamijala []
Sent: Tuesday, September 09, 2014 12:49 AM
Subject: Setting Kafka parameters in Spark Streaming


I am using Spark 0.8.1 with Kafka 0.7. I am trying to set the parameter fetch.message.max.bytes
when creating the Kafka DStream. The only API that seems to allow this is the following:

kafkaStream[T, D <: kafka.serializer.Decoder[_]](typeClass: Class[T], decoderClass: Class[D],
kafkaParams: Map[String, String], topics: Map[String, Integer], storageLevel: StorageLevel)
I tried to call it like so:
context.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK())
However, this is causing an exception like:
java.lang.ClassCastException: java.lang.Object cannot be cast to kafka.serializer.Decoder
    at org.apache.spark.streaming.dstream.KafkaReceiver.onStart(KafkaInputDStream.scala:105)
    at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:125)
    at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:158)
    at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:154)
Can anyone help with how to set these parameters?
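For reference, the kafkaParams argument is just a String-to-String map of consumer properties, so the fetch limit is set alongside the usual connection settings. Below is a minimal sketch of building the two maps the API expects; the property names zk.connect and groupid, the broker/ZooKeeper address, group name, and topic are illustrative placeholders (Kafka 0.7's consumer config may name the fetch-size knob differently, so check the config reference for your Kafka version):

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaParamsExample {
    public static void main(String[] args) {
        // Consumer properties passed to kafkaStream(...); all values are strings.
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("zk.connect", "localhost:2181");       // placeholder ZooKeeper address
        kafkaParams.put("groupid", "my-consumer-group");       // placeholder consumer group
        kafkaParams.put("fetch.message.max.bytes",
                String.valueOf(10 * 1024 * 1024));             // 10 MB max message size

        // Topic name -> number of receiver threads for that topic.
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put("my-topic", 1);

        System.out.println(kafkaParams.get("fetch.message.max.bytes"));

        // The stream itself would then be created as in the original message
        // (not compiled here, since it needs the Spark/Kafka jars):
        // context.kafkaStream(String.class, StringDecoder.class,
        //         kafkaParams, topics, StorageLevel.MEMORY_AND_DISK());
    }
}
```

Once the ClassCastException itself is resolved (see the reply above), these maps can be passed straight into the five-argument kafkaStream overload.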