Hi Team,

We have a secured Kafka cluster that only allows consumption from pre-configured, authorized consumer groups. We want to use Spark Streaming to consume from this cluster, so we decided to use the spark-streaming-kafka-0-10 API (it supports SSL/TLS, direct DStreams, the new Kafka consumer API, etc.). When I deployed the application in cluster mode, I realized that the actual group id is prefixed with "spark-executor-" in the executor configuration. The executors try to connect to Kafka with "spark-executor-" + the actual group id, which is not an authorized group, so they fail with an exception.

Here is the code where the executor-specific group id is constructed:

https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala (line 212)
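
To make the behaviour concrete, here is a minimal sketch of what the override appears to do. It is a simplified paraphrase, not the actual Spark source: the object name ExecutorGroupIdSketch and the method executorParams are hypothetical, and the sketch only assumes that the executor-side parameters are a copy of the driver-side ones with "spark-executor-" prepended to group.id.

```scala
import java.{util => ju}

// Hypothetical illustration only; names are not part of Spark.
object ExecutorGroupIdSketch {
  // Kafka's consumer group setting (the value of ConsumerConfig.GROUP_ID_CONFIG).
  val GroupIdKey = "group.id"

  // Copies the driver-side params and rewrites group.id the way the
  // executors seem to, leaving every other setting untouched.
  def executorParams(driverParams: Map[String, Object]): ju.Map[String, Object] = {
    val params = new ju.HashMap[String, Object]()
    driverParams.foreach { case (k, v) => params.put(k, v) }
    val originalGroupId = params.get(GroupIdKey)
    params.put(GroupIdKey, "spark-executor-" + originalGroupId)
    params
  }
}
```

With a driver-side group.id of "subscribers", this sketch yields "spark-executor-subscribers" on the executor side, which is the group our cluster does not authorize.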

Here are my questions:

#1 What was the specific reason for prefixing the group id on the executors?

#2 Would it be harmful if I built a custom spark-streaming-kafka-0-10 library with the group id prefix removed?

#3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it advisable to use it in production?

Here is my Spark Streaming code snippet:

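import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Constants, Message, MessageDeserializer, subscribers, topicsSet and ssc
// are defined elsewhere in our application.
val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
  // The authorized consumer group; executors end up with a prefixed version of it.
  ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[MessageDeserializer],
  // SSL settings for the secured cluster
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
  "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
  "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
  "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
  "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
)

val stream = KafkaUtils.createDirectStream[String, Message](
  ssc,
  PreferConsistent,
  Subscribe[String, Message](topicsSet, kafkaParams)
)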
---
Thanks in Advance,
Sethupathi.T