spark-user mailing list archives

From Akhil Das <ak...@sigmoidanalytics.com>
Subject Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
Date Mon, 01 Dec 2014 10:25:43 GMT
It says:

    14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

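That warning means the job has not been given any executors, so the Kafka
receiver never starts and no blocks arrive. My quick guess is that you are
passing the wrong master URL (spark://192.168.88.130:7077). Open the master
web UI on port 8080 and use the master URL shown in the top-left corner of
the page, exactly as it is written there.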
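
If it helps, here is a minimal sketch in plain Java (no Spark dependency) for
sanity-checking a master URL before you submit. MasterUrlCheck and its two
methods are hypothetical names of mine, not part of Spark. It only checks that
the URL has the spark://host:port form and that something accepts TCP
connections at that address. It cannot tell you whether the URL matches what
the master advertises, so the web UI remains the reference.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper (not a Spark API): sanity-checks a standalone master URL.
final class MasterUrlCheck {

    /** Extracts host and port; rejects anything that is not spark://host:port. */
    static InetSocketAddress parse(String masterUrl) {
        URI uri;
        try {
            uri = new URI(masterUrl.trim());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Not a valid URL: " + masterUrl, e);
        }
        if (!"spark".equals(uri.getScheme()) || uri.getHost() == null || uri.getPort() < 0) {
            throw new IllegalArgumentException("Expected spark://host:port but got: " + masterUrl);
        }
        // Unresolved on purpose: parsing should not trigger a DNS lookup.
        return InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
    }

    /** Returns true if a TCP connection to the master's host and port succeeds in time. */
    static boolean isReachable(String masterUrl, int timeoutMillis) {
        InetSocketAddress parsed = parse(masterUrl);
        try (Socket socket = new Socket()) {
            // Resolve the host name here, at connect time.
            socket.connect(new InetSocketAddress(parsed.getHostString(), parsed.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("Usage: java MasterUrlCheck.java spark://host:port");
            System.exit(1);
        }
        System.out.println(args[0] + " reachable: " + isReachable(args[0], 2000));
    }
}
```

With Java 11 or later you can run the file directly, for example
`java MasterUrlCheck.java spark://192.168.88.130:7077`. A "reachable: false"
result points at a wrong host or port, or at a master that is not running.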

Thanks
Best Regards

On Mon, Dec 1, 2014 at 3:42 PM, <m.sarosh@accenture.com> wrote:

> Hi,
>
> I am integrating Kafka and Spark using spark-streaming. I have created a
> Kafka topic:
>
>     bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
>
> I am publishing messages to Kafka and trying to read them with
> spark-streaming Java code and display them on screen.
>
> The daemons are all up: Spark master and worker, ZooKeeper, Kafka.
>
> I am writing the Java code for this using KafkaUtils.createStream.
>
> The code is below:
>
>     package com.spark;
>
>     import scala.Tuple2;
>     import kafka.serializer.Decoder;
>     import kafka.serializer.Encoder;
>     import org.apache.spark.streaming.Duration;
>     import org.apache.spark.*;
>     import org.apache.spark.api.java.function.*;
>     import org.apache.spark.api.java.*;
>     import org.apache.spark.streaming.kafka.KafkaUtils;
>     import org.apache.spark.streaming.kafka.*;
>     import org.apache.spark.streaming.api.java.JavaStreamingContext;
>     import org.apache.spark.streaming.api.java.JavaPairDStream;
>     import org.apache.spark.streaming.api.java.JavaDStream;
>     import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
>     import java.util.Map;
>     import java.util.HashMap;
>
>     public class SparkStream {
>         public static void main(String args[])
>         {
>             if (args.length != 3)
>             {
>                 System.out.println("Usage: spark-submit --class com.spark.SparkStream "
>                         + "target/SparkStream-with-dependencies.jar "
>                         + "<zookeeper_ip> <group_name> <topic1,topic2,...>");
>                 System.exit(1);
>             }
>
>             // Map each topic name to the number of consumer threads for it
>             Map<String,Integer> topicMap = new HashMap<String,Integer>();
>             String[] topic = args[2].split(",");
>             for (String t : topic)
>             {
>                 topicMap.put(t, new Integer(1));
>             }
>
>             // Master URL, application name, batch interval of 3000 ms
>             JavaStreamingContext jssc = new JavaStreamingContext(
>                     "spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
>             // args[0] is the ZooKeeper address, args[1] the consumer group name
>             JavaPairReceiverInputDStream<String, String> messages =
>                     KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
>
>             System.out.println("Connection done");
>             // Keep only the message value of each (key, value) pair
>             JavaDStream<String> data = messages.map(
>                     new Function<Tuple2<String, String>, String>()
>                     {
>                         public String call(Tuple2<String, String> message)
>                         {
>                             System.out.println("NewMessage: " + message._2()); // for debugging
>                             return message._2();
>                         }
>                     });
>
>             data.print();
>
>             jssc.start();
>             jssc.awaitTermination();
>         }
>     }
>
> I am running the job, and in another terminal I am running the Kafka
> console producer to publish messages:
>
>     # bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>     > Hi kafka
>     > second message
>     > another message
>
> But the output logs on the spark-streaming console do not show the
> messages; they show zero blocks received:
>
>     -------------------------------------------
>     Time: 1417107363000 ms
>     -------------------------------------------
>
>     14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
>     14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming job 1417107363000 ms.0 from job set of time 1417107363000 ms
>     14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s for time 1417107363000 ms (execution: 0.000 s)
>     14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time 1417107363000 ms
>     14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence list
>     14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
>     14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time 1417107363000 ms
>     14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
>     14/11/27 11:56:06 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
>
> Why aren't the data blocks being received? I have tried the Kafka console
> producer and consumer (bin/kafka-console-producer.... and
> bin/kafka-console-consumer...) and they work perfectly, so why doesn't the
> code above? Please help me.
>
> Regards,
>
> Aiman Sarosh
>
