spark-user mailing list archives

From Jacek Laskowski <ja...@japila.pl>
Subject Re: Kafka 0.10 & Spark Streaming 2.0.2
Date Fri, 02 Dec 2016 17:21:45 GMT
Hi,

Can you post screenshots of the Executors and Streaming tabs?

Jacek

On 2 Dec 2016 5:54 p.m., "Gabriel Perez" <gabriel@adtheorent.com> wrote:

> Hi,
>
> The topic has 128 partitions in total, and I can tell it is one executor
> because in the Kafka consumer list I see only one thread pulling, and in
> the Spark master UI the executor thread ID shows as 0 and that's it.
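>
> As a rough check (a sketch, reusing the stream variable and imports from
> the code further down), the Spark-side parallelism can also be logged per
> batch; the Kafka 0.10 direct stream creates one Spark partition per Kafka
> partition, so with 128 partitions this should print 128:
>
> <code>
> // Sketch: log how many Spark partitions each batch RDD has.
> stream.foreachRDD( new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
>
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public void call( JavaRDD<ConsumerRecord<String, String>> rdd ) {
>         System.out.println( "Batch partitions: " + rdd.getNumPartitions() );
>     }
> } );
> </code>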
>
> Thanks,
> Gabe
>
> From: Jacek Laskowski <jacek@japila.pl>
> Date: Friday, December 2, 2016 at 11:47 AM
> To: Gabriel Perez <gabriel@adtheorent.com>
> Cc: user <user@spark.apache.org>
> Subject: Re: Kafka 0.10 & Spark Streaming 2.0.2
>
> Hi,
>
> How many partitions does the topic have? How do you check how many
> executors read from the topic?
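>
> (For example, a minimal sketch of one way to check the partition count
> programmatically, assuming the broker address from the code below:)
>
> <code>
> import java.util.List;
> import java.util.Properties;
>
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.PartitionInfo;
> import org.apache.kafka.common.serialization.StringDeserializer;
>
> // Count the partitions of the topic with a plain Kafka consumer.
> Properties props = new Properties();
> props.put( "bootstrap.servers", "server:9092" );
> props.put( "key.deserializer", StringDeserializer.class.getName() );
> props.put( "value.deserializer", StringDeserializer.class.getName() );
>
> try ( KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props ) ) {
>     List<PartitionInfo> partitions = consumer.partitionsFor( "Topic" );
>     System.out.println( "Partitions: " + partitions.size() );
> }
> </code>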
>
> Jacek
>
> On 2 Dec 2016 2:44 p.m., "gabrielperez2484" <gabriel@adtheorent.com> wrote:
>
> Hello,
>
> I am trying to perform a POC between Kafka 0.10 and Spark 2.0.2. Currently
> I am running into an issue where only one executor ("Kafka consumer") is
> reading from the topic, which is causing performance to be really poor. I
> have tried adding "--num-executors 8" both in the script that submits the
> jar (see the sketch below) and in my Java code. The code follows. Please
> let me know if I am missing something or if there is a way to increase the
> number of consumers connecting to Kafka.
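>
> For reference, a sketch of the kind of submit command I mean (the class
> name and jar path here are placeholders):
>
> <code>
> spark-submit \
>   --master spark://server:7077 \
>   --deploy-mode cluster \
>   --num-executors 8 \
>   --class com.example.Aggregation \
>   aggregation.jar
> </code>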
>
>
> Thanks,
> Gabe
>
> <code>
> import java.util.Arrays;
> import java.util.Collection;
> import java.util.HashMap;
> import java.util.Map;
>
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.api.java.function.Function2;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.api.java.function.VoidFunction;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaInputDStream;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.kafka010.CanCommitOffsets;
> import org.apache.spark.streaming.kafka010.ConsumerStrategies;
> import org.apache.spark.streaming.kafka010.HasOffsetRanges;
> import org.apache.spark.streaming.kafka010.KafkaUtils;
> import org.apache.spark.streaming.kafka010.LocationStrategies;
> import org.apache.spark.streaming.kafka010.OffsetRange;
> import scala.Tuple2;
>
> // Kafka consumer configuration. request.timeout.ms is kept above
> // session.timeout.ms, and heartbeat.interval.ms below it, as the
> // consumer requires.
> Map<String, Object> kafkaParams = new HashMap<>();
> kafkaParams.put( "bootstrap.servers", "server:9092" );
> kafkaParams.put( "key.deserializer", StringDeserializer.class );
> kafkaParams.put( "value.deserializer", StringDeserializer.class );
> kafkaParams.put( "group.id", "spark-aggregation" );
> kafkaParams.put( "auto.offset.reset", "earliest" );
> kafkaParams.put( "request.timeout.ms", "305000" );
> kafkaParams.put( "heartbeat.interval.ms", "85000" );
> kafkaParams.put( "session.timeout.ms", "90000" );
>
> Collection<String> topics = Arrays.asList( "Topic" );
>
> SparkConf sparkConf = new SparkConf()
>         .setMaster( "spark://server:7077" )
>         .setAppName( "aggregation" )
>         .set( "spark.submit.deployMode", "cluster" )
>         .set( "spark.executor.instances", "16" );
>
> JavaStreamingContext javaStreamingContext =
>         new JavaStreamingContext( sparkConf, new Duration( 5000 ) );
>
> // Creates the connection to the stream.
> final JavaInputDStream<ConsumerRecord<String, String>> stream =
>         KafkaUtils.createDirectStream( javaStreamingContext,
>                 LocationStrategies.PreferConsistent(),
>                 ConsumerStrategies.<String, String>Subscribe( topics, kafkaParams ) );
>
> // JavaPairDStream<String, String> unifiedStream = javaStreamingContext.union(
> //         kafkaStreams.get( 0 ), kafkaStreams.subList( 1, kafkaStreams.size() ) );
>
> // Pull the key from each record to use as the aggregation key.
> JavaDStream<String> records = stream.map(
>         new Function<ConsumerRecord<String, String>, String>() {
>
>             private static final long serialVersionUID = 1L;
>
>             @Override
>             public String call( ConsumerRecord<String, String> record ) {
>                 return record.key();
>             }
>         } );
>
> // Create a (key, 1) tuple per record to perform calculations on.
> JavaPairDStream<String, Integer> pairs = records.mapToPair(
>         new PairFunction<String, String, Integer>() {
>
>             private static final long serialVersionUID = 1L;
>
>             @Override
>             public Tuple2<String, Integer> call( String s ) {
>                 return new Tuple2<>( s, 1 );
>             }
>         } );
>
> // Sum the counts per key within each batch.
> JavaPairDStream<String, Integer> counts = pairs.reduceByKey(
>         new Function2<Integer, Integer, Integer>() {
>
>             private static final long serialVersionUID = 1L;
>
>             @Override
>             public Integer call( Integer i1, Integer i2 ) {
>                 return i1 + i2;
>             }
>         } );
>
> // An output operation is required for the aggregation above to run.
> counts.print();
>
> // Commit the consumed offsets back to Kafka once outputs have completed.
> stream.foreachRDD( new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
>
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public void call( JavaRDD<ConsumerRecord<String, String>> rdd ) {
>         OffsetRange[] offsetRanges = ( (HasOffsetRanges) rdd.rdd() ).offsetRanges();
>         ( (CanCommitOffsets) stream.inputDStream() ).commitAsync( offsetRanges );
>     }
> } );
>
> // Start the streaming context and block until the job is stopped.
> javaStreamingContext.start();
> javaStreamingContext.awaitTermination();
> </code>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-0-10-Spark-Streaming-2-0-2-tp28153.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
