spark-user mailing list archives

From Hrishikesh Mishra <sd.hri...@gmail.com>
Subject Re: What is the best way to consume parallely from multiple topics in Spark Stream with Kafka
Date Wed, 18 Mar 2020 11:45:39 GMT
This is a simplified version of the code.
Here:
Number of topics: 2
Actions per topic: 2

Total jobs = 2 X 2 = 4

And offsets are committed to Kafka for both topics.



import com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;

import java.util.Arrays;
import java.util.HashMap;


public class Main {

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("StreamingTest");

        conf.set("spark.shuffle.service.enabled", "true");
        conf.set("spark.streaming.kafka.maxRatePerPartition", "100");
        conf.set("spark.streaming.backpressure.enabled", "true");
        conf.set("spark.streaming.concurrentJobs", "1");
        // executor JVM options: CMS GC plus a custom log4j configuration
        conf.set("spark.executor.extraJavaOptions",
                "-XX:+UseConcMarkSweepGC -Dlog4j.configuration=file:/tmp/log4j-executor.properties");
        conf.set("spark.streaming.backpressure.pid.minRate", "1500");

        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
        ssc.sparkContext().setLogLevel("DEBUG");

        /** Kafka Stream 1 **/
        JavaInputDStream<ConsumerRecord<Object, Object>> kafkaStream1 =
                createKafkaStream(ssc, "topic1");

        /** Action 1 on Kafka Stream 1 **/
        kafkaStream1.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
            try {
                /** Some kind of transformation will be performed here **/
                System.out.println("Action 1 -> Stream 1");
                System.out.println(e);
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })));

        /** Action 2 on Kafka Stream 1 **/
        kafkaStream1.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
            try {
                /** Some kind of transformation will be performed here **/
                System.out.println("Action 2 -> Stream 1");
                System.out.println(e);
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })));

        /** Commit on Kafka Stream 1 **/
        kafkaStream1.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            ((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(offsetRanges);
        });
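
        // Note: every foreachRDD that triggers an RDD action (foreachPartition here)
        // is scheduled as its own Spark job for each batch, which is where the
        // 2 topics X 2 actions = 4 jobs per batch come from. The commit-only foreachRDD
        // just above only touches offset metadata on the driver and submits no tasks.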

        /** Kafka Stream 2 **/
        JavaInputDStream<ConsumerRecord<Object, Object>> kafkaStream2 =
                createKafkaStream(ssc, "topic2");

        /** Action 1 on Kafka Stream 2 **/
        kafkaStream2.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
            try {
                /** Some kind of transformation will be performed here **/
                System.out.println("Action 1 -> Stream 2");
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })));

        /** Action 2 on Kafka Stream 2 **/
        kafkaStream2.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
            try {
                /** Some kind of transformation will be performed here **/
                System.out.println("Action 2 -> Stream 2");
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })));

        /** Commit on Kafka Stream 2 (these ranges end at each partition's fromOffset,
            the start of the batch, rather than at its untilOffset) **/
        kafkaStream2.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            final OffsetRange[] beginOffsets = Arrays.stream(offsetRanges)
                    .map(o -> OffsetRange.create(o.topicPartition(), 0, o.fromOffset()))
                    .toArray(OffsetRange[]::new);

            ((CanCommitOffsets) kafkaStream2.inputDStream()).commitAsync(beginOffsets);
        });

        ssc.start();
        ssc.awaitTermination();
    }

    public static JavaInputDStream<ConsumerRecord<Object, Object>> createKafkaStream(
            JavaStreamingContext ssc, String topic) {
        HashMap<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-ips>");
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, topic + "hrishi-testing-nfr-21");
        kafkaParams.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 30000);
        kafkaParams.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        kafkaParams.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10000000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 90000);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        return KafkaUtils.createDirectStream(ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(ImmutableList.of(topic), kafkaParams));
    }
}
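
For reference, this is roughly how the FAIR-scheduler attempt mentioned further down
in this thread was wired up. It is only a minimal sketch: the allocation file path and
the pool names are placeholders, and it assumes a fairscheduler.xml that defines one
pool per topic.

        // driver-side configuration, in addition to the settings shown above
        conf.set("spark.scheduler.mode", "FAIR");
        conf.set("spark.scheduler.allocation.file", "/tmp/fairscheduler.xml");

        // inside each output operation, choose the pool before triggering the RDD
        // action, so the job generated for that topic's batch lands in that pool
        kafkaStream1.foreachRDD(rdd -> {
            rdd.context().setLocalProperty("spark.scheduler.pool", "topic1-pool");
            rdd.foreachPartition(p -> p.forEachRemaining(e -> {
                // same per-record work as in the actions above
            }));
        });

Note that with spark.streaming.concurrentJobs left at 1, the streaming scheduler still
submits only one output-operation job at a time, so the pools alone may not show any
benefit, which would match what we observed.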





On Wed, Mar 18, 2020 at 4:45 PM Gerard Maas <gerard.maas@gmail.com> wrote:

> Hrishi,
>
> Could you share a simplified version of the code you are running?  A job
> is made out of tasks.
> While jobs are indeed sequential, tasks will be executed in parallel.
> In the Spark UI, you can see that in the "Event Timeline" visualization.
>
> If you could share an example of your code that illustrates what you want
> to achieve, I could have a look at it.
>
> kr, Gerard.
>
> On Wed, Mar 18, 2020 at 8:24 AM Hrishikesh Mishra <sd.hrishi@gmail.com>
> wrote:
>
>> Hi Gerard,
>>
>> First of all, apologies for the late reply.
>>
>> You are right, tasks are distributed to the cluster and parallelism is
>> achieved through Kafka partitions. But my use case is different: in one
>> streaming context, I am consuming events from 6 different topics, and for
>> each topic 6 different actions are being performed.
>>
>> So the total is *Spark jobs = 6 streams X 6 actions = 36 jobs* (plus some
>> Kafka commits, which happen on the driver) for *a batch*.
>>
>> These 36 jobs are executed sequentially, because at any point in time only
>> one job is active (see the image below). And a delay in one job leads to a
>> delay in the complete batch.
>>
>> [image: image.png]
>>
>>
>> But a single job (which corresponds to one topic and one action) is
>> executed in parallel, based on the number of partitions that topic has.
>>
>> [image: image.png]
>>
>>
>> My requirement:
>>
>>    - I want to run these jobs in parallel in some controlled manner: jobs
>>    of different topics in parallel, but jobs within a topic sequentially.
>>    We tried with *spark.scheduler.mode: FAIR* and submitted jobs in
>>    different pools, but didn't get any benefit.
>>
>> But when I tried with *spark.streaming.concurrentJobs = 4*, then 4 jobs
>> were actively running from different batches (batch time 19:15:55 and batch
>> time 19:16:00), which could be a problem with committing offsets.
>>
>> [image: image.png]
>>
>>
>> Regards
>> Hrishi
>>
>>
>> On Thu, Mar 5, 2020 at 12:49 AM Gerard Maas <gerard.maas@gmail.com>
>> wrote:
>>
>>> Hi Hrishi,
>>>
>>> When using the Direct Kafka stream approach, processing tasks will be
>>> distributed to the cluster.
>>> The level of parallelism is dependent on how many partitions the
>>> consumed topics have.
>>> Why do you think that the processing is not happening in parallel?
>>>
>>> I would advise you to get the base scenario working before looking into
>>> advanced features like `concurrentJobs` or a particular scheduler.
>>>
>>> kind regards, Gerard.
>>>
>>> On Wed, Mar 4, 2020 at 7:42 PM Hrishikesh Mishra <sd.hrishi@gmail.com>
>>> wrote:
>>>
>>>> Hi
>>>>
>>>> My Spark Streaming job consumes from multiple Kafka topics. How can I
>>>> process them in parallel? Should I try *spark.streaming.concurrentJobs*?
>>>> It has some adverse effects, as mentioned by its creator. Is it still
>>>> valid with Spark 2.4 and the direct Kafka stream? What about FAIR
>>>> scheduling mode, will it help in this scenario? I am not finding any
>>>> reliable references on this.
>>>>
>>>> Regards
>>>> Hrishi
>>>>
>>>>
