spark-user mailing list archives

From Gerard Maas <gerard.m...@gmail.com>
Subject Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
Date Mon, 01 Dec 2014 12:24:40 GMT
You have only 1 core available. Spark Streaming will be able to consume
messages but not process them, as there are no additional resources left to
process the RDDs.
You need to tune your configuration further to add more cores at the
executor. Have a look at the configuration options in the docs:
http://spark.apache.org/docs/latest/spark-standalone.html
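For example, a minimal sketch of adding cores at submission time (not tested here: the jar and class names are taken from the code later in this thread, and it assumes the hard-coded master URL in the code is removed so that the settings passed to spark-submit take effect). A receiver-based stream pins one core, so the application needs at least two in total:

```shell
# Standalone mode: cap the cores the application may take across the cluster.
spark-submit --class com.spark.SparkStream \
  --master spark://192.168.88.130:7077 \
  --total-executor-cores 2 \
  target/SparkStream-with-dependencies.jar <zookeeper_ip> <group_name> <topic1,topic2,...>

# Local mode: "local[2]" reserves two threads, one for the receiver and
# one for processing; "local[1]" would reproduce the stall seen here.
spark-submit --class com.spark.SparkStream \
  --master "local[2]" \
  target/SparkStream-with-dependencies.jar <zookeeper_ip> <group_name> <topic1,topic2,...>
```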

-kr, Gerard.

On Mon, Dec 1, 2014 at 1:08 PM, <m.sarosh@accenture.com> wrote:

>  Hi,
>
> I have now configured Spark. I had a CDH5 installation, so I referred to
> the installation doc.
>
> I have the worker up now:
>
>  Now I tried using:
>
> JavaStreamingContext jssc = new JavaStreamingContext(
> "spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
>
>
>
> Also,
>
> JavaStreamingContext jssc = new JavaStreamingContext("local[4]",
> "SparkStream", new Duration(3000));
>
>
>
> And,
>
> JavaStreamingContext jssc = new JavaStreamingContext("local[*]",
> "SparkStream", new Duration(3000));
>
>
>
>
>
> Logs are still the same:
>
> -------------------------------------------
>
> Time: 1417438988000 ms
>
> -------------------------------------------
>
>
>
> 2014-12-01 08:03:08,008 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Starting job streaming job 1417438988000 ms.0
> from job set of time 1417438988000 ms
>
> 2014-12-01 08:03:08,008 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Finished job streaming job 1417438988000 ms.0
> from job set of time 1417438988000 ms
>
> 2014-12-01 08:03:08,009 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Total delay: 0.008 s for time 1417438988000
> ms (execution: 0.000 s)
>
> 2014-12-01 08:03:08,010 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Added jobs for time 1417438988000 ms
>
> 2014-12-01 08:03:08,015 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] rdd.MappedRDD
> (Logging.scala:logInfo(59)) - Removing RDD 39 from persistence list
>
> 2014-12-01 08:03:08,024 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 39
>
> 2014-12-01 08:03:08,027 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] rdd.BlockRDD
> (Logging.scala:logInfo(59)) - Removing RDD 38 from persistence list
>
> 2014-12-01 08:03:08,031 INFO
> [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 38
>
> 2014-12-01 08:03:08,033 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] kafka.KafkaInputDStream
> (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[38] at
> BlockRDD at ReceiverInputDStream.scala:69 of time 1417438988000 ms
>
> 2014-12-01 08:03:09,002 INFO
> [sparkDriver-akka.actor.default-dispatcher-2] scheduler.ReceiverTracker
> (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks
>
>
>
>
>
>
>
> From: Akhil Das [mailto:akhil@sigmoidanalytics.com]
> Sent: Monday, December 01, 2014 4:41 PM
>
> To: Sarosh, M.
> Cc: user@spark.apache.org
> Subject: Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
>
>
>
> I see that you have no worker machines to execute the job:
>
>
>
> [image: Inline image 1]
>
>
>
> You haven't configured your spark cluster properly.
>
>
>
> A quick fix to get it running would be to run it in local mode; for that,
> change this line
>
>
>
> JavaStreamingContext jssc = new JavaStreamingContext(
> "spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
>
>
>
> to this
>
>
>
> JavaStreamingContext jssc = new JavaStreamingContext("local[4]",
> "SparkStream", new Duration(3000));
>
>
>
>
>   Thanks
>
> Best Regards
>
>
>
> On Mon, Dec 1, 2014 at 4:18 PM, <m.sarosh@accenture.com> wrote:
>
> Hi,
>
>
>
> The spark master is working, and I have given the same url in the code:
>
>
>
> The warning is gone, and the new log is:
>
> -------------------------------------------
>
> Time: 1417427850000 ms
>
> -------------------------------------------
>
>
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Starting job streaming job 1417427850000 ms.0
> from job set of time 1417427850000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Finished job streaming job 1417427850000 ms.0
> from job set of time 1417427850000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Total delay: 0.028 s for time 1417427850000
> ms (execution: 0.001 s)
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Added jobs for time 1417427850000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.MappedRDD
> (Logging.scala:logInfo(59)) - Removing RDD 25 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 25
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.BlockRDD
> (Logging.scala:logInfo(59)) - Removing RDD 24 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 24
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5]
> kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of
> RDD BlockRDD[24] at BlockRDD at ReceiverInputDStream.scala:69 of time
> 1417427850000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4]
> scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0
> received 0 blocks
>
> -------------------------------------------
>
> Time: 1417427853000 ms
>
> -------------------------------------------
>
>
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Starting job streaming job 1417427853000 ms.0
> from job set of time 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Finished job streaming job 1417427853000 ms.0
> from job set of time 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Total delay: 0.015 s for time 1417427853000
> ms (execution: 0.001 s)
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Added jobs for time 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD
> (Logging.scala:logInfo(59)) - Removing RDD 27 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 27
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD
> (Logging.scala:logInfo(59)) - Removing RDD 26 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 26
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4]
> kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of
> RDD BlockRDD[26] at BlockRDD at ReceiverInputDStream.scala:69 of time
> 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-6]
> scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - Stream 0
> received 0 blocks
>
>
>
> What should my approach be now? I need urgent help.
>
>
>
> Regards,
>
> Aiman
>
>
>
> From: Akhil Das [mailto:akhil@sigmoidanalytics.com]
> Sent: Monday, December 01, 2014 3:56 PM
> To: Sarosh, M.
> Cc: user@spark.apache.org
> Subject: Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
>
>
>
> It says:
>
>
>
>  14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not
> accepted any resources; check your cluster UI to ensure that workers are
> registered and have sufficient memory
>
>
>
> A quick guess would be that you are giving the wrong master URL
> (spark://192.168.88.130:7077). Open the web UI running on port 8080 and
> use the master URL listed in the top left corner of the page.
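If the browser is inconvenient, the same URL can be scraped from the UI page (a sketch only; it assumes the default standalone web UI port 8080, that the host is reachable, and that the page prints the master URL as "spark://..."):

```shell
# Fetch the master web UI and pull out the first spark:// URL it mentions.
curl -s http://192.168.88.130:8080 | grep -o 'spark://[^<" ]*' | head -n 1
```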
>
>
>   Thanks
>
> Best Regards
>
>
>
> On Mon, Dec 1, 2014 at 3:42 PM, <m.sarosh@accenture.com> wrote:
>
> Hi,
>
>
>
> I am integrating Kafka and Spark using spark-streaming. I have created a
> topic with the kafka-topics tool:
>
>
>
>     bin/kafka-topics.sh --create --zookeeper localhost:2181
> --replication-factor 1 --partitions 1 --topic test
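As a sanity check before involving Spark, the topic and the message flow can be verified from the console (a sketch; it assumes the same localhost ZooKeeper as above and the Kafka 0.8-era console tools used elsewhere in this thread):

```shell
# Confirm the topic exists and inspect its partition/replica layout.
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

# Tail the topic; anything published to "test" should appear here.
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
```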
>
>
>
>
>
> I am publishing messages to Kafka and trying to read them with
> spark-streaming Java code and display them on screen.
>
> The daemons are all up: Spark master, worker; ZooKeeper; Kafka.
>
> I am writing Java code for this, using KafkaUtils.createStream.
>
> The code is below:
>
>
>
> package com.spark;
>
> import java.util.HashMap;
> import java.util.Map;
>
> import scala.Tuple2;
>
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.kafka.KafkaUtils;
>
> public class SparkStream {
>     public static void main(String[] args) {
>         if (args.length != 3) {
>             System.out.println("Usage: spark-submit --class com.spark.SparkStream "
>                     + "target/SparkStream-with-dependencies.jar "
>                     + "<zookeeper_ip> <group_name> <topic1,topic2,...>");
>             System.exit(1);
>         }
>
>         // One receiver thread per topic.
>         Map<String, Integer> topicMap = new HashMap<String, Integer>();
>         for (String t : args[2].split(",")) {
>             topicMap.put(t, new Integer(1));
>         }
>
>         JavaStreamingContext jssc = new JavaStreamingContext(
>                 "spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
>         JavaPairReceiverInputDStream<String, String> messages =
>                 KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
>
>         System.out.println("Connection done");
>         JavaDStream<String> data = messages.map(
>                 new Function<Tuple2<String, String>, String>() {
>                     public String call(Tuple2<String, String> message) {
>                         System.out.println("NewMessage: " + message._2()); // for debugging
>                         return message._2();
>                     }
>                 });
>
>         data.print();
>
>         jssc.start();
>         jssc.awaitTermination();
>     }
> }
>
>
>
>
>
> I am running the job, and in another terminal I am running the
> kafka-console-producer to publish messages:
>
> #bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>
> >    Hi kafka
>
> >    second message
>
> >    another message
>
>
>
> But the output logs at the spark-streaming console don't show the
> messages; they show zero blocks received:
>
>
>
>
>
>     -------------------------------------------
>
>     Time: 1417107363000 ms
>
>     -------------------------------------------
>
>
>
>     14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming
> job 1417107363000 ms.0 from job set of time 1417107363000 ms
>
>     14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming
> job 1417107363000 ms.0 from job set of time 1417107363000 ms
>
>     14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s
> for time 1417107363000 ms (execution: 0.000 s)
>
>     14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time
> 1417107363000 ms
>
>     14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence
> list
>
>     14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
>
>     14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD
> BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time
> 1417107363000 ms
>
>     14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has
> not accepted any resources; check your cluster UI to ensure that workers
> are registered and have sufficient memory
>
>     14/11/27 11:56:06 INFO scheduler.ReceiverTracker: Stream 0 received
> 0 blocks
>
>
>
>
>
> Why isn't any data block getting received? I have tried using the Kafka
> producer and consumer on the console (bin/kafka-console-producer....  and
> bin/kafka-console-consumer... ) and they work perfectly, so why not the
> code above? Please help me.
>
>
>
>
>
> Regards,
>
> Aiman Sarosh
>
>
>
>
