We have a secured Kafka cluster (it only allows consuming from pre-configured, authorized consumer groups). We want to use Spark Streaming to consume from this secured Kafka, so we decided to use the spark-streaming-kafka-0-10 API (it supports SSL/TLS, Direct DStream, the new Kafka consumer API, etc.). When I deployed the application in cluster mode, I realized that the actual group id gets prefixed with "spark-executor-" in the executor configuration: the executor tries to connect to Kafka with "spark-executor-" + the actual group id, which does not exist on the broker, and gets an exception.
Here is where the executor constructs its executor-specific group id (the original snippet is not reproduced here).
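Since the snippet itself is missing, here is a minimal sketch of what the executor-side override does, paraphrased from my reading of the spark-streaming-kafka-0-10 source (KafkaUtils.fixKafkaParams). ExecutorGroupIdSketch and fixedGroupId are illustrative names I made up; only the "spark-executor-" prefix rule reflects the library's behavior.

```scala
// Hedged sketch of the executor-side group-id override in
// spark-streaming-kafka-0-10; names and structure are approximate,
// not the exact Spark source.
import java.util.{HashMap => JHashMap}

object ExecutorGroupIdSketch {
  // Kafka's ConsumerConfig.GROUP_ID_CONFIG resolves to the string "group.id".
  val GroupIdConfig = "group.id"

  // The prefix rule itself: executors do not reuse the configured group id.
  def fixedGroupId(originalGroupId: String): String =
    "spark-executor-" + originalGroupId

  // Mirrors how the executor-side consumer params are mutated in place:
  // the driver keeps the configured group id, executors get the prefixed one.
  def fixKafkaParams(kafkaParams: JHashMap[String, Object]): Unit = {
    val original = kafkaParams.get(GroupIdConfig).asInstanceOf[String]
    kafkaParams.put(GroupIdConfig, fixedGroupId(original))
  }
}
```

So with GROUP_ID_CONFIG set to, say, "subscribers", the executors connect as "spark-executor-subscribers", which is why a broker that authorizes only the pre-configured group rejects them.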
Here are my questions:
#1 What is the specific reason for prefixing the group id on the executor?
#2 KafkaUtils.scala is marked @Experimental. What does that mean? Is it advisable to use in production?
Here is my Spark Streaming code snippet:
val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
  ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[MessageDeserializer],
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
  "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
  "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
  "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
  "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
)

// createDirectStream also takes the StreamingContext and a LocationStrategy
val stream = KafkaUtils.createDirectStream[String, Message](
  ssc,
  LocationStrategies.PreferConsistent,
  Subscribe[String, Message](topicsSet, kafkaParams)
)
Thanks in advance!