spark-user mailing list archives

From Zhang Victor <zhangshuai_w...@outlook.com>
Subject Re: What is the best way to consume parallely from multiple topics in Spark Stream with Kafka
Date Thu, 19 Mar 2020 06:57:20 GMT
Actually, I am also trying the concurrentJobs feature (spark.streaming.concurrentJobs) in Spark Streaming.

We have a business scenario where we need to write data from Kafka topics to Hive tables.

One topic corresponds to one table.

TopicA -> TableA
TopicB -> TableB
...

But there are already dozens of topics.

Currently, I have started multiple Spark apps to do the processing; each app handles one topic -> table.

I also tried using one big Kafka stream (subscribed to all N topics).

However, the data for each topic then needs to be filtered out of the combined stream and written to the corresponding table, and the performance is very low.
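
Roughly, the pattern looks like the sketch below (topic names, group id, and broker address are placeholders, not our real configuration). Each topic gets its own filter pass over the combined stream before its records are written to the matching table:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class OneBigStreamSketch {

    public static void main(String[] args) throws InterruptedException {
        JavaStreamingContext ssc = new JavaStreamingContext(
                new SparkConf().setAppName("OneBigStreamSketch"), Durations.seconds(5));

        List<String> topics = Arrays.asList("topicA", "topicB"); // placeholders for the N topics

        HashMap<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-ips>");     // placeholder
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "one-big-stream-sketch");     // placeholder
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        /** One direct stream subscribed to all topics **/
        JavaInputDStream<ConsumerRecord<Object, Object>> allTopics = KafkaUtils.createDirectStream(
                ssc, LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaParams));

        /** One filter pass per topic: without caching, every batch is scanned once per topic **/
        for (String topic : topics) {
            allTopics
                    .filter(record -> record.topic().equals(topic))
                    .foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(record -> {
                        /** write `record` to the Hive table that corresponds to `topic` **/
                    })));
        }

        ssc.start();
        ssc.awaitTermination();
    }
}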

Can you give me some advice?

Thanks

________________________________
From: Tathagata Das <tathagata.das1565@gmail.com>
Sent: March 19, 2020 3:01
To: Hrishikesh Mishra <sd.hrishi@gmail.com>
Cc: Gerard Maas <gerard.maas@gmail.com>; spark users <user@spark.apache.org>
Subject: Re: What is the best way to consume parallely from multiple topics in Spark Stream with Kafka

Why are you not using Structured Streaming? Structured Streaming's Kafka source directly supports multiple topics.

val df = spark.readStream.format("kafka").option("subscribe", "topic1,topic2").load()

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
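
In case it helps, here is a minimal Java sketch of the same idea (broker address, topic names, sink, and checkpoint path are placeholders). The Kafka source adds a "topic" column to each row, so records from different topics can still be told apart downstream:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class StructuredKafkaSketch {

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("StructuredKafkaSketch").getOrCreate();

        // One source subscribed to both topics; the "topic" column identifies each record's origin.
        Dataset<Row> df = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "<broker-ips>")   // placeholder
                .option("subscribe", "topic1,topic2")
                .load();

        // Placeholder sink: print to the console; a real job would route on the "topic" column.
        df.writeStream()
                .format("console")
                .option("checkpointLocation", "/tmp/structured-kafka-sketch")   // placeholder
                .start()
                .awaitTermination();
    }
}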

On Wed, Mar 18, 2020 at 7:49 AM Hrishikesh Mishra <sd.hrishi@gmail.com> wrote:
This is a simplified version of the code.
Here:
Number of topics: 2
Actions per topic: 2

Total jobs = 2 x 2 = 4

And offsets are committed to Kafka for both topics.




import com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;

import java.util.Arrays;

import java.util.HashMap;


public class Main {

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("StreamingTest");

        conf.set("spark.shuffle.service.enabled", "true");
        conf.set("spark.streaming.kafka.maxRatePerPartition", "100");
        conf.set("spark.streaming.backpressure.enabled", "true");
        conf.set("spark.streaming.concurrentJobs", "1");
        conf.set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC");
        conf.set("spark.executor.extraJavaOptions", "-Dlog4j.configuration=file:/tmp/log4j-executor.properties");
        conf.set("spark.streaming.backpressure.pid.minRate", "1500");

        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
        ssc.sparkContext().setLogLevel("DEBUG");

        /** Kafka Stream 1 **/
        JavaInputDStream<ConsumerRecord<Object, Object>> kafkaStream1 = createKafkaStream(ssc, "topic1");

        /** Action 1 on Kafka Stream  1 **/
        kafkaStream1.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {

            try {
                /** Some kind of transformation will be performed here **/
                System.out.println("Action 1 -> Stream 1");
                System.out.println(e);
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })));



        /** Action 2 on Kafka Stream  1 **/
        kafkaStream1.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
            try {
                /** Some kind of transformation will be performed here **/
                System.out.println("Action 2 -> Stream 1");
                System.out.println(e);
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })));

         /** Commit  on Kafka Stream  1 **/
        kafkaStream1.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            ((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(offsetRanges);
        });

        /** Kafka Stream 2 **/
        JavaInputDStream<ConsumerRecord<Object, Object>> kafkaStream2 = createKafkaStream(ssc, "topic2");

        /** Action 1 on Kafka Stream 2 **/
        kafkaStream2.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
            try {
                /** Some kind of transformation will be performed here **/
                System.out.println("Action 1 -> Stream 2");
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })));

        /** Action 2 on Kafka Stream 2 **/
        kafkaStream2.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
            try {
                /** Some kind of transformation will be performed here **/
                System.out.println("Action 2 -> Stream 2");
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })));


         /** Commit  on Kafka Stream  2 **/
        kafkaStream2.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
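            // Note: these ranges end at each partition's fromOffset, so the commit below records where
            // this batch began rather than where it ended (the committed position stays one batch behind).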
            final OffsetRange[] beginOffsets = Arrays.stream(offsetRanges)
                    .map(o -> OffsetRange.create(o.topicPartition(), 0, o.fromOffset()))
                    .toArray(OffsetRange[]::new);

            ((CanCommitOffsets) kafkaStream2.inputDStream()).commitAsync(beginOffsets);
        });

        ssc.start();
        ssc.awaitTermination();
    }

    public static JavaInputDStream<ConsumerRecord<Object, Object>> createKafkaStream(JavaStreamingContext ssc, String topic) {
        HashMap<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-ips>");
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, topic + "hrishi-testing-nfr-21");
        kafkaParams.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 30000);
        kafkaParams.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        kafkaParams.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10000000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 90000);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        return KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(ImmutableList.of(topic), kafkaParams));
    }
}




On Wed, Mar 18, 2020 at 4:45 PM Gerard Maas <gerard.maas@gmail.com> wrote:
Hrishi,

Could you share a simplified version of the code you are running?  A job is made out of tasks.
While jobs are indeed sequential, tasks will be executed in parallel.
In the Spark UI, you can see that in the "Event Timeline" visualization.

If you could share an example of your code that illustrates what you want to achieve, I could
have a look at it.

kr, Gerard.

On Wed, Mar 18, 2020 at 8:24 AM Hrishikesh Mishra <sd.hrishi@gmail.com> wrote:
Hi Gerard,

First of all, apologies for the late reply.

You are right, tasks are distributed to the cluster and parallelism is achieved through Kafka partitions. But my use case is different: in one streaming context, I am consuming events from 6 different topics, and for each topic 6 different actions are performed.

So total Spark jobs = 6 streams x 6 actions = 36 jobs per batch (plus some Kafka commits, which happen on the driver).

These 36 jobs execute sequentially, because at any point in time only one job is active (see the image below), and a delay in any one job delays the complete batch.

[image.png]

But one job (which corresponds to one topic and one action) is executed in parallel, based on the number of partitions that topic has.
[image.png]

What is my requirement:

  *   I want to run these jobs in parallel in some controlled manner: jobs of different topics in parallel, but jobs within a topic sequentially. We tried spark.scheduler.mode: FAIR and submitted jobs in different pools, but didn't get any benefit.

But when I tried spark.streaming.concurrentJobs = 4, then 4 jobs were actively running from different batches (batch time 19:15:55 and batch time 19:16:00), which could be a problem for committing offsets.

[image.png]


Regards
Hrishi


On Thu, Mar 5, 2020 at 12:49 AM Gerard Maas <gerard.maas@gmail.com> wrote:
Hi Hrishi,

When using the Direct Kafka stream approach, processing tasks will be distributed to the cluster.
The level of parallelism is dependent on how many partitions the consumed topics have.
Why do you think that the processing is not happening in parallel?

I would advise you to get the base scenario working before looking into advanced features
like `concurrentJobs` or a particular scheduler.

kind regards, Gerard.

On Wed, Mar 4, 2020 at 7:42 PM Hrishikesh Mishra <sd.hrishi@gmail.com> wrote:
Hi

My Spark Streaming job consumes from multiple Kafka topics. How can I process them in parallel? Should I try spark.streaming.concurrentJobs? It has some adverse effects, as mentioned by its creator. Is that still the case with Spark 2.4 and the Direct Kafka Stream? What about FAIR scheduling mode, will it help in this scenario? I am not finding any valid references on this.

Regards
Hrishi
