spark-user mailing list archives

From Akhil Das <ak...@sigmoidanalytics.com>
Subject Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
Date Mon, 01 Dec 2014 13:02:37 GMT
Looks good.

Add these lines to the code if you want to get rid of those log4j
info/warn messages:

import org.apache.log4j.Logger
import org.apache.log4j.Level

Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)


Thanks
Best Regards

On Mon, Dec 1, 2014 at 6:22 PM, <m.sarosh@accenture.com> wrote:

>  Hi,
>
>
>
> I have now increased the cores to 10.
>
>
>
> This time the messages are coming through, but sometimes with warnings.
> For example, I entered 15-17 lines:
>
> Hiii
>
> Hiiiiii
>
> Hiiiiiiiii
>
> Jpjpjpjpj
>
> …
>
> ….
>
>
>
> But I got only 4 lines back, along with some warnings.
>
> The logs are:
>
>
>
> 2014-12-01 08:43:32,154 INFO  [Executor task launch worker-2]
> executor.Executor (Logging.scala:logInfo(59)) - Running task 0.0 in stage
> 10.0 (TID 18)
>
> 2014-12-01 08:43:32,163 INFO  [Executor task launch worker-2]
> storage.BlockManager (Logging.scala:logInfo(59)) - Found block
> input-0-1417441408800 locally
>
> NewMessage: hiiii++++++++++++++++++
>
> 2014-12-01 08:43:32,164 INFO  [Executor task launch worker-2]
> executor.Executor (Logging.scala:logInfo(59)) - Finished task 0.0 in stage
> 10.0 (TID 18). 593 bytes result sent to driver
>
> 2014-12-01 08:43:32,167 INFO  [Result resolver thread-1]
> scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 0.0 in
> stage 10.0 (TID 18) in 42 ms on localhost (1/3)
>
> 2014-12-01 08:43:32,166 INFO  [Executor task launch worker-3]
> executor.Executor (Logging.scala:logInfo(59)) - Running task 2.0 in stage
> 10.0 (TID 20)
>
> 2014-12-01 08:43:32,166 INFO  [Executor task launch worker-1]
> executor.Executor (Logging.scala:logInfo(59)) - Running task 1.0 in stage
> 10.0 (TID 19)
>
> 2014-12-01 08:43:32,181 INFO  [Executor task launch worker-1]
> storage.BlockManager (Logging.scala:logInfo(59)) - Found block
> input-0-1417441409800 locally
>
> NewMessage: hiiii++++++++++++++++++
>
> 2014-12-01 08:43:32,183 INFO  [Executor task launch worker-3]
> storage.BlockManager (Logging.scala:logInfo(59)) - Found block
> input-0-1417441410800 locally
>
> NewMessage: hiii++++++++++++++++++
>
> 2014-12-01 08:43:32,188 INFO  [Executor task launch worker-1]
> executor.Executor (Logging.scala:logInfo(59)) - Finished task 1.0 in stage
> 10.0 (TID 19). 593 bytes result sent to driver
>
> 2014-12-01 08:43:32,188 INFO  [Result resolver thread-0]
> scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 1.0 in
> stage 10.0 (TID 19) in 46 ms on localhost (2/3)
>
> 2014-12-01 08:43:32,196 INFO  [Result resolver thread-2]
> scheduler.TaskSetManager (Logging.scala:logInfo(59)) - Finished task 2.0 in
> stage 10.0 (TID 20) in 51 ms on localhost (3/3)
>
> 2014-12-01 08:43:32,197 INFO  [Result resolver thread-2]
> scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet
> 10.0, whose tasks have all completed, from pool
>
> 2014-12-01 08:43:32,197 INFO
> [sparkDriver-akka.actor.default-dispatcher-16] scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Stage 10 (take at DStream.scala:608) finished
> in 0.058 s
>
> 2014-12-01 08:43:32,198 INFO  [pool-7-thread-1] spark.SparkContext
> (Logging.scala:logInfo(59)) - Job finished: take at DStream.scala:608, took
> 0.103179818 s
>
> -------------------------------------------
>
> Time: 1417441412000 ms
>
> -------------------------------------------
>
> hiiii
>
> hiiii
>
> hiiii
>
> hiii
>
>
>
> 2014-12-01 08:43:32,199 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Finished job streaming job 1417441412000 ms.0
> from job set of time 1417441412000 ms
>
> 2014-12-01 08:43:32,199 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Total delay: 0.199 s for time 1417441412000
> ms (execution: 0.169 s)
>
> 2014-12-01 08:43:32,199 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD
> (Logging.scala:logInfo(59)) - Removing RDD 16 from persistence list
>
> 2014-12-01 08:43:32,201 INFO
> [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 16
>
> 2014-12-01 08:43:32,202 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD
> (Logging.scala:logInfo(59)) - Removing RDD 15 from persistence list
>
> 2014-12-01 08:43:32,203 INFO
> [sparkDriver-akka.actor.default-dispatcher-3] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 15
>
> 2014-12-01 08:43:32,204 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] kafka.KafkaInputDStream
> (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[15] at
> BlockRDD at ReceiverInputDStream.scala:69 of time 1417441412000 ms
>
> 2014-12-01 08:43:32,205 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing block input-0-1417441403800
>
> 2014-12-01 08:43:32,205 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] storage.MemoryStore
> (Logging.scala:logInfo(59)) - Block input-0-1417441403800 of size 78
> dropped from memory (free 280228496)
>
> 2014-12-01 08:43:32,206 INFO
> [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManagerInfo
> (Logging.scala:logInfo(59)) - Removed input-0-1417441403800 on
> 192.168.88.130:34367 in memory (size: 78.0 B, free: 267.3 MB)
>
> 2014-12-01 08:43:32,206 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerMaster
> (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441403800
>
> 2014-12-01 08:43:32,207 INFO
> [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing block input-0-1417441404800
>
> 2014-12-01 08:43:32,207 INFO
> [sparkDriver-akka.actor.default-dispatcher-2] storage.MemoryStore
> (Logging.scala:logInfo(59)) - Block input-0-1417441404800 of size 78
> dropped from memory (free 280228574)
>
> 2014-12-01 08:43:32,208 INFO
> [sparkDriver-akka.actor.default-dispatcher-13] storage.BlockManagerInfo
> (Logging.scala:logInfo(59)) - Removed input-0-1417441404800 on
> 192.168.88.130:34367 in memory (size: 78.0 B, free: 267.3 MB)
>
> 2014-12-01 08:43:32,209 INFO
> [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManagerMaster
> (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441404800
>
> 2014-12-01 08:43:32,212 INFO
> [sparkDriver-akka.actor.default-dispatcher-16] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing block input-0-1417441405800
>
> 2014-12-01 08:43:32,212 INFO
> [sparkDriver-akka.actor.default-dispatcher-16] storage.MemoryStore
> (Logging.scala:logInfo(59)) - Block input-0-1417441405800 of size 78
> dropped from memory (free 280228652)
>
> 2014-12-01 08:43:32,213 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo
> (Logging.scala:logInfo(59)) - Removed input-0-1417441405800 on
> 192.168.88.130:34367 in memory (size: 78.0 B, free: 267.3 MB)
>
> 2014-12-01 08:43:32,213 INFO
> [sparkDriver-akka.actor.default-dispatcher-16] storage.BlockManagerMaster
> (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441405800
>
> 2014-12-01 08:43:32,213 INFO  [Executor task launch worker-3]
> executor.Executor (Logging.scala:logInfo(59)) - Finished task 2.0 in stage
> 10.0 (TID 20). 592 bytes result sent to driver
>
> 2014-12-01 08:43:32,214 INFO
> [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing block input-0-1417441406800
>
> 2014-12-01 08:43:32,214 INFO
> [sparkDriver-akka.actor.default-dispatcher-14] storage.MemoryStore
> (Logging.scala:logInfo(59)) - Block input-0-1417441406800 of size 78
> dropped from memory (free 280228730)
>
> 2014-12-01 08:43:32,215 INFO
> [sparkDriver-akka.actor.default-dispatcher-3] storage.BlockManagerInfo
> (Logging.scala:logInfo(59)) - Removed input-0-1417441406800 on
> 192.168.88.130:34367 in memory (size: 78.0 B, free: 267.3 MB)
>
> 2014-12-01 08:43:32,215 INFO
> [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManagerMaster
> (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441406800
>
> 2014-12-01 08:43:33,002 INFO  [Thread-31] storage.MemoryStore
> (Logging.scala:logInfo(59)) - ensureFreeSpace(77) called with curMem=20245,
> maxMem=280248975
>
> 2014-12-01 08:43:33,003 INFO  [Thread-31] storage.MemoryStore
> (Logging.scala:logInfo(59)) - Block input-0-1417441412800 stored as bytes
> in memory (estimated size 77.0 B, free 267.2 MB)
>
> 2014-12-01 08:43:33,005 INFO
> [sparkDriver-akka.actor.default-dispatcher-14] storage.BlockManagerInfo
> (Logging.scala:logInfo(59)) - Added input-0-1417441412800 in memory on
> 192.168.88.130:34367 (size: 77.0 B, free: 267.3 MB)
>
> 2014-12-01 08:43:33,016 INFO  [Thread-31] storage.BlockManagerMaster
> (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441412800
>
> 2014-12-01 08:43:33,023 WARN  [handle-message-executor-2]
> storage.BlockManager (Logging.scala:logWarning(71)) - Block
> input-0-1417441412800 already exists on this machine; not re-adding it
>
> 2014-12-01 08:43:33,029 INFO  [Thread-31] receiver.BlockGenerator
> (Logging.scala:logInfo(59)) - Pushed block input-0-1417441412800
>
> 2014-12-01 08:43:34,001 INFO  [Thread-31] storage.MemoryStore
> (Logging.scala:logInfo(59)) - ensureFreeSpace(77) called with curMem=20322,
> maxMem=280248975
>
> 2014-12-01 08:43:34,002 INFO  [Thread-31] storage.MemoryStore
> (Logging.scala:logInfo(59)) - Block input-0-1417441413800 stored as bytes
> in memory (estimated size 77.0 B, free 267.2 MB)
>
> 2014-12-01 08:43:34,003 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] storage.BlockManagerInfo
> (Logging.scala:logInfo(59)) - Added input-0-1417441413800 in memory on
> 192.168.88.130:34367 (size: 77.0 B, free: 267.3 MB)
>
> 2014-12-01 08:43:34,007 INFO  [Thread-31] storage.BlockManagerMaster
> (Logging.scala:logInfo(59)) - Updated info of block input-0-1417441413800
>
> 2014-12-01 08:43:34,014 WARN  [handle-message-executor-4]
> storage.BlockManager (Logging.scala:logWarning(71)) - Block
> input-0-1417441413800 already exists on this machine; not re-adding it
>
>
>
>
>
> Is there something still missing in the configuration?
>
>
>
> Regards,
>
> Aiman
>
>
>
> *From:* Akhil Das [mailto:akhil@sigmoidanalytics.com]
> *Sent:* Monday, December 01, 2014 6:09 PM
>
> *To:* Sarosh, M.
> *Cc:* user@spark.apache.org
> *Subject:* Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
>
>
>
> You would need a multi-core machine (>= 2 cores) for spark-streaming to
> work while running in standalone mode, since the Kafka receiver permanently
> occupies one core and at least one more is needed to process the received
> data. But it will work fine if you run it in local mode with the master set
> to local[4].
>
>
>
> What are you getting after making this change
>
>
>
> JavaStreamingContext jssc = new JavaStreamingContext("local[4]",
> "SparkStream", new Duration(3000));
>
>
>
> and running the code? Are you still getting :
>
>
>
> Initial job has not accepted any resources; check your cluster UI to
> ensure that workers are registered and have sufficient memory
>
>
>
> (Make sure you push some data to kafka producer while the code is running)
>
>
>
>
>   Thanks
>
> Best Regards
>
>
>
> On Mon, Dec 1, 2014 at 5:38 PM, <m.sarosh@accenture.com> wrote:
>
> Hi,
>
> I have now configured Spark. I had a CDH5 installation, so I referred to
> the installation doc.
>
> I have the worker up now:
>
>  Now I tried using:
>
> JavaStreamingContext jssc = new JavaStreamingContext(
> "spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
>
>
>
> Also,
>
> JavaStreamingContext jssc = new JavaStreamingContext("local[4]",
> "SparkStream", new Duration(3000));
>
>
>
> And,
>
> JavaStreamingContext jssc = new JavaStreamingContext("local[*]",
> "SparkStream", new Duration(3000));
>
>
>
>
>
> Logs are still the same:
>
> -------------------------------------------
>
> Time: 1417438988000 ms
>
> -------------------------------------------
>
>
>
> 2014-12-01 08:03:08,008 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Starting job streaming job 1417438988000 ms.0
> from job set of time 1417438988000 ms
>
> 2014-12-01 08:03:08,008 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Finished job streaming job 1417438988000 ms.0
> from job set of time 1417438988000 ms
>
> 2014-12-01 08:03:08,009 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Total delay: 0.008 s for time 1417438988000
> ms (execution: 0.000 s)
>
> 2014-12-01 08:03:08,010 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Added jobs for time 1417438988000 ms
>
> 2014-12-01 08:03:08,015 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] rdd.MappedRDD
> (Logging.scala:logInfo(59)) - Removing RDD 39 from persistence list
>
> 2014-12-01 08:03:08,024 INFO
> [sparkDriver-akka.actor.default-dispatcher-4] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 39
>
> 2014-12-01 08:03:08,027 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] rdd.BlockRDD
> (Logging.scala:logInfo(59)) - Removing RDD 38 from persistence list
>
> 2014-12-01 08:03:08,031 INFO
> [sparkDriver-akka.actor.default-dispatcher-2] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 38
>
> 2014-12-01 08:03:08,033 INFO
> [sparkDriver-akka.actor.default-dispatcher-15] kafka.KafkaInputDStream
> (Logging.scala:logInfo(59)) - Removing blocks of RDD BlockRDD[38] at
> BlockRDD at ReceiverInputDStream.scala:69 of time 1417438988000 ms
>
> 2014-12-01 08:03:09,002 INFO
> [sparkDriver-akka.actor.default-dispatcher-2] scheduler.ReceiverTracker
> (Logging.scala:logInfo(59)) - Stream 0 received 0 blocks
>
>
>
>
>
>
>
> *From:* Akhil Das [mailto:akhil@sigmoidanalytics.com]
> *Sent:* Monday, December 01, 2014 4:41 PM
>
>
> *To:* Sarosh, M.
> *Cc:* user@spark.apache.org
> *Subject:* Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
>
>
>
> I see you have no worker machines to execute the job
>
>
>
> [image: Inline image 1]
>
>
>
> You haven't configured your spark cluster properly.
>
>
>
> A quick fix to get it running would be to run it in local mode; for that,
> change this line
>
>
>
> JavaStreamingContext jssc = new JavaStreamingContext(
> "spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
>
>
>
> to this
>
>
>
> JavaStreamingContext jssc = new JavaStreamingContext("local[4]",
> "SparkStream", new Duration(3000));
>
>
>
>
>   Thanks
>
> Best Regards
>
>
>
> On Mon, Dec 1, 2014 at 4:18 PM, <m.sarosh@accenture.com> wrote:
>
> Hi,
>
>
>
> The Spark master is working, and I have given the same URL in the code:
>
>
>
> The warning is gone, and the new log is:
>
> -------------------------------------------
>
> Time: 1417427850000 ms
>
> -------------------------------------------
>
>
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Starting job streaming job 1417427850000 ms.0
> from job set of time 1417427850000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Finished job streaming job 1417427850000 ms.0
> from job set of time 1417427850000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-2] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Total delay: 0.028 s for time 1417427850000
> ms (execution: 0.001 s)
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Added jobs for time 1417427850000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.MappedRDD
> (Logging.scala:logInfo(59)) - Removing RDD 25 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 25
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] rdd.BlockRDD
> (Logging.scala:logInfo(59)) - Removing RDD 24 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 24
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5]
> kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of
> RDD BlockRDD[24] at BlockRDD at ReceiverInputDStream.scala:69 of time
> 1417427850000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4]
> scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - *Stream 0
> received 0 blocks*
>
> -------------------------------------------
>
> Time: 1417427853000 ms
>
> -------------------------------------------
>
>
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Starting job streaming job 1417427853000 ms.0
> from job set of time 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Finished job streaming job 1417427853000 ms.0
> from job set of time 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Total delay: 0.015 s for time 1417427853000
> ms (execution: 0.001 s)
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4] scheduler.JobScheduler
> (Logging.scala:logInfo(59)) - Added jobs for time 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.MappedRDD
> (Logging.scala:logInfo(59)) - Removing RDD 27 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-5] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 27
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4] rdd.BlockRDD
> (Logging.scala:logInfo(59)) - Removing RDD 26 from persistence list
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-6] storage.BlockManager
> (Logging.scala:logInfo(59)) - Removing RDD 26
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-4]
> kafka.KafkaInputDStream (Logging.scala:logInfo(59)) - Removing blocks of
> RDD BlockRDD[26] at BlockRDD at ReceiverInputDStream.scala:69 of time
> 1417427853000 ms
>
> INFO  [sparkDriver-akka.actor.default-dispatcher-6]
> scheduler.ReceiverTracker (Logging.scala:logInfo(59)) - *Stream 0
> received 0 blocks*
>
>
>
> What should my approach be now?
>
> Need urgent help.
>
>
>
> Regards,
>
> Aiman
>
>
>
> *From:* Akhil Das [mailto:akhil@sigmoidanalytics.com]
> *Sent:* Monday, December 01, 2014 3:56 PM
> *To:* Sarosh, M.
> *Cc:* user@spark.apache.org
> *Subject:* Re: Kafka+Spark-streaming issue: Stream 0 received 0 blocks
>
>
>
> It says:
>
>
>
>  14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has not
> accepted any resources; check your cluster UI to ensure that workers are
> registered and have sufficient memory
>
>
>
> A quick guess would be that you are giving the wrong master URL
> (spark://192.168.88.130:7077). Open the web UI running on port 8080 and use
> the master URL listed at the top left corner of the page.
>
>
>   Thanks
>
> Best Regards
>
>
>
> On Mon, Dec 1, 2014 at 3:42 PM, <m.sarosh@accenture.com> wrote:
>
> Hi,
>
>
>
> I am integrating Kafka and Spark using spark-streaming. I have created a
> Kafka topic to produce to:
>
>
>
>     bin/kafka-topics.sh --create --zookeeper localhost:2181
> --replication-factor 1 --partitions 1 --topic test
>
>
>
>
>
> I am publishing messages to Kafka and trying to read them using
> spark-streaming Java code, displaying them on screen.
>
> The daemons are all up: Spark master, worker; zookeeper; kafka.
>
> I am writing Java code for this, using KafkaUtils.createStream.
>
> The code is below:
>
>
>
> package com.spark;
>
> import scala.Tuple2;
> import kafka.serializer.Decoder;
> import kafka.serializer.Encoder;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.*;
> import org.apache.spark.api.java.function.*;
> import org.apache.spark.api.java.*;
> import org.apache.spark.streaming.kafka.KafkaUtils;
> import org.apache.spark.streaming.kafka.*;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
> import java.util.Map;
> import java.util.HashMap;
>
> public class SparkStream {
>     public static void main(String args[]) {
>         if (args.length != 3) {
>             System.out.println("Usage: spark-submit --class com.spark.SparkStream "
>                     + "target/SparkStream-with-dependencies.jar "
>                     + "<zookeeper_ip> <group_name> <topic1,topic2,...>");
>             System.exit(1);
>         }
>
>         // One receiver thread per topic
>         Map<String, Integer> topicMap = new HashMap<String, Integer>();
>         String[] topic = args[2].split(",");
>         for (String t : topic) {
>             topicMap.put(t, new Integer(1));
>         }
>
>         JavaStreamingContext jssc = new JavaStreamingContext(
>                 "spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
>         JavaPairReceiverInputDStream<String, String> messages =
>                 KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
>
>         System.out.println("Connection done");
>         JavaDStream<String> data = messages.map(
>                 new Function<Tuple2<String, String>, String>() {
>                     public String call(Tuple2<String, String> message) {
>                         System.out.println("NewMessage: " + message._2()); // for debugging
>                         return message._2();
>                     }
>                 });
>
>         data.print();
>
>         jssc.start();
>         jssc.awaitTermination();
>     }
> }
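>
> A hypothetical submit command matching the usage string in main() (the
> group name here is just an assumed example):
>
>     bin/spark-submit --class com.spark.SparkStream \
>       target/SparkStream-with-dependencies.jar localhost:2181 test-group test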
>
>
>
>
>
> I am running the job, and at another terminal I am running the
> kafka-producer to publish messages:
>
> #bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>
> >    Hi kafka
>
> >    second message
>
> >    another message
>
>
>
> But the output logs at the spark-streaming console don't show the
> messages; they show zero blocks received:
>
>
>
>
>
>     -------------------------------------------
>
>     Time: 1417107363000 ms
>
>     -------------------------------------------
>
>
>
>     14/11/27 11:56:03 INFO scheduler.JobScheduler: Starting job streaming
> job 1417107363000 ms.0 from job set of time 1417107363000 ms
>
>     14/11/27 11:56:03 INFO scheduler.JobScheduler: Finished job streaming
> job 1417107363000 ms.0 from job set of time 1417107363000 ms
>
>     14/11/27 11:56:03 INFO scheduler.JobScheduler: Total delay: 0.008 s
> for time 1417107363000 ms (execution: 0.000 s)
>
>     14/11/27 11:56:03 INFO scheduler.JobScheduler: Added jobs for time
> 1417107363000 ms
>
>     14/11/27 11:56:03 INFO rdd.BlockRDD: Removing RDD 13 from persistence
> list
>
>     14/11/27 11:56:03 INFO storage.BlockManager: Removing RDD 13
>
>     14/11/27 11:56:03 INFO kafka.KafkaInputDStream: Removing blocks of RDD
> BlockRDD[13] at BlockRDD at ReceiverInputDStream.scala:69 of time
> 1417107363000 ms
>
>     14/11/27 11:56:05 WARN scheduler.TaskSchedulerImpl: Initial job has
> not accepted any resources; check your cluster UI to ensure that workers
> are registered and have sufficient memory
>
>     14/11/27 11:56:06 INFO scheduler.ReceiverTracker: *Stream 0 received
> 0 blocks*
>
>
>
>
>
> Why isn't the data block getting received? I have tried the kafka
> producer and consumer on the console (bin/kafka-console-producer....  and
> bin/kafka-console-consumer... ) and they work perfectly, but why not the
> code above? Please help me.
>
>
>
>
>
> Regards,
>
> Aiman Sarosh
>
>
>
>
