spark-user mailing list archives

From Hrishikesh Mishra <sd.hri...@gmail.com>
Subject Re: In Spark Streaming, Direct Kafka Consumers are not evenly distributed across executors
Date Fri, 06 Mar 2020 03:57:18 GMT
Thanks Zhang,

I know *spark.streaming.concurrentJobs* provides parallelism, but it has
adverse effects, as mentioned here: https://stackoverflow.com/a/23533736. I
don't know if that is still valid with Spark 2.4.
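
For context, this is all the setting amounts to in code (a minimal sketch;
the app name and batch interval are placeholders, not from our actual job):

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    SparkConf conf = new SparkConf().setAppName("ConcurrentJobsTest");
    // Undocumented scheduler knob: allows up to N streaming output jobs
    // to run at once. The value 2 is illustrative, not a recommendation.
    conf.set("spark.streaming.concurrentJobs", "2");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(10));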




On Fri, Mar 6, 2020 at 7:59 AM Zhang Victor <zhangshuai_work@outlook.com>
wrote:

> Hi Hrishi.
>
> I tested using multiple Kafka Streams.
>
> When the total number of executor cores is greater than the number of topic
> partitions and *spark.streaming.concurrentJobs* > 1, it is possible to
> execute jobs concurrently.
>
> For example, stream1 -> topicA with 1 partition and stream2 -> topicB
> with 2 partitions.
>
> And set spark.streaming.concurrentJobs=2.
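>
> A rough sketch of that test setup in Java (topic names and the params maps
> are assumptions; kafkaParams1/kafkaParams2 should use distinct group.id
> values so the two streams don't interfere; classes are the same
> org.apache.spark.streaming.kafka010 ones as in your mail below):
>
> JavaInputDStream<ConsumerRecord<String, byte[]>> stream1 =
>         KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(),
>                 ConsumerStrategies.Subscribe(ImmutableList.of("topicA"), kafkaParams1));
> JavaInputDStream<ConsumerRecord<String, byte[]>> stream2 =
>         KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(),
>                 ConsumerStrategies.Subscribe(ImmutableList.of("topicB"), kafkaParams2));
> // With spark.streaming.concurrentJobs=2 and more executor cores than total
> // partitions (here 1 + 2 = 3), the two streams' jobs can run in the same batch.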
>
>
>
>
>
> ------------------------------
> *From:* Hrishikesh Mishra <sd.hrishi@gmail.com>
> *Sent:* March 4, 2020 0:35
> *To:* Zhang Victor <zhangshuai_work@outlook.com>
> *Cc:* user@spark.apache.org <user@spark.apache.org>
> *Subject:* Re: In Spark Streaming, Direct Kafka Consumers are not evenly
> distributed across executors
>
> Thanks Zhang.
>
> You are right. The driver is committing offsets to Kafka; that's why a single
> consumer IP shows up in Kafka Manager. Actually, in one Spark context we
> are starting multiple Kafka streams, but the driver is executing them
> sequentially, not in parallel. While debugging this, I found this issue and
> suspected that everything was happening on the driver. But now it's clear:
> I even enabled debug logging on the executors, where KafkaRDD was fetching
> events from Kafka for the given offsets.
>
>
> Second thing: where can I get some insight into why all the different Kafka
> streams of a Spark context are being executed sequentially? I found the
> *spark.streaming.concurrentJobs* config to run jobs in parallel, but I read on Stack
> Overflow
> <https://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming?answertab=active#tab-top>
> that it has some adverse effects.
>
>
>
>
>
>
> On Tue, Mar 3, 2020 at 8:18 AM Zhang Victor <zhangshuai_work@outlook.com>
> wrote:
>
> Hi Hrishi.
>
> I guess your code is similar to the following.
>
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>   // some time later, after outputs have completed
>   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
>
> The offset commit happens on the driver side.
>
> On the driver side, Spark calculates which records each topic partition
> should contribute to the current batch, and then the tasks on each executor
> actually consume the corresponding partitions.
>
> You can check whether that IP is the address of the node where the driver runs.
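>
> A quick way to confirm this in Java is the standard HasOffsetRanges pattern
> from the Kafka integration guide, with hostname logging added for
> illustration; it prints, per task, which executor consumed which partition:
>
> import java.net.InetAddress;
> import org.apache.spark.TaskContext;
> import org.apache.spark.streaming.kafka010.HasOffsetRanges;
> import org.apache.spark.streaming.kafka010.OffsetRange;
>
> stream.foreachRDD(rdd -> {
>     // Offset ranges are only available on the RDD that comes directly
>     // from the direct stream, before any shuffle or repartition.
>     OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>     rdd.foreachPartition(records -> {
>         OffsetRange r = ranges[TaskContext.get().partitionId()];
>         // Runs on the executor, so the hostname shows where the fetch happened.
>         System.out.println(InetAddress.getLocalHost().getHostName()
>                 + " consumed " + r.topic() + "-" + r.partition()
>                 + " offsets [" + r.fromOffset() + ", " + r.untilOffset() + ")");
>     });
> });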
>
> ------------------------------
> *From:* Hrishikesh Mishra <sd.hrishi@gmail.com>
> *Sent:* February 29, 2020 12:05
> *To:* user@spark.apache.org <user@spark.apache.org>
> *Subject:* In Spark Streaming, Direct Kafka Consumers are not evenly
> distributed across executors
>
>
> I have created a sample direct Kafka stream in Spark. The Kafka topic has 30
> partitions, but all consumers are running on the same
> executor machine.
>
> Kafka Manager screenshot.
> [image: Screenshot 2020-02-28 at 7.06.49 PM 2.png]
>
> As per my understanding of the direct Kafka stream, the driver hands the
> offset ranges to the executors, which poll Kafka with them.
>
> Kafka Stream
>
>         HashMap<String, Object> kafkaParams = new HashMap<>();
>         kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<hosts>");
>         kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, topic+"testing-nfr-7");
>         kafkaParams.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000);
>         kafkaParams.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 80000);
>         kafkaParams.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
>         kafkaParams.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10000000);
>         kafkaParams.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000);
>         kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>         kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
>         kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>         kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
>
>         KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(),
>                 ConsumerStrategies.Subscribe(ImmutableList.of(topic), kafkaParams));
>
> Spark Version: 2.4
>
> Spark Config
>
>         SparkConf conf = new SparkConf().setAppName("StreamingTest");
>         conf.set("spark.shuffle.service.enabled", "true");
>         conf.set("spark.streaming.kafka.maxRatePerPartition", "100");
>         conf.set("spark.streaming.backpressure.enabled", "true");
>         conf.set("spark.streaming.concurrentJobs", "1");
>         conf.set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC");
>         conf.set("spark.streaming.backpressure.pid.minRate", "1500");
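>
> For completeness, roughly how this conf and the ssc above fit together (a
> sketch; the 5-second batch interval is an assumption, not the real value):
>
>         JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
>         // ... create the direct stream as above and register an output
>         // operation such as foreachRDD, otherwise the job will not start ...
>         ssc.start();
>         ssc.awaitTermination();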
>
>
>
> Regards,
>
> Hrishi
>
>
