Hi Mich,

Thanks a lot for your response. I am basically trying to get some older code (a streaming job that reads from Kafka) written for Spark 2.0.1 to work on 3.0.1. The specific area where I am having a problem (KafkaCluster) most likely has to do with getting/setting commit offsets in Kafka.



// Create a message DStream for each (topic, schema class)
    val msgStreams = config.getTopicSchemaClassMap.map {
      case (kafkaTopic, schemaClass) =>
        val consumerOffsets = getConsumerOffsets(kafkaTopic)
        val msgDStream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
            DefaultDecoder, DefaultDecoder, (Array[Byte], Array[Byte])](
          ssc, kafkaParams, consumerOffsets,
          (msg: MessageAndMetadata[Array[Byte], Array[Byte]]) => (msg.key, msg.message))
        (kafkaTopic, schemaClass, msgDStream)
    }




The getConsumerOffsets method internally uses KafkaCluster, which is probably deprecated.



Do you think I need to mimic the code shown here to get/set offsets, rather than use KafkaCluster?


https://spark.apache.org/docs/3.0.0-preview/streaming-kafka-0-10-integration.html
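
For what it's worth, this is roughly what I have in mind based on that page. It is only a sketch: getStoredOffsets is a placeholder for whatever replaces my current getConsumerOffsets helper, the broker and group.id values are placeholders, and ssc/kafkaTopic come from the code above.

    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.ByteArrayDeserializer
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    // kafkaParams now needs Object values and explicit deserializers
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",                  // placeholder
      "key.deserializer" -> classOf[ByteArrayDeserializer],
      "value.deserializer" -> classOf[ByteArrayDeserializer],
      "group.id" -> "my-consumer-group",                      // placeholder
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Placeholder helper standing in for my old getConsumerOffsets/KafkaCluster logic
    val startOffsets: Map[TopicPartition, Long] = getStoredOffsets(kafkaTopic)

    val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
      ssc,
      PreferConsistent,
      Subscribe[Array[Byte], Array[Byte]](Seq(kafkaTopic), kafkaParams, startOffsets)
    )

    // Same (key, value) shape as the old MessageAndMetadata mapping
    val msgDStream = stream.map(record => (record.key, record.value))

    // Commit offsets back to Kafka once each batch's output has completed,
    // instead of KafkaCluster.setConsumerOffsets
    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

If I read the docs right, commitAsync would replace what I was doing with KafkaCluster to store offsets back in Kafka.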


Thanks

Kiran


On Mon, Jun 7, 2021 at 1:04 AM Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:

Hi,

Are you trying to read topics from Kafka in Spark 3.0.1?

Have you checked the Spark 3.0.1 documentation?

Integrating Spark with Kafka is pretty straightforward with 3.0.1 and higher.
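
For example, if you can move to Structured Streaming rather than the old DStream API, reading a topic is only a few lines. A minimal sketch (broker and topic names are placeholders):

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder
      .option("subscribe", "your_topic")                    // placeholder
      .option("startingOffsets", "latest")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

Offset tracking is then handled through the query's checkpoint location rather than by your own code.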


HTH



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 



On Sun, 6 Jun 2021 at 21:18, Kiran Biswal <biswalkiran@gmail.com> wrote:
I am using Spark 3.0.1, Kafka 0.10, and Scala 2.12, and I am getting a compile error related to KafkaCluster (not found: type KafkaCluster). Is this class deprecated? How do I find a replacement?

I am upgrading from Spark 2.0.1 to Spark 3.0.1.

In Spark 2.0.1, KafkaCluster was supported.

I am just looking for ideas on how to achieve the same functionality in Spark 3.0.1. Any thoughts and examples will be highly appreciated.

Thanks
Kiran