spark-user mailing list archives

From Hrishikesh Mishra <sd.hri...@gmail.com>
Subject Re: What is the best way to consume parallely from multiple topics in Spark Stream with Kafka
Date Thu, 19 Mar 2020 06:42:30 GMT
On Thu, Mar 19, 2020 at 12:31 AM Tathagata Das <tathagata.das1565@gmail.com>
wrote:

> Why are you not using Structured Streaming? Structured Streaming's Kafka
> source directly supports subscribing to multiple topics.
>
> val df = spark.readStream.format("kafka").option("subscribe", "topic1,topic2").load()
>
>
> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
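>
> For comparison with the Java code below, a minimal Java sketch of the same
> idea (not from the original message; the broker address and checkpoint path
> are placeholders), assuming the Structured Streaming Kafka source:
>
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
>
> public class StructuredMultiTopic {
>     public static void main(String[] args) throws Exception {
>         SparkSession spark = SparkSession.builder().appName("StructuredMultiTopic").getOrCreate();
>
>         // One source subscribed to both topics; partitions of both topics are
>         // read in parallel by the Kafka source.
>         Dataset<Row> df = spark.readStream()
>                 .format("kafka")
>                 .option("kafka.bootstrap.servers", "<broker-ips>")  // placeholder
>                 .option("subscribe", "topic1,topic2")
>                 .load();
>
>         // Each streaming query is an independent sink and runs in its own
>         // thread, concurrently with any other queries that are started.
>         df.writeStream()
>                 .format("console")
>                 .option("checkpointLocation", "/tmp/chk-console")  // hypothetical path
>                 .start();
>
>         spark.streams().awaitAnyTermination();
>     }
> }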
>
> On Wed, Mar 18, 2020 at 7:49 AM Hrishikesh Mishra <sd.hrishi@gmail.com>
> wrote:
>
>> This is a simplified version of the code.
>> Here:
>> Number of topics: 2
>> Actions per topic: 2
>>
>> Total jobs = 2 X 2 = 4
>>
>> Offsets are committed to Kafka for both topics.
>>
>>
>>
>> import com.google.common.collect.ImmutableList;
>> import org.apache.kafka.clients.consumer.ConsumerConfig;
>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>> import org.apache.kafka.common.serialization.StringDeserializer;
>> import org.apache.spark.SparkConf;
>> import org.apache.spark.TaskContext;
>> import org.apache.spark.streaming.Durations;
>> import org.apache.spark.streaming.api.java.JavaInputDStream;
>> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>> import org.apache.spark.streaming.kafka010.*;
>>
>> import java.util.Arrays;
>>
>> import java.util.HashMap;
>>
>>
>> public class Main {
>>
>>     public static void main(String[] args) throws InterruptedException {
>>         SparkConf conf = new SparkConf().setAppName("StreamingTest");
>>
>>         conf.set("spark.shuffle.service.enabled", "true");
>>         conf.set("spark.streaming.kafka.maxRatePerPartition", "100");
>>         conf.set("spark.streaming.backpressure.enabled", "true");
>>         conf.set("spark.streaming.concurrentJobs", "1");
>>         conf.set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC -Dlog4j.configuration=file:/tmp/log4j-executor.properties");
>>         conf.set("spark.streaming.backpressure.pid.minRate", "1500");
>>
>>         JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
>>         ssc.sparkContext().setLogLevel("DEBUG");
>>
>>         /** Kafka Stream 1 **/
>>         JavaInputDStream<ConsumerRecord<Object, Object>> kafkaStream1 = createKafkaStream(ssc, "topic1");
>>
>>         /** Action 1 on Kafka Stream  1 **/
>>         kafkaStream1.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
>>
>>             try {
>>                 /** Some kind of transformation will be performed here **/
>>                 System.out.println("Action 1 -> Stream 1");
>>                 System.out.println(e);
>>                 Thread.sleep(2);
>>             } catch (InterruptedException ex) {
>>                 ex.printStackTrace();
>>             }
>>         })));
>>
>>
>>
>>         /** Action 2 on Kafka Stream  1 **/
>>         kafkaStream1.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
>>             try {
>>                 /** Some kind of transformation will be performed here **/
>>                 System.out.println("Action 2 -> Stream 1");
>>                 System.out.println(e);
>>                 Thread.sleep(2);
>>             } catch (InterruptedException ex) {
>>                 ex.printStackTrace();
>>             }
>>         })));
>>
>>          /** Commit  on Kafka Stream  1 **/
>>         kafkaStream1.foreachRDD(rdd -> {
>>             OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
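>>             // commitAsync queues this batch's offset ranges on the driver; the end
>>             // offsets are written to Kafka on a later batch (at-least-once semantics).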
>>             ((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(offsetRanges);
>>         });
>>
>>         /** Kafka Stream 2 **/
>>         JavaInputDStream<ConsumerRecord<Object, Object>> kafkaStream2 = createKafkaStream(ssc, "topic2");
>>
>>         /** Action 1 on Kafka Stream 2 **/
>>         kafkaStream2.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
>>             try {
>>                 /** Some kind of transformation will be performed here **/
>>                 System.out.println("Action 1 -> Stream 2");
>>                 Thread.sleep(2);
>>             } catch (InterruptedException ex) {
>>                 ex.printStackTrace();
>>             }
>>         })));
>>
>>         /** Action 2 on Kafka Stream 2 **/
>>         kafkaStream2.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
>>             try {
>>                 /** Some kind of transformation will be performed here **/
>>                 System.out.println("Action 2 -> Stream 2");
>>                 Thread.sleep(2);
>>             } catch (InterruptedException ex) {
>>                 ex.printStackTrace();
>>             }
>>         })));
>>
>>
>>          /** Commit  on Kafka Stream  2 **/
>>         kafkaStream2.foreachRDD(rdd -> {
>>             OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
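>>             // beginOffsets below runs from offset 0 up to each partition's starting offset
>>             // for this batch, so the commit records the beginning of the batch, not its end.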
>>             final OffsetRange[] beginOffsets = Arrays.stream(offsetRanges).map(o -> OffsetRange.create(o.topicPartition(), 0, o.fromOffset())).toArray(OffsetRange[]::new);
>>
>>             ((CanCommitOffsets) kafkaStream2.inputDStream()).commitAsync(beginOffsets);
>>         });
>>
>>         ssc.start();
>>         ssc.awaitTermination();
>>     }
>>
>>     public static JavaInputDStream<ConsumerRecord<Object, Object>> createKafkaStream(JavaStreamingContext ssc, String topic) {
>>         HashMap<String, Object> kafkaParams = new HashMap<>();
>>         kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-ips>");
>>         kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, topic + "hrishi-testing-nfr-21");
>>         kafkaParams.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 30000);
>>         kafkaParams.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
>>         kafkaParams.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
>>         kafkaParams.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10000000);
>>         kafkaParams.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 90000);
>>         kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>         kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
>>         kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>>         kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
>>
>>         return KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(ImmutableList.of(topic), kafkaParams));
>>     }
>> }
>>
>>
>>
>>
>>
>> On Wed, Mar 18, 2020 at 4:45 PM Gerard Maas <gerard.maas@gmail.com>
>> wrote:
>>
>>> Hrishi,
>>>
>>> Could you share a simplified version of the code you are running? A job
>>> is made out of tasks.
>>> While jobs are indeed sequential, tasks will be executed in parallel.
>>> In the Spark UI, you can see that in the "Event Timeline" visualization.
>>>
>>> If you could share an example of your code that illustrates what you
>>> want to achieve, I could have a look at it.
>>>
>>> kr, Gerard.
>>>
>>> On Wed, Mar 18, 2020 at 8:24 AM Hrishikesh Mishra <sd.hrishi@gmail.com>
>>> wrote:
>>>
>>>> Hi Gerard,
>>>>
>>>> First of all, apologies for the late reply.
>>>>
>>>> You are right, tasks are distributed across the cluster and parallelism is
>>>> achieved through Kafka partitions. But my use case is different: in one
>>>> streaming context, I am consuming events from 6 different topics, and for
>>>> each topic 6 different actions are being performed.
>>>>
>>>> So total *Spark jobs = 6 streams X 6 actions = 36 jobs* (plus some
>>>> Kafka commits which happen on the driver) for *a batch*.
>>>>
>>>> These 36 jobs are executed sequentially, because at any point in time only
>>>> one job is active (see the image below). And a delay in any job delays the
>>>> complete batch.
>>>>
>>>> [image: Spark UI screenshot - the jobs of a batch run one after another, with only one active job at a time]
>>>>
>>>>
>>>> But a single job (corresponding to one topic and one action) is executed
>>>> in parallel, based on the number of partitions that topic has.
>>>>
>>>> [image: Spark UI screenshot - tasks of one job running in parallel across the topic's partitions]
>>>>
>>>>
>>>> What is my requirement:
>>>>
>>>>    - I want to run these jobs in parallel in some controlled manner: jobs
>>>>    of different topics in parallel, but jobs within a topic sequentially.
>>>>    We tried *spark.scheduler.mode: FAIR* and submitted jobs in different
>>>>    pools, but didn't get any benefit (a sketch of that attempt is shown
>>>>    after the screenshot below).
>>>>
>>>> But when I tried *spark.streaming.concurrentJobs = 4*, then 4 jobs are
>>>> actively running from different batches (batch time 19:15:55 and batch
>>>> time 19:16:00), which could be a problem for committing offsets.
>>>>
>>>> [image: Spark UI screenshot - with spark.streaming.concurrentJobs = 4, four jobs active at once, drawn from the 19:15:55 and 19:16:00 batches]
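>>>>
>>>> A rough sketch of the FAIR-pool attempt mentioned above (pool names and
>>>> the allocation-file path are made up; this is not the exact code we ran):
>>>>
>>>> conf.set("spark.scheduler.mode", "FAIR");
>>>> conf.set("spark.scheduler.allocation.file", "/tmp/fairscheduler.xml");  // hypothetical pool definitions
>>>>
>>>> kafkaStream1.foreachRDD(rdd -> {
>>>>     // The pool is picked up by jobs submitted from this driver thread.
>>>>     rdd.context().setLocalProperty("spark.scheduler.pool", "topic1-pool");
>>>>     rdd.foreachPartition(p -> p.forEachRemaining(System.out::println));
>>>> });
>>>>
>>>> With the default spark.streaming.concurrentJobs = 1, the streaming
>>>> JobScheduler still hands jobs to a single-threaded executor, so pools alone
>>>> cannot overlap jobs of different topics, which matches the lack of benefit
>>>> we saw.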
>>>>
>>>>
>>>> Regards
>>>> Hrishi
>>>>
>>>>
>>>> On Thu, Mar 5, 2020 at 12:49 AM Gerard Maas <gerard.maas@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Hrishi,
>>>>>
>>>>> When using the Direct Kafka stream approach, processing tasks will be
>>>>> distributed to the cluster.
>>>>> The level of parallelism is dependent on how many partitions the
>>>>> consumed topics have.
>>>>> Why do you think that the processing is not happening in parallel?
>>>>>
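>>>>> One way to see that parallelism, sketched here rather than taken from the
>>>>> original message (kafkaStream stands for a direct stream created as in the
>>>>> code earlier in this thread), is to log the partition id inside
>>>>> foreachPartition:
>>>>>
>>>>> kafkaStream.foreachRDD(rdd -> rdd.foreachPartition(partition -> {
>>>>>     // Each Kafka partition is handled by its own task.
>>>>>     System.out.println("partition " + TaskContext.getPartitionId()
>>>>>             + " in task " + TaskContext.get().taskAttemptId());
>>>>>     partition.forEachRemaining(System.out::println);
>>>>> }));
>>>>>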
>>>>> I would advise you to get the base scenario working before looking
>>>>> into advanced features like `concurrentJobs` or a particular scheduler.
>>>>>
>>>>> kind regards, Gerard.
>>>>>
>>>>> On Wed, Mar 4, 2020 at 7:42 PM Hrishikesh Mishra <sd.hrishi@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> My Spark Streaming job consumes from multiple Kafka topics. How can I
>>>>>> process them in parallel? Should I try *spark.streaming.concurrentJobs*?
>>>>>> It has some adverse effects, as mentioned by its creator. Is it still
>>>>>> valid with Spark 2.4 and the Direct Kafka Stream? What about FAIR
>>>>>> scheduling mode, will it help in this scenario? I am not finding any
>>>>>> valid links around this.
>>>>>>
>>>>>> Regards
>>>>>> Hrishi
>>>>>>
>>>>>>
