spark-user mailing list archives

From Mukul Gupta <mukul.gu...@aricent.com>
Subject Re: Kafka + Spark streaming, RDD partitions not processed in parallel
Date Sun, 13 Mar 2016 12:03:35 GMT
Sorry for the late reply. I am new to Java and it took me a while to set things up.

Yes, you are correct that the Kafka client libraries need not be added explicitly. I hadn't
realized that. I removed the dependency and the code still compiled. However, upon execution,
I encountered the same issue as before.

Following is the link to the repository:
https://github.com/guptamukul/sparktest.git
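Incidentally, one thing that may be worth checking (an assumption on my part, not something confirmed in this thread): `print(90)` is implemented on top of `RDD.take()`, and `take()` runs incremental jobs — the first job scans only one partition, and only if it doesn't collect enough elements does a follow-up job scan roughly four times as many partitions (Spark's scale-up factor). That would match the observed behavior of one partition being processed alone before the rest run in parallel. A rough, self-contained sketch of that scan schedule (plain Java, no Spark; the 4x factor and per-partition element count are assumptions for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class TakeScanSchedule {

    // Simulates how many partitions each successive take()-style job scans,
    // assuming the first job reads 1 partition and each retry scans ~4x more.
    static List<Integer> scanSchedule(int numPartitions, int needed, int perPartition) {
        List<Integer> jobs = new ArrayList<>();
        int scanned = 0;
        int collected = 0;
        while (collected < needed && scanned < numPartitions) {
            int toScan = (scanned == 0) ? 1 : Math.min(numPartitions - scanned, 4 * scanned);
            jobs.add(toScan);
            collected += toScan * perPartition;
            scanned += toScan;
        }
        return jobs;
    }

    public static void main(String[] args) {
        // 3 Kafka partitions, print(90) needs ~91 elements, assume 10 per partition:
        System.out.println(scanSchedule(3, 91, 10)); // [1, 2] — first job touches one partition
        // 5 Kafka partitions, same parameters:
        System.out.println(scanSchedule(5, 91, 10)); // [1, 4] — again one partition, then four
    }
}
```

If this is indeed the cause, replacing `print()` with an action that runs over all partitions in a single job (e.g. `foreachRDD` with `rdd.count()`) should show all Kafka partitions processed in parallel.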

________________________________________
From: Cody Koeninger <cody@koeninger.org>
Sent: 11 March 2016 23:04
To: Mukul Gupta
Cc: user@spark.apache.org
Subject: Re: Kafka + Spark streaming, RDD partitions not processed in parallel

Why are you including a specific dependency on Kafka?  Spark's
external streaming kafka module already depends on kafka.

Can you link to an actual repo with build file etc?
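(For reference, a minimal Maven dependency block that relies on the transitive Kafka dependency might look like the sketch below — artifact coordinates assumed for Spark 1.6.0 on Scala 2.10:)

```xml
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.0</version>
  </dependency>
  <!-- pulls in the Kafka client libs transitively; no separate kafka dependency needed -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.0</version>
  </dependency>
</dependencies>
```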

On Fri, Mar 11, 2016 at 11:21 AM, Mukul Gupta <mukul.gupta@aricent.com> wrote:
> Please note that while building the jar for the code below, I used the Spark 1.6.0 + Kafka 0.9.0.0 libraries.
> I also tried the Spark 1.5.0 + Kafka 0.9.0.1 combination, but encountered the same issue.
>
> I could not use the ideal combination of Spark 1.6.0 + Kafka 0.9.0.1 (which matches the Spark and Kafka versions installed on my machine) because when I do, I get the following error at run time:
>     Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
>
> package sparktest;
>
> import java.util.Arrays;
> import java.util.HashMap;
> import java.util.HashSet;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaPairInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.kafka.KafkaUtils;
> import kafka.serializer.StringDecoder;
> import scala.Tuple2;
>
> public class SparkTest {
>
>     public static void main(String[] args) {
>
>         if (args.length < 5) {
>             System.err.println("Usage: SparkTest <kafkabroker> <sparkmaster> <topics> <consumerGroup> <Duration>");
>             System.exit(1);
>         }
>
>         String kafkaBroker = args[0];
>         String sparkMaster = args[1];
>         String topics = args[2];
>         String consumerGroupID = args[3];
>         String durationSec = args[4];
>
>         int duration = 0;
>
>         try {
>             duration = Integer.parseInt(durationSec);
>         } catch (Exception e) {
>             System.err.println("Illegal duration");
>             System.exit(1);
>         }
>
>         HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
>
>         SparkConf conf = new SparkConf().setMaster(sparkMaster).setAppName("DirectStreamDemo");
>
>         JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(duration));
>
>         HashMap<String, String> kafkaParams = new HashMap<String, String>();
>         kafkaParams.put("metadata.broker.list", kafkaBroker);
>         kafkaParams.put("group.id", consumerGroupID);
>
>         JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc,
>                 String.class, String.class,
>                 StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
>
>         JavaDStream<String> processed = messages.map(new Function<Tuple2<String, String>, String>() {
>
>             @Override
>             public String call(Tuple2<String, String> arg0) throws Exception {
>                 // Simulate per-record processing cost
>                 Thread.sleep(7000);
>                 return arg0._2;
>             }
>         });
>
>         processed.print(90);
>
>         try {
>             jssc.start();
>             jssc.awaitTermination();
>         } catch (Exception e) {
>             // ignore interruption during shutdown
>         } finally {
>             jssc.close();
>         }
>     }
> }
>
>
> ________________________________________
> From: Cody Koeninger <cody@koeninger.org>
> Sent: 11 March 2016 20:42
> To: Mukul Gupta
> Cc: user@spark.apache.org
> Subject: Re: Kafka + Spark streaming, RDD partitions not processed in parallel
>
> Can you post your actual code?
>
> On Thu, Mar 10, 2016 at 9:55 PM, Mukul Gupta <mukul.gupta@aricent.com> wrote:
>> Hi All,
>>
>> I was running the following test.
>>
>> Setup: 9 VMs running Spark workers with 1 Spark executor each, and 1 VM running Kafka and
>> the Spark master. Spark version is 1.6.0, Kafka version is 0.9.0.1. Spark is using its own
>> resource manager and is not running over YARN.
>>
>> Test: I created a Kafka topic with 3 partitions, then used "KafkaUtils.createDirectStream"
>> to get a DStream:
>>
>> JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(…);
>> JavaDStream stream1 = stream.map(func1);
>> stream1.print();
>>
>> where func1 just contains a sleep followed by returning of the value.
>>
>> Observation: The RDD partition corresponding to partition 1 of the Kafka topic was processed
>> on one of the Spark executors. Only once that processing finished were the RDD partitions
>> corresponding to the remaining two Kafka partitions processed, in parallel, on different
>> executors. I expected all three RDD partitions to be processed in parallel, since idle Spark
>> executors were available.
>>
>> I re-ran the test after increasing the number of partitions of the Kafka topic to 5. Again
>> the RDD partition corresponding to partition 1 of Kafka was processed first on a single
>> executor, and only after it finished were the RDD partitions corresponding to the remaining
>> four Kafka partitions processed in parallel on different executors.
>>
>> I am not clear why Spark waits for operations on the first RDD partition to finish when it
>> could process the remaining partitions in parallel. Am I missing any configuration? Any help
>> is appreciated.
>>
>> Thanks, Mukul
>> ________________________________
>> View this message in context: Kafka + Spark streaming, RDD partitions not
>> processed in parallel
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> "DISCLAIMER: This message is proprietary to Aricent and is intended solely for the use
of the individual to whom it is addressed. It may contain privileged or confidential information
and should not be circulated or used for any purpose other than for what it is intended. If
you have received this message in error, please notify the originator immediately. If you
are not the intended recipient, you are notified that you are strictly prohibited from using,
copying, altering, or disclosing the contents of this message. Aricent accepts no responsibility
for loss or damage arising from the use of the information transmitted by this email including
damage from virus."

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org

