spark-user mailing list archives

From Gabor Somogyi <gabor.g.somo...@gmail.com>
Subject Re: [Spark Streaming Kafka 0-10] - What was the reason for adding "spark-executor-" prefix to group id in executor configurations
Date Thu, 05 Sep 2019 16:13:31 GMT
Hi,

Let me share the relevant part of the Spark 3.0 documentation (it covers
Structured Streaming rather than the DStreams you mentioned, but it is still
relevant):

Option:      kafka.group.id
Value:       string
Default:     none
Query type:  streaming and batch
Meaning:

The Kafka group id to use in the Kafka consumer while reading from Kafka.
Use this with caution. By default, each query generates a unique group id
for reading data. This ensures that each Kafka source has its own consumer
group that does not face interference from any other consumer, and
therefore can read all of the partitions of its subscribed topics. In some
scenarios (for example, Kafka group-based authorization), you may want to
use a specific authorized group id to read data. You can optionally set the
group id. However, do this with extreme caution as it can cause unexpected
behavior. Concurrently running queries (both batch and streaming) or
sources with the same group id are likely to interfere with each other,
causing each query to read only part of the data. This may also occur when
queries are started/restarted in quick succession. To minimize such issues,
set the Kafka consumer session timeout (by setting option
"kafka.session.timeout.ms") to be very small. When this is set, option
"groupIdPrefix" will be ignored.

I think this answers your questions.
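
For illustration, here is a minimal sketch of how the option quoted above
could be set on a Structured Streaming Kafka source in Spark 3.0. The broker
address, topic name, and group id are placeholders I made up, and the
spark-sql-kafka-0-10 package is assumed to be on the classpath. SSL
truststore/keystore settings would be passed the same way, with the "kafka."
prefix.

```scala
import org.apache.spark.sql.SparkSession

object FixedGroupIdExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("fixed-group-id-example")
      .getOrCreate()

    // Placeholder values (assumptions, not taken from the original thread).
    val bootstrapServers = "broker1:9093"
    val topic = "events"
    val authorizedGroupId = "my-authorized-group"

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      // Use the pre-authorized consumer group instead of a generated one.
      .option("kafka.group.id", authorizedGroupId)
      // Options prefixed with "kafka." are passed to the Kafka consumer.
      .option("kafka.security.protocol", "SSL")
      .load()

    // Kafka records arrive with binary key/value columns; cast for display.
    val query = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

Keep in mind the caveat from the documentation: only one query or source
should use a given group id at a time.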

As a general suggestion, it may be worth looking at Spark 3.0, because
Structured Streaming has another interesting option:

Option:      groupIdPrefix
Value:       string
Default:     spark-kafka-source
Query type:  streaming and batch
Meaning:

Prefix of consumer group identifiers (`group.id`) that are generated by
structured streaming queries. If "kafka.group.id" is set, this option will
be ignored.
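
A minimal sketch of the prefix option, assuming your cluster can authorize
consumer groups by name prefix (that is an assumption about your setup, not
something I know). The broker address, topic name, and prefix are
placeholders, and spark-sql-kafka-0-10 is assumed to be on the classpath.
A batch read is used here to keep the example short.

```scala
import org.apache.spark.sql.SparkSession

object GroupIdPrefixExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("group-id-prefix-example")
      .getOrCreate()

    // Batch read from Kafka; Spark still generates a unique group id,
    // but it starts with the given prefix instead of "spark-kafka-source".
    val df = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9093") // placeholder
      .option("subscribe", "events")                     // placeholder
      .option("groupIdPrefix", "myteam")                 // placeholder
      .load()

    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .show(truncate = false)

    spark.stop()
  }
}
```

Do not set "kafka.group.id" together with this option, otherwise the prefix
is ignored.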

BR,
G


On Thu, Sep 5, 2019 at 10:05 AM Sethupathi T
<sethu.sdm@googlemail.com.invalid> wrote:

> Hi Team,
>
> We have a secured Kafka cluster (which only allows consuming from the
> pre-configured, authorized consumer group), and there is a scenario where we
> want to use Spark Streaming to consume from the secured Kafka. So we have
> decided to use the spark-streaming-kafka-0-10
> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html> API (it
> supports SSL/TLS, Direct DStream, the new Kafka consumer API, etc.). When I
> deployed the application in cluster mode, I realized that the actual group id
> is prefixed with "spark-executor" in the executor configuration (the executor
> tries to connect to Kafka with "spark-executor" + actual group id, a group
> that does not exist, and gets an exception).
>
> *Here is the code where the executor constructs the executor-specific group id:*
>
> https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
> line #212
>
> *Here are my Questions*
>
> #1 What was the specific reason for prefixing the group id in the executor?
>
> #2 Will it be harmful if I build a custom spark-streaming-kafka-0-10
> <https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
> library with the group id prefix removed?
>
> #3 KafkaUtils.scala is marked as @Experimental. What does that mean? Is it
> advisable to use it in production?
>
> *Here is my Spark Streaming code snippet*
>
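> import org.apache.kafka.clients.consumer.ConsumerConfig
> import org.apache.kafka.common.serialization.StringDeserializer
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>
> val kafkaParams = Map[String, Object](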
>   ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> Constants.BOOTSTRAP_SERVERS,
>   ConsumerConfig.GROUP_ID_CONFIG -> subscribers,
>   ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
>   ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
>   ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
>   ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[MessageDeserializer],
>   "security.protocol" -> "SSL",
>   "ssl.truststore.location" -> Constants.TRUSTSTORE_PATH,
>   "ssl.truststore.password" -> Constants.TRUSTSTORE_PASSWORD,
>   "ssl.keystore.location" -> Constants.KEYSTORE_PATH,
>   "ssl.keystore.password" -> Constants.KEYSTORE_PASSWORD,
>   "ssl.key.password" -> Constants.KEYSTORE_PASSWORD
> )
>
> val stream = KafkaUtils.createDirectStream[String, Message](
>   ssc,
>   PreferConsistent,
>   Subscribe[String, Message](topicsSet, kafkaParams)
> )
>
> ---
> Thanks in Advance,
> Sethupathi.T
>
