spark-user mailing list archives

From Hrishikesh Mishra <sd.hri...@gmail.com>
Subject Re: In Spark Streaming, Direct Kafka Consumers are not evenly distributed across executors
Date Tue, 03 Mar 2020 16:35:40 GMT
Thanks Zhang.

You are right. The driver is committing the offsets to Kafka, which is why
a single consumer IP shows up in Kafka Manager. Actually, in one Spark
context we are starting multiple Kafka streams, but the driver is executing
them sequentially, not in parallel. While debugging this, I noticed that
and suspected that everything was happening on the driver. But now it's
clear: once I enabled debug logs on the executors, I could see the KafkaRDD
fetching events from Kafka for the given offsets.
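
For reference, a minimal sketch of that driver/executor split (Java,
kafka010 integration, following the commit pattern from the Spark docs;
stream is the JavaInputDStream returned by createDirectStream below, and
process() is just a placeholder for the per-record work):

stream.foreachRDD(rdd -> {
    // Driver side: read the offset ranges Spark computed for this batch.
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // Executor side: each task polls its own topic-partition for the range.
    rdd.foreachPartition(records ->
        records.forEachRemaining(record -> process(record)));

    // Driver side again: commit the consumed offsets back to Kafka.
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});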


Second, where can I find some insight into why the different Kafka streams
of one Spark context are executed sequentially? I found the
*spark.streaming.concurrentJobs* config to run jobs in parallel, but I read
on Stack Overflow
<https://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming?answertab=active#tab-top>
that it has some adverse effects.
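
In case it helps anyone else, this is how that setting would be applied (a
sketch; the property is undocumented, defaults to 1, and that default is
exactly why the streams above run one after another):

SparkConf conf = new SparkConf().setAppName("StreamingTest");
// Undocumented knob: size of the streaming job scheduler's thread pool.
// With the default of 1, jobs from different streams in the same context
// run strictly one at a time; raising it lets batches overlap, at the cost
// of possible out-of-order processing and interleaved offset commits.
conf.set("spark.streaming.concurrentJobs", "2");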

On Tue, Mar 3, 2020 at 8:18 AM Zhang Victor <zhangshuai_work@outlook.com>
wrote:

> Hi Hrishi.
>
> I guess your code is similar to the following:
>
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // some time later, after outputs have completed
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
>
> The action of committing the offsets occurs on the driver side.
>
> On the driver side, Spark calculates the records that each topic partition
> should consume in the current batch; the tasks on the executors then
> actually consume the corresponding partitions.
>
> You can check whether that IP is the address of the node where the driver
> is located.
>
> ------------------------------
> *From:* Hrishikesh Mishra <sd.hrishi@gmail.com>
> *Sent:* February 29, 2020 12:05
> *To:* user@spark.apache.org <user@spark.apache.org>
> *Subject:* In Spark Streaming, Direct Kafka Consumers are not evenly
> distributed across executors
>
>
> I have created a sample direct Kafka stream in Spark. The Kafka topic has
> 30 partitions, but all consumers are running on the same executor machine.
>
> Kafka Manager screenshot.
> [image: Screenshot 2020-02-28 at 7.06.49 PM 2.png]
>
> As per my understanding of direct Kafka streams, the driver hands the
> offset ranges to the executors, which then poll Kafka with them.
>
> Kafka Stream
>
>         HashMap<String, Object> kafkaParams = new HashMap<>();
>         kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<hosts>");
>         kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, topic+"testing-nfr-7");
>         kafkaParams.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000);
>         kafkaParams.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 80000);
>         kafkaParams.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
>         kafkaParams.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10000000);
>         kafkaParams.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000);
>         kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>         kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
>         kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>         kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
>
>         KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(),
>                 ConsumerStrategies.Subscribe(ImmutableList.of(topic), kafkaParams));
>
> Spark Version: 2.4
>
> Spark Config
>
>         SparkConf conf = new SparkConf().setAppName("StreamingTest");
>         conf.set("spark.shuffle.service.enabled", "true");
>         conf.set("spark.streaming.kafka.maxRatePerPartition", "100");
>         conf.set("spark.streaming.backpressure.enabled", "true");
>         conf.set("spark.streaming.concurrentJobs", "1");
>         conf.set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC");
>         conf.set("spark.streaming.backpressure.pid.minRate", "1500");
>
>
>
> Regards,
>
> Hrishi
>
>
