On Thu, Mar 19, 2020 at 12:31 AM Tathagata Das <tathagata.das1565@gmail.com> wrote:
Why are you not using Structured Streaming? Structured Streaming's Kafka source directly supports multiple topics. 

val df = spark.readStream.format("kafka").option("subscribe", "topic1,topic2").load()

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
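
For reference, a minimal Java sketch of the same idea, assuming the spark-sql-kafka-0-10 artifact is on the classpath (the broker address is a placeholder and the console sinks just stand in for your real actions). Each query started from writeStream is scheduled on its own thread, so the two actions run concurrently:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class StructuredStreamingExample {

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("StreamingTest").getOrCreate();

        // One source can subscribe to several topics at once.
        Dataset<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "<broker-ips>")
                .option("subscribe", "topic1,topic2")
                .load();

        // Two independent queries over the same source; each runs concurrently.
        df.selectExpr("topic", "CAST(value AS STRING)")
                .writeStream().queryName("action-1").format("console").start();

        df.selectExpr("topic", "CAST(value AS STRING)")
                .writeStream().queryName("action-2").format("console").start();

        spark.streams().awaitAnyTermination();
    }
}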

On Wed, Mar 18, 2020 at 7:49 AM Hrishikesh Mishra <sd.hrishi@gmail.com> wrote:
This is a simplified version of the code.
Here:
Number of topics: 2
Actions per topic: 2

Total jobs = 2 x 2 = 4

And offsets are committed to Kafka for both topics.



import com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;

import java.util.Arrays;

import java.util.HashMap;


public class Main {

public static void main(String[] args) throws InterruptedException {
SparkConf conf = new SparkConf().setAppName("StreamingTest");

conf.set("spark.shuffle.service.enabled", "true");
conf.set("spark.streaming.kafka.maxRatePerPartition", "100");
conf.set("spark.streaming.backpressure.enabled", "true");
conf.set("spark.streaming.concurrentJobs", "1");
conf.set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC");
conf.set("spark.executor.extraJavaOptions", "-Dlog4j.configuration=file:/tmp/log4j-executor.properties");
conf.set("spark.streaming.backpressure.pid.minRate", "1500");

JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
ssc.sparkContext().setLogLevel("DEBUG");

/** Kafka Stream 1 **/
JavaInputDStream<ConsumerRecord<Object, Object>> kafkaStream1 = createKafkaStream(ssc, "topic1");

/** Action 1 on Kafka Stream 1 **/
kafkaStream1.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {

try {
/** Some kind of transformation will be performed here **/
System.out.println("Action 1 -> Stream 1");
System.out.println(e);
Thread.sleep(2);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
})));



/** Action 2 on Kafka Stream 1 **/
kafkaStream1.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
try {
/** Some kind of transformation will be performed here **/
System.out.println("Action 2 -> Stream 1");
System.out.println(e);
Thread.sleep(2);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
})));

/** Commit on Kafka Stream 1 **/
kafkaStream1.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
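// commitAsync only queues these ranges on the driver; the actual commit happens on a later poll by the driver's Kafka consumer.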
((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(offsetRanges);
});

/** Kafka Stream 2 **/
JavaInputDStream<ConsumerRecord<Object, Object>> kafkaStream2 = createKafkaStream(ssc, "topic2");

/** Action 1 on Kafka Stream 2 **/
kafkaStream2.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
try {
/** Some kind of transformation will be performed here **/
System.out.println("Action 1 -> Stream 2");
Thread.sleep(2);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
})));

/** Action 2 on Kafka Stream 2 **/
kafkaStream2.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
try {
/** Some kind of transformation will be performed here **/
System.out.println("Action 2 -> Stream 2");
Thread.sleep(2);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
})));


/** Commit on Kafka Stream 2 **/
kafkaStream2.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
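// These ranges end at each partition's fromOffset, so the commit below records the start of the current batch rather than its end (the batch would be re-read after a restart).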
final OffsetRange[] beginOffsets = Arrays.stream(offsetRanges).map(o -> OffsetRange.create(o.topicPartition(), 0, o.fromOffset())).toArray(OffsetRange[]::new);

((CanCommitOffsets) kafkaStream2.inputDStream()).commitAsync(beginOffsets);
});

ssc.start();
ssc.awaitTermination();
}

public static JavaInputDStream<ConsumerRecord<Object, Object>> createKafkaStream(JavaStreamingContext ssc, String topic) {
HashMap<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-ips>");
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, topic + "hrishi-testing-nfr-21");
kafkaParams.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 30000);
kafkaParams.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
kafkaParams.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
kafkaParams.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10000000);
kafkaParams.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 90000);
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

return KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(ImmutableList.of(topic), kafkaParams));
}
}




On Wed, Mar 18, 2020 at 4:45 PM Gerard Maas <gerard.maas@gmail.com> wrote:
Hrishi,

Could you share a simplified version of the code you are running? A job is made up of tasks. 
While jobs are indeed sequential, tasks are executed in parallel. 
In the Spark UI, you can see this in the "Event Timeline" visualization. 

If you could share an example of your code that illustrates what you want to achieve, I could have a look at it.

kr, Gerard.

On Wed, Mar 18, 2020 at 8:24 AM Hrishikesh Mishra <sd.hrishi@gmail.com> wrote:
Hi Gerard,

First of all, apologies for the late reply. 

You are right, tasks are distributed to the cluster and parallelism is achieved through Kafka partitions. But my use case is different: in one streaming context, I am consuming events from 6 different topics, and for each topic 6 different actions are being performed.

So total Spark jobs = 6 streams x 6 actions = 36 jobs per batch (plus some Kafka commits, which happen on the driver).

These 36 jobs are executed sequentially, because at any point in time only one job is active (see the image below). And a delay in any one job delays the complete batch. 

[image: Spark UI screenshot - only one job active at a time]

But one job (which corresponds to one topic and one action) is executed in parallel, based on the number of partitions that topic has. 
[image: Spark UI screenshot - tasks of a single job running in parallel across partitions]

What is my requirement: 
  • I want to run these jobs in parallel in some controlled manner: jobs of different topics in parallel, but jobs within a topic sequentially. We tried spark.scheduler.mode: FAIR and submitted jobs to different pools (see the sketch below), but didn't get any benefit. 
But when I tried spark.streaming.concurrentJobs = 4, then 4 jobs were actively running from different batches (batch time 19:15:55 and batch time 19:16:00), which could be a problem with committing offsets. 
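
For reference, a minimal sketch of what that attempt looks like, building on the Main class shared above (the allocation-file path and pool names are illustrative, not the exact code we ran):

conf.set("spark.scheduler.mode", "FAIR");
conf.set("spark.scheduler.allocation.file", "/tmp/fairscheduler.xml"); // illustrative path; defines one pool per topic

/** Action on Kafka Stream 1, pinned to its own FAIR pool **/
kafkaStream1.foreachRDD(rdd -> {
    // The pool is a thread-local property on the driver, so it has to be set
    // just before the output operation is triggered.
    rdd.context().setLocalProperty("spark.scheduler.pool", "topic1-pool"); // illustrative pool name
    rdd.foreachPartition(p -> p.forEachRemaining(e -> System.out.println(e)));
});

But FAIR scheduling only arbitrates between jobs that are submitted concurrently; with spark.streaming.concurrentJobs = 1 the streaming scheduler still submits the batch's output operations one at a time, so the pools never have more than one active job to balance.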

[image: Spark UI screenshot - 4 jobs active concurrently, from batches 19:15:55 and 19:16:00]


Regards
Hrishi


On Thu, Mar 5, 2020 at 12:49 AM Gerard Maas <gerard.maas@gmail.com> wrote:
Hi Hrishi,

When using the Direct Kafka stream approach, processing tasks will be distributed to the cluster. 
The level of parallelism is dependent on how many partitions the consumed topics have.
Why do you think that the processing is not happening in parallel?

I would advise you to get the base scenario working before looking into advanced features like `concurrentJobs` or a particular scheduler.

kind regards, Gerard.

On Wed, Mar 4, 2020 at 7:42 PM Hrishikesh Mishra <sd.hrishi@gmail.com> wrote:
Hi 

My Spark Streaming job consumes from multiple Kafka topics. How can I process them in parallel? Should I try spark.streaming.concurrentJobs? It has some adverse effects, as mentioned by its creator. Is that still valid with Spark 2.4 and the direct Kafka stream? What about FAIR scheduling mode, will it help in this scenario? I am not finding any valid references on this.

Regards
Hrishi