spark-user mailing list archives

From Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Subject Re: Spark streaming not processing messages from partitioned topics
Date Wed, 10 Aug 2016 14:40:16 GMT
Hi Cody,

I just added zookeeper.connect to kafkaParams. The job still never comes out of
the batch window, and the other batches are queued behind it. The
foreach(println) on the DataFrame prints one partition's data but not the
other's. I couldn't see any errors in the log.

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val brokers = "localhost:9092,localhost:9093"
val sparkConf = new SparkConf().setAppName("KafkaWeather").setMaster("local[5]") // spark://localhost:7077
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(1))
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> brokers,
  "auto.offset.reset" -> "smallest",
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "xyz")
val topics = "test"
val topicsSet = topics.split(",").toSet
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder](ssc, kafkaParams, topicsSet)
val sqlContext = new SQLContext(ssc.sparkContext)
import sqlContext.implicits._

messages.foreachRDD(rdd => {
  if (rdd.isEmpty()) {
    println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
    System.exit(-1)
  }
  val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()
  dataframe.foreach(println)
  println("$$$$$$$$$$$", dataframe.count())
})
// start the context (the logs below show "StreamingContext started")
ssc.start()
ssc.awaitTermination()
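
For reference, one way to confirm whether each batch really covers both
partitions is to log the Kafka offset ranges backing the direct stream's RDDs
(a minimal sketch assuming the spark-streaming-kafka 1.6 HasOffsetRanges API;
messages is the stream defined above):

import org.apache.spark.streaming.kafka.HasOffsetRanges

messages.foreachRDD { rdd =>
  // RDDs produced by a direct stream carry their Kafka offset ranges;
  // the cast must happen on this RDD, before any transformation.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    println(s"topic=${r.topic} partition=${r.partition} " +
      s"offsets=${r.fromOffset} -> ${r.untilOffset}")
  }
}

If one partition keeps reporting fromOffset == untilOffset, the stream is never
advancing on it, which would match seeing only one partition's data.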
Logs:

16/08/10 18:16:24 INFO SparkContext: Running Spark version 1.6.2
16/08/10 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
16/08/10 18:16:24 WARN Utils: Your hostname, quickstart.cloudera resolves
to a loopback address: 127.0.0.1; using 192.168.126.131 instead (on
interface eth1)
16/08/10 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
another address
16/08/10 18:16:25 INFO SecurityManager: Changing view acls to: cloudera
16/08/10 18:16:25 INFO SecurityManager: Changing modify acls to: cloudera
16/08/10 18:16:25 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(cloudera);
users with modify permissions: Set(cloudera)
16/08/10 18:16:25 INFO Utils: Successfully started service 'sparkDriver' on
port 45031.
16/08/10 18:16:26 INFO Slf4jLogger: Slf4jLogger started
16/08/10 18:16:26 INFO Remoting: Starting remoting
16/08/10 18:16:26 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriverActorSystem@192.168.126.131:56638]
16/08/10 18:16:26 INFO Utils: Successfully started service
'sparkDriverActorSystem' on port 56638.
16/08/10 18:16:26 INFO SparkEnv: Registering MapOutputTracker
16/08/10 18:16:27 INFO SparkEnv: Registering BlockManagerMaster
16/08/10 18:16:27 INFO DiskBlockManager: Created local directory at
/tmp/blockmgr-0f110a7e-1edb-4140-9243-5579a7bc95ee
16/08/10 18:16:27 INFO MemoryStore: MemoryStore started with capacity 511.5
MB
16/08/10 18:16:27 INFO SparkEnv: Registering OutputCommitCoordinator
16/08/10 18:16:27 INFO Utils: Successfully started service 'SparkUI' on
port 4040.
16/08/10 18:16:27 INFO SparkUI: Started SparkUI at
http://192.168.126.131:4040
16/08/10 18:16:27 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/httpd-2b2a4e68-2952-41b0-a11b-f07860682749
16/08/10 18:16:27 INFO HttpServer: Starting HTTP Server
16/08/10 18:16:27 INFO Utils: Successfully started service 'HTTP file
server' on port 59491.
16/08/10 18:16:28 INFO SparkContext: Added JAR
file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar at
http://192.168.126.131:59491/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar
with timestamp 1470833188094
16/08/10 18:16:29 INFO SparkContext: Added JAR
file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at
http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with
timestamp 1470833189531
16/08/10 18:16:29 INFO SparkContext: Added JAR
file:/home/cloudera/Downloads/boa/pain.jar at
http://192.168.126.131:59491/jars/pain.jar with timestamp 1470833189533
16/08/10 18:16:29 INFO Executor: Starting executor ID driver on host
localhost
16/08/10 18:16:29 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 55361.
16/08/10 18:16:29 INFO NettyBlockTransferService: Server created on 55361
16/08/10 18:16:29 INFO BlockManagerMaster: Trying to register BlockManager
16/08/10 18:16:29 INFO BlockManagerMasterEndpoint: Registering block
manager localhost:55361 with 511.5 MB RAM, BlockManagerId(driver,
localhost, 55361)
16/08/10 18:16:29 INFO BlockManagerMaster: Registered BlockManager
16/08/10 18:16:30 INFO VerifiableProperties: Verifying properties
16/08/10 18:16:30 INFO VerifiableProperties: Property auto.offset.reset is
overridden to smallest
16/08/10 18:16:30 INFO VerifiableProperties: Property group.id is
overridden to xyz
16/08/10 18:16:30 INFO VerifiableProperties: Property zookeeper.connect is
overridden to localhost:2181
16/08/10 18:16:31 INFO ForEachDStream: metadataCleanupDelay = -1
16/08/10 18:16:31 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
16/08/10 18:16:31 INFO DirectKafkaInputDStream: Slide time = 1000 ms
16/08/10 18:16:31 INFO DirectKafkaInputDStream: Storage level =
StorageLevel(false, false, false, false, 1)
16/08/10 18:16:31 INFO DirectKafkaInputDStream: Checkpoint interval = null
16/08/10 18:16:31 INFO DirectKafkaInputDStream: Remember duration = 1000 ms
16/08/10 18:16:31 INFO DirectKafkaInputDStream: Initialized and validated
org.apache.spark.streaming.kafka.DirectKafkaInputDStream@9f052ff
16/08/10 18:16:31 INFO ForEachDStream: Slide time = 1000 ms
16/08/10 18:16:31 INFO ForEachDStream: Storage level = StorageLevel(false,
false, false, false, 1)
16/08/10 18:16:31 INFO ForEachDStream: Checkpoint interval = null
16/08/10 18:16:31 INFO ForEachDStream: Remember duration = 1000 ms
16/08/10 18:16:31 INFO ForEachDStream: Initialized and validated
org.apache.spark.streaming.dstream.ForEachDStream@d8e872
16/08/10 18:16:31 INFO RecurringTimer: Started timer for JobGenerator at
time 1470833192000
16/08/10 18:16:31 INFO JobGenerator: Started JobGenerator at 1470833192000
ms
16/08/10 18:16:31 INFO JobScheduler: Started JobScheduler
16/08/10 18:16:31 INFO StreamingContext: StreamingContext started
16/08/10 18:16:32 INFO VerifiableProperties: Verifying properties
16/08/10 18:16:32 INFO VerifiableProperties: Property auto.offset.reset is
overridden to smallest
16/08/10 18:16:32 INFO VerifiableProperties: Property group.id is
overridden to xyz
16/08/10 18:16:32 INFO VerifiableProperties: Property zookeeper.connect is
overridden to localhost:2181
16/08/10 18:16:32 INFO JobScheduler: Added jobs for time 1470833192000 ms
16/08/10 18:16:32 INFO JobScheduler: Starting job streaming job
1470833192000 ms.0 from job set of time 1470833192000 ms
16/08/10 18:16:32 INFO SparkContext: Starting job: json at todelete.scala:42
16/08/10 18:16:32 INFO DAGScheduler: Got job 0 (json at todelete.scala:42)
with 2 output partitions
16/08/10 18:16:32 INFO DAGScheduler: Final stage: ResultStage 0 (json at
todelete.scala:42)
16/08/10 18:16:32 INFO DAGScheduler: Parents of final stage: List()
16/08/10 18:16:32 INFO DAGScheduler: Missing parents: List()
16/08/10 18:16:32 INFO DAGScheduler: Submitting ResultStage 0
(MapPartitionsRDD[3] at json at todelete.scala:42), which has no missing
parents
16/08/10 18:16:32 INFO MemoryStore: Block broadcast_0 stored as values in
memory (estimated size 4.7 KB, free 4.7 KB)
16/08/10 18:16:32 INFO MemoryStore: Block broadcast_0_piece0 stored as
bytes in memory (estimated size 2.6 KB, free 7.3 KB)
16/08/10 18:16:32 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory
on localhost:55361 (size: 2.6 KB, free: 511.5 MB)
16/08/10 18:16:32 INFO SparkContext: Created broadcast 0 from broadcast at
DAGScheduler.scala:1006
16/08/10 18:16:32 INFO DAGScheduler: Submitting 2 missing tasks from
ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:42)
16/08/10 18:16:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
16/08/10 18:16:32 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
0, localhost, partition 1,NODE_LOCAL, 2232 bytes)
16/08/10 18:16:32 INFO Executor: Running task 1.0 in stage 0.0 (TID 0)
16/08/10 18:16:32 INFO Executor: Fetching
http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with
timestamp 1470833189531
16/08/10 18:16:32 INFO Utils: Fetching
http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar to
/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp6365751707137950377.tmp
16/08/10 18:16:33 INFO JobScheduler: Added jobs for time 1470833193000 ms
16/08/10 18:16:33 INFO Executor: Adding
file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/spark-assembly-1.6.2-hadoop2.6.0.jar
to class loader
16/08/10 18:16:33 INFO Executor: Fetching
http://192.168.126.131:59491/jars/pain.jar with timestamp 1470833189533
16/08/10 18:16:33 INFO Utils: Fetching
http://192.168.126.131:59491/jars/pain.jar to
/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp9123209312936194653.tmp
16/08/10 18:16:33 INFO Executor: Adding
file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/pain.jar
to class loader
16/08/10 18:16:33 INFO Executor: Fetching
http://192.168.126.131:59491/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar
with timestamp 1470833188094
16/08/10 18:16:33 INFO Utils: Fetching
http://192.168.126.131:59491/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar
to
/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/fetchFileTemp4190164655780820199.tmp
16/08/10 18:16:33 INFO Executor: Adding
file:/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/userFiles-e3b08984-7bc4-428c-b214-776fa2bf45d6/spark-streaming-kafka-assembly_2.10-1.6.2.jar
to class loader
16/08/10 18:16:33 INFO KafkaRDD: Computing topic test, partition 0 offsets
0 -> 20
16/08/10 18:16:33 INFO VerifiableProperties: Verifying properties
16/08/10 18:16:33 INFO VerifiableProperties: Property auto.offset.reset is
overridden to smallest
16/08/10 18:16:33 INFO VerifiableProperties: Property group.id is
overridden to xyz
16/08/10 18:16:33 INFO VerifiableProperties: Property zookeeper.connect is
overridden to localhost:2181
16/08/10 18:16:34 INFO JobScheduler: Added jobs for time 1470833194000 ms
16/08/10 18:16:34 INFO Executor: Finished task 1.0 in stage 0.0 (TID 0).
13366 bytes result sent to driver
16/08/10 18:16:34 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID
0) in 2423 ms on localhost (1/2)
16/08/10 18:16:35 INFO JobScheduler: Added jobs for time 1470833195000 ms
16/08/10 18:16:36 INFO JobScheduler: Added jobs for time 1470833196000 ms
16/08/10 18:16:37 INFO JobScheduler: Added jobs for time 1470833197000 ms
16/08/10 18:16:38 INFO JobScheduler: Added jobs for time 1470833198000 ms
16/08/10 18:16:39 INFO JobScheduler: Added jobs for time 1470833199000 ms
16/08/10 18:16:40 INFO JobScheduler: Added jobs for time 1470833200000 ms
16/08/10 18:16:41 INFO JobScheduler: Added jobs for time 1470833201000 ms
16/08/10 18:16:42 INFO JobScheduler: Added jobs for time 1470833202000 ms
16/08/10 18:16:43 INFO JobScheduler: Added jobs for time 1470833203000 ms
16/08/10 18:16:44 INFO JobScheduler: Added jobs for time 1470833204000 ms
16/08/10 18:16:45 INFO JobScheduler: Added jobs for time 1470833205000 ms
16/08/10 18:16:46 INFO JobScheduler: Added jobs for time 1470833206000 ms
16/08/10 18:16:47 INFO JobScheduler: Added jobs for time 1470833207000 ms
16/08/10 18:16:48 INFO JobScheduler: Added jobs for time 1470833208000 ms
16/08/10 18:16:49 INFO JobScheduler: Added jobs for time 1470833209000 ms
16/08/10 18:16:50 INFO JobScheduler: Added jobs for time 1470833210000 ms
16/08/10 18:16:51 INFO JobScheduler: Added jobs for time 1470833211000 ms
16/08/10 18:16:52 INFO JobScheduler: Added jobs for time 1470833212000 ms
16/08/10 18:16:53 INFO JobScheduler: Added jobs for time 1470833213000 ms
16/08/10 18:16:54 INFO JobSch

On Wed, Aug 10, 2016 at 5:42 PM, Cody Koeninger <cody@koeninger.org> wrote:

> Those logs you're posting are from right after your failure; they don't
> include what actually went wrong when attempting to read the json. Look at
> your logs more carefully.
> On Aug 10, 2016 2:07 AM, "Diwakar Dhanuskodi" <
> diwakar.dhanuskodi@gmail.com> wrote:
>
>> Hi Siva,
>>
>> With the code below, it gets stuck at
>> *sqlContext.read.json(rdd.map(_._2)).toDF()*.
>> There are two partitions in the topic.
>> I am running Spark 1.6.2.
>>
>> val topics = "topic.name"
>> val brokers = "localhost:9092"
>> val topicsSet = topics.split(",").toSet
>> val sparkConf = new SparkConf().setAppName("KafkaWeather").setMaster("local[5]") // spark://localhost:7077
>> val sc = new SparkContext(sparkConf)
>> val ssc = new StreamingContext(sc, Seconds(60))
>> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
>>   "group.id" -> "xyz", "auto.offset.reset" -> "smallest")
>> val messages = KafkaUtils.createDirectStream[String, String,
>>   StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>> messages.foreachRDD(rdd => {
>>   if (rdd.isEmpty()) {
>>     println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
>>     System.exit(-1)
>>   }
>>   val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
>>   *val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()*
>>   dataframe.foreach(println)
>> })
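>>
>> Incidentally, sqlContext.read.json on an RDD first runs a separate Spark job
>> just to infer the schema (the "json at todelete.scala" job in the logs
>> below). A minimal sketch of passing an explicit schema instead, which skips
>> that inference pass (the field names here are hypothetical placeholders):
>>
>> import org.apache.spark.sql.types.{StringType, StructField, StructType}
>>
>> // Hypothetical schema; replace with the real layout of the json messages.
>> val schema = StructType(Seq(
>>   StructField("id", StringType),
>>   StructField("payload", StringType)))
>> val dataframe = sqlContext.read.schema(schema).json(rdd.map(_._2))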
>>
>>
>> Below are the logs:
>>
>> 16/08/10 12:27:51 INFO DAGScheduler: ResultStage 0 (json at
>> todelete.scala:34) failed in 110.776 s
>> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
>> stopped! Dropping event SparkListenerStageCompleted(or
>> g.apache.spark.scheduler.StageInfo@6d8ff688)
>> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
>> stopped! Dropping event SparkListenerJobEnd(0,14708122
>> 71971,JobFailed(org.apache.spark.SparkException: Job 0 cancelled because
>> SparkContext was shut down))
>> 16/08/10 12:27:51 INFO MapOutputTrackerMasterEndpoint:
>> MapOutputTrackerMasterEndpoint stopped!
>> 16/08/10 12:27:51 INFO MemoryStore: MemoryStore cleared
>> 16/08/10 12:27:51 INFO BlockManager: BlockManager stopped
>> 16/08/10 12:27:51 INFO BlockManagerMaster: BlockManagerMaster stopped
>> 16/08/10 12:27:51 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
>> OutputCommitCoordinator stopped!
>> 16/08/10 12:27:51 INFO RemoteActorRefProvider$RemotingTerminator:
>> Shutting down remote daemon.
>> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator: Remote
>> daemon shut down; proceeding with flushing remote transports.
>> 16/08/10 12:27:52 INFO SparkContext: Successfully stopped SparkContext
>> 16/08/10 12:27:52 INFO ShutdownHookManager: Shutdown hook called
>> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory
>> /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2/httpd-07b9c1
>> b6-01db-45b5-9302-d2f67f7c490e
>> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator:
>> Remoting shut down.
>> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory
>> /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2
>> [cloudera@quickstart ~]$ spark-submit --master local[3] --class
>> com.boa.poc.todelete --jars /home/cloudera/lib/spark-strea
>> ming-kafka-assembly_2.10-1.6.2.jar,/home/cloudera/lib/spark
>> -assembly-1.6.2-hadoop2.6.0.jar /home/cloudera/Downloads/boa/pain.jar >
>> log.txt
>> Using Spark's default log4j profile: org/apache/spark/log4j-default
>> s.properties
>> 16/08/10 12:27:58 INFO SparkContext: Running Spark version 1.6.2
>> 16/08/10 12:27:59 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 16/08/10 12:27:59 WARN Utils: Your hostname, quickstart.cloudera resolves
>> to a loopback address: 127.0.0.1; using 192.168.126.131 instead (on
>> interface eth1)
>> 16/08/10 12:27:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
>> another address
>> 16/08/10 12:27:59 INFO SecurityManager: Changing view acls to: cloudera
>> 16/08/10 12:27:59 INFO SecurityManager: Changing modify acls to: cloudera
>> 16/08/10 12:27:59 INFO SecurityManager: SecurityManager: authentication
>> disabled; ui acls disabled; users with view permissions: Set(cloudera);
>> users with modify permissions: Set(cloudera)
>> 16/08/10 12:28:00 INFO Utils: Successfully started service 'sparkDriver'
>> on port 42140.
>> 16/08/10 12:28:01 INFO Slf4jLogger: Slf4jLogger started
>> 16/08/10 12:28:01 INFO Remoting: Starting remoting
>> 16/08/10 12:28:01 INFO Remoting: Remoting started; listening on addresses
>> :[akka.tcp://sparkDriverActorSystem@192.168.126.131:53328]
>> 16/08/10 12:28:01 INFO Utils: Successfully started service
>> 'sparkDriverActorSystem' on port 53328.
>> 16/08/10 12:28:01 INFO SparkEnv: Registering MapOutputTracker
>> 16/08/10 12:28:01 INFO SparkEnv: Registering BlockManagerMaster
>> 16/08/10 12:28:01 INFO DiskBlockManager: Created local directory at
>> /tmp/blockmgr-04c1ecec-8708-4f4b-b898-5fb953ab63e2
>> 16/08/10 12:28:01 INFO MemoryStore: MemoryStore started with capacity
>> 511.5 MB
>> 16/08/10 12:28:01 INFO SparkEnv: Registering OutputCommitCoordinator
>> 16/08/10 12:28:02 INFO Utils: Successfully started service 'SparkUI' on
>> port 4040.
>> 16/08/10 12:28:02 INFO SparkUI: Started SparkUI at
>> http://192.168.126.131:4040
>> 16/08/10 12:28:02 INFO HttpFileServer: HTTP File server directory is
>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/httpd-70563a
>> d1-3d30-4a9c-ab11-82ecbb2e71b0
>> 16/08/10 12:28:02 INFO HttpServer: Starting HTTP Server
>> 16/08/10 12:28:02 INFO Utils: Successfully started service 'HTTP file
>> server' on port 58957.
>> 16/08/10 12:28:02 INFO SparkContext: Added JAR
>> file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar at
>> http://192.168.126.131:58957/jars/spark-streaming-kafka-asse
>> mbly_2.10-1.6.2.jar with timestamp 1470812282187
>> 16/08/10 12:28:02 INFO SparkContext: Added JAR
>> file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at
>> http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar
>> with timestamp 1470812282398
>> 16/08/10 12:28:02 INFO SparkContext: Added JAR
>> file:/home/cloudera/Downloads/boa/pain.jar at
>> http://192.168.126.131:58957/jars/pain.jar with timestamp 1470812282402
>> 16/08/10 12:28:02 INFO Executor: Starting executor ID driver on host
>> localhost
>> 16/08/10 12:28:02 INFO Utils: Successfully started service
>> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56716.
>> 16/08/10 12:28:02 INFO NettyBlockTransferService: Server created on 56716
>> 16/08/10 12:28:02 INFO BlockManagerMaster: Trying to register BlockManager
>> 16/08/10 12:28:02 INFO BlockManagerMasterEndpoint: Registering block
>> manager localhost:56716 with 511.5 MB RAM, BlockManagerId(driver,
>> localhost, 56716)
>> 16/08/10 12:28:02 INFO BlockManagerMaster: Registered BlockManager
>> 16/08/10 12:28:03 INFO VerifiableProperties: Verifying properties
>> 16/08/10 12:28:03 INFO VerifiableProperties: Property auto.offset.reset
>> is overridden to smallest
>> 16/08/10 12:28:03 INFO VerifiableProperties: Property group.id is
>> overridden to xyz
>> 16/08/10 12:28:03 INFO VerifiableProperties: Property zookeeper.connect
>> is overridden to
>> 16/08/10 12:28:03 INFO ForEachDStream: metadataCleanupDelay = -1
>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Slide time = 60000 ms
>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Storage level =
>> StorageLevel(false, false, false, false, 1)
>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Checkpoint interval = null
>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Remember duration = 60000
>> ms
>> 16/08/10 12:28:03 INFO DirectKafkaInputDStream: Initialized and validated
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@942f1c5
>> 16/08/10 12:28:03 INFO ForEachDStream: Slide time = 60000 ms
>> 16/08/10 12:28:03 INFO ForEachDStream: Storage level =
>> StorageLevel(false, false, false, false, 1)
>> 16/08/10 12:28:03 INFO ForEachDStream: Checkpoint interval = null
>> 16/08/10 12:28:03 INFO ForEachDStream: Remember duration = 60000 ms
>> 16/08/10 12:28:03 INFO ForEachDStream: Initialized and validated
>> org.apache.spark.streaming.dstream.ForEachDStream@a0ec143
>> 16/08/10 12:28:03 INFO RecurringTimer: Started timer for JobGenerator at
>> time 1470812340000
>> 16/08/10 12:28:03 INFO JobGenerator: Started JobGenerator at
>> 1470812340000 ms
>> 16/08/10 12:28:03 INFO JobScheduler: Started JobScheduler
>> 16/08/10 12:28:03 INFO StreamingContext: StreamingContext started
>> 16/08/10 12:29:00 INFO VerifiableProperties: Verifying properties
>> 16/08/10 12:29:00 INFO VerifiableProperties: Property auto.offset.reset
>> is overridden to smallest
>> 16/08/10 12:29:00 INFO VerifiableProperties: Property group.id is
>> overridden to xyz
>> 16/08/10 12:29:00 INFO VerifiableProperties: Property zookeeper.connect
>> is overridden to
>> 16/08/10 12:29:00 INFO JobScheduler: Added jobs for time 1470812340000 ms
>> 16/08/10 12:29:00 INFO JobScheduler: Starting job streaming job
>> 1470812340000 ms.0 from job set of time 1470812340000 ms
>> 16/08/10 12:29:00 INFO SparkContext: Starting job: json at
>> todelete.scala:34
>> 16/08/10 12:29:00 INFO DAGScheduler: Got job 0 (json at
>> todelete.scala:34) with 2 output partitions
>> 16/08/10 12:29:00 INFO DAGScheduler: Final stage: ResultStage 0 (json at
>> todelete.scala:34)
>> 16/08/10 12:29:00 INFO DAGScheduler: Parents of final stage: List()
>> 16/08/10 12:29:00 INFO DAGScheduler: Missing parents: List()
>> 16/08/10 12:29:00 INFO DAGScheduler: Submitting ResultStage 0
>> (MapPartitionsRDD[3] at json at todelete.scala:34), which has no missing
>> parents
>> 16/08/10 12:29:00 INFO MemoryStore: Block broadcast_0 stored as values in
>> memory (estimated size 4.7 KB, free 4.7 KB)
>> 16/08/10 12:29:00 INFO MemoryStore: Block broadcast_0_piece0 stored as
>> bytes in memory (estimated size 2.6 KB, free 7.2 KB)
>> 16/08/10 12:29:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in
>> memory on localhost:56716 (size: 2.6 KB, free: 511.5 MB)
>> 16/08/10 12:29:00 INFO SparkContext: Created broadcast 0 from broadcast
>> at DAGScheduler.scala:1006
>> 16/08/10 12:29:00 INFO DAGScheduler: Submitting 2 missing tasks from
>> ResultStage 0 (MapPartitionsRDD[3] at json at todelete.scala:34)
>> 16/08/10 12:29:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
>> 16/08/10 12:29:01 INFO TaskSetManager: Starting task 1.0 in stage 0.0
>> (TID 0, localhost, partition 1,NODE_LOCAL, 2232 bytes)
>> 16/08/10 12:29:01 INFO Executor: Running task 1.0 in stage 0.0 (TID 0)
>> 16/08/10 12:29:01 INFO Executor: Fetching http://192.168.126.131:58957/j
>> ars/spark-streaming-kafka-assembly_2.10-1.6.2.jar with timestamp
>> 1470812282187
>> 16/08/10 12:29:01 INFO Utils: Fetching http://192.168.126.131:58957/j
>> ars/spark-streaming-kafka-assembly_2.10-1.6.2.jar to
>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56
>> 864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp3730815508296553254.tmp
>> 16/08/10 12:29:01 INFO Executor: Adding file:/tmp/spark-861074da-9bfb-
>> 475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-
>> 9dfde5d62a37/spark-streaming-kafka-assembly_2.10-1.6.2.jar to class
>> loader
>> 16/08/10 12:29:01 INFO Executor: Fetching http://192.168.126.131:58957/j
>> ars/spark-assembly-1.6.2-hadoop2.6.0.jar with timestamp 1470812282398
>> 16/08/10 12:29:01 INFO Utils: Fetching http://192.168.126.131:58957/j
>> ars/spark-assembly-1.6.2-hadoop2.6.0.jar to
>> /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56
>> 864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp411333926628523179.tmp
>> 16/08/10 12:29:01 INFO Executor: Adding file:/tmp/spark-861074da-9bfb-
>> 475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-
>> 9dfde5d62a37/spark-assembly-1.6.2-hadoop2.6.0.jar to class loader
>> 16/08/10 12:29:01 INFO Executor: Fetching http://192.168.126.131:58957/j
>> ars/pain.jar with timestamp 1470812282402
>> 16/08/10 12:29:01 INFO Utils: Fetching http://192.168.126.131:58957/j
>> ars/pain.jar to /tmp/spark-861074da-9bfb-475c-
>> a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d6
>> 2a37/fetchFileTemp100401525133805542.tmp
>> 16/08/10 12:29:02 INFO Executor: Adding file:/tmp/spark-861074da-9bfb-
>> 475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/pain.jar
>> to class loader
>> 16/08/10 12:29:02 INFO KafkaRDD: Computing topic topic.name, partition 0
>> offsets 0 -> 8
>> 16/08/10 12:29:02 INFO VerifiableProperties: Verifying properties
>> 16/08/10 12:29:02 INFO VerifiableProperties: Property auto.offset.reset
>> is overridden to smallest
>> 16/08/10 12:29:02 INFO VerifiableProperties: Property group.id is
>> overridden to xyz
>> 16/08/10 12:29:02 INFO VerifiableProperties: Property zookeeper.connect
>> is overridden to
>> 16/08/10 12:29:03 INFO Executor: Finished task 1.0 in stage 0.0 (TID 0).
>> 13366 bytes result sent to driver
>> 16/08/10 12:29:03 INFO TaskSetManager: Finished task 1.0 in stage 0.0
>> (TID 0) in 2380 ms on localhost (1/2)
>> 16/08/10 12:30:00 INFO JobScheduler: Added jobs for time 1470812400000 ms
>> 16/08/10 12:31:00 INFO JobScheduler: Added jobs for time 1470812460000 ms
>> 16/08/10 12:32:00 INFO JobScheduler: Added jobs for time 1470812520000 ms
>> 16/08/10 12:33:00 INFO JobScheduler: Added jobs for time 1470812580000 ms
>> 16/08/10 12:34:00 INFO JobScheduler: Added jobs for time 1470812640000 ms
>>
>>
>> On Wed, Aug 10, 2016 at 10:26 AM, Diwakar Dhanuskodi <
>> diwakar.dhanuskodi@gmail.com> wrote:
>>
>>> Hi Siva,
>>>
>>> Does the topic have partitions? Which version of Spark are you using?
>>>
>>> On Wed, Aug 10, 2016 at 2:38 AM, Sivakumaran S <siva.kumaran@me.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Here is a working example I did.
>>>>
>>>> HTH
>>>>
>>>> Regards,
>>>>
>>>> Sivakumaran S
>>>>
>>>> val topics = "test"
>>>> val brokers = "localhost:9092"
>>>> val topicsSet = topics.split(",").toSet
>>>> val sparkConf = new SparkConf().setAppName("KafkaWeatherCalc").setMaster("local") // spark://localhost:7077
>>>> val sc = new SparkContext(sparkConf)
>>>> val ssc = new StreamingContext(sc, Seconds(60))
>>>> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>>> val messages = KafkaUtils.createDirectStream[String, String,
>>>>   StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>>>> messages.foreachRDD(rdd => {
>>>>   if (rdd.isEmpty()) {
>>>>     println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
>>>>     System.exit(-1)
>>>>   }
>>>>   val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
>>>>   val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF()
>>>>   // Process your DF as required from here on
>>>> })
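>>>>
>>>> The snippet presumably ends by starting the context, which every example
>>>> in this thread needs in order to run; a minimal completion:
>>>>
>>>> ssc.start()             // begin scheduling batches
>>>> ssc.awaitTermination()  // keep the driver alive until stopped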
>>>>
>>>>
>>>>
>>>> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi <
>>>> diwakar.dhanuskodi@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I am reading json messages from Kafka. The topic has 2 partitions. When
>>>> running the streaming job with spark-submit, I can see that *val
>>>> dataFrame = sqlContext.read.json(rdd.map(_._2))* executes
>>>> indefinitely. Am I doing something wrong here? The code is below. This
>>>> environment is the Cloudera sandbox; the same issue occurs in the Hadoop
>>>> production cluster, but access there is restricted, which is why I tried
>>>> to reproduce it in the Cloudera sandbox. Kafka 0.10 and Spark 1.4.
>>>>
>>>> val kafkaParams = Map[String, String](
>>>>   "bootstrap.servers" -> "localhost:9093,localhost:9092",
>>>>   "group.id" -> "xyz",
>>>>   "auto.offset.reset" -> "smallest")
>>>> val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
>>>> val ssc = new StreamingContext(conf, Seconds(1))
>>>>
>>>> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>>>>
>>>> val topics = Set("gpp.minf")
>>>> val kafkaStream = KafkaUtils.createDirectStream[String, String,
>>>>   StringDecoder, StringDecoder](ssc, kafkaParams, topics)
>>>>
>>>> kafkaStream.foreachRDD(
>>>>   rdd => {
>>>>     if (rdd.count > 0) {
>>>>       *val dataFrame = sqlContext.read.json(rdd.map(_._2))*
>>>>       dataFrame.printSchema()
>>>>       //dataFrame.foreach(println)
>>>>     }
>>>>   })
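>>>>
>>>> As a cheap check that records are arriving at all before paying for the
>>>> DataFrame conversion, a sketch that peeks at a few raw messages first
>>>> (note rdd.count forces a full count of the batch, while take(5) scans
>>>> only as many partitions as needed):
>>>>
>>>> kafkaStream.foreachRDD { rdd =>
>>>>   // Print up to five raw (key, value) pairs without triggering
>>>>   // json schema inference or a full count.
>>>>   rdd.take(5).foreach { case (k, v) => println(s"key=$k value=$v") }
>>>> }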
>>>>
>>>>
>>>>
>>>
>>
