spark-dev mailing list archives

From Suraj Satishkumar Sheth <suraj...@adobe.com>
Subject Spark Streaming : Not working with TextFileStream on HDFS
Date Mon, 17 Feb 2014 09:09:55 GMT
Hi,
I am running a Spark Streaming job with a text file stream on HDFS, using Spark 0.9.0 from Cloudera.

I save each RDD (the batch interval is 100 seconds) to HDFS in a different directory.
Every 100 seconds, the job creates a new directory in HDFS containing only _Success (stream-Random/_Success),
but it does not write any data to it. I have verified that I am adding new files to the
correct HDFS directory. I have also noticed that Spark picks up the files initially
present in the directory used for streaming, so the behaviour looks like batch processing,
although it does create a new folder in HDFS with _Success at each interval.
So, the major issue is that it does not recognize new files created in HDFS.
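
One way to narrow this down is to compare the modification times of the files in the watched directory with the batch times printed in the log. The sketch below assumes, as a simplification, that a file stream treats a file as new for a batch only when its modification time falls inside that batch's window; the actual FileInputDStream logic in 0.9.0 may differ in detail. ModTimeCheck, modifiedWithin and listLocal are hypothetical names, and listLocal reads a local directory through java.io.File purely for illustration (on HDFS the times would come from Hadoop's FileStatus.getModificationTime).

```scala
import java.io.File

object ModTimeCheck {
  // Hypothetical helper: names of the files whose modification time lies in
  // the half-open window [fromMs, untilMs). This is a simplified model of
  // "new file" detection, not Spark's actual implementation.
  def modifiedWithin(files: Seq[(String, Long)], fromMs: Long, untilMs: Long): Seq[String] =
    files.collect { case (name, modMs) if modMs >= fromMs && modMs < untilMs => name }

  // Illustration only: (name, modification time in ms) for each regular file
  // in a local directory. listFiles returns null for a missing directory,
  // hence the Option wrapper.
  def listLocal(dir: File): Seq[(String, Long)] =
    Option(dir.listFiles)
      .map(_.toSeq)
      .getOrElse(Seq.empty)
      .filter(_.isFile)
      .map(f => (f.getName, f.lastModified))
}
```

If files copied into the directory never show a modification time inside any batch window (for example because of clock skew between machines, or because a move preserved an older timestamp), that would be consistent with the empty "New files at time" lines in the log.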

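Code used:
import org.apache.spark.streaming.{Duration, StreamingContext}
import scala.util.Random

// Streaming context with a 100-second batch interval
val ssc = new StreamingContext(ClusterConfig.sparkMaster, "Hybrid", Duration(100000),
  ClusterConfig.sparkHome, ClusterConfig.jars)

// Monitor the HDFS directory for new text files
val data = ssc.textFileStream(ClusterConfig.hdfsNN + "correct/path/to/data")
// Save each batch under a randomly named output directory
data.foreachRDD(rdd => rdd.saveAsObjectFile(ClusterConfig.hdfsNN + "/user<path/to/file>" + Random.nextInt()))
ssc.start()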


It creates these directories, each containing only _Success:
stream562343230
stream1228731977
stream318151149
stream603511115
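
Random directory names like these are hard to match against the batch times in the log. As a minimal sketch, the output directory could instead be derived from the batch time; BatchOutput and batchOutputPath are hypothetical names introduced here, and the "stream-" prefix is only an assumption modelled on the names above.

```scala
object BatchOutput {
  // Hypothetical helper: builds an output directory name from the batch time
  // (in milliseconds), so each directory can be traced back to a log line.
  def batchOutputPath(base: String, batchMs: Long): String = {
    val trimmed = base.stripSuffix("/") // avoid a double slash in the result
    s"$trimmed/stream-$batchMs"
  }
}
```

In the job this could be combined with the two-argument form of foreachRDD and a check that skips empty batches, for example data.foreachRDD((rdd, time) => if (rdd.take(1).nonEmpty) rdd.saveAsObjectFile(BatchOutput.batchOutputPath(outputBase, time.milliseconds))), where outputBase is a placeholder for the real output location.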


This is the log output I get (it contains no errors, only INFO and WARN lines):
14/02/17 14:08:20 INFO FileInputDStream: Finding new files took 549 ms
14/02/17 14:08:20 INFO FileInputDStream: New files at time 1392626300000 ms:

14/02/17 14:08:20 INFO JobScheduler: Added jobs for time 1392626300000 ms
14/02/17 14:08:20 INFO JobScheduler: Starting job streaming job 1392626300000 ms.0 from job set of time 1392626300000 ms
14/02/17 14:08:20 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
14/02/17 14:08:20 WARN Configuration: mapred.job.id is deprecated. Instead, use mapreduce.job.id
14/02/17 14:08:20 WARN Configuration: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/02/17 14:08:20 WARN Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/02/17 14:08:20 WARN Configuration: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/02/17 14:08:20 WARN Configuration: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/02/17 14:08:20 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29
14/02/17 14:08:20 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 0.001934866 s
14/02/17 14:08:20 INFO JobScheduler: Finished job streaming job 1392626300000 ms.0 from job set of time 1392626300000 ms
14/02/17 14:08:20 INFO JobScheduler: Total delay: 0.741 s for time 1392626300000 ms (execution: 0.167 s)
14/02/17 14:08:20 INFO FileInputDStream: Cleared 0 old files that were older than 1392626200000 ms:
14/02/17 14:10:00 INFO FileInputDStream: Finding new files took 6 ms
14/02/17 14:10:00 INFO FileInputDStream: New files at time 1392626400000 ms:

14/02/17 14:10:00 INFO JobScheduler: Added jobs for time 1392626400000 ms
14/02/17 14:10:00 INFO JobScheduler: Starting job streaming job 1392626400000 ms.0 from job set of time 1392626400000 ms
14/02/17 14:10:00 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
14/02/17 14:10:00 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29
14/02/17 14:10:00 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 1.9016E-5 s
14/02/17 14:10:00 INFO JobScheduler: Finished job streaming job 1392626400000 ms.0 from job set of time 1392626400000 ms
14/02/17 14:10:00 INFO JobScheduler: Total delay: 0.085 s for time 1392626400000 ms (execution: 0.077 s)
14/02/17 14:10:00 INFO FileInputDStream: Cleared 0 old files that were older than 1392626300000 ms:
14/02/17 14:11:40 INFO FileInputDStream: Finding new files took 5 ms
14/02/17 14:11:40 INFO FileInputDStream: New files at time 1392626500000 ms:

14/02/17 14:11:40 INFO JobScheduler: Added jobs for time 1392626500000 ms
14/02/17 14:11:40 INFO JobScheduler: Starting job streaming job 1392626500000 ms.0 from job set of time 1392626500000 ms
14/02/17 14:11:40 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
14/02/17 14:11:40 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29
14/02/17 14:11:40 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 1.8111E-5 s
14/02/17 14:11:40 INFO JobScheduler: Finished job streaming job 1392626500000 ms.0 from job set of time 1392626500000 ms
14/02/17 14:11:40 INFO FileInputDStream: Cleared 1 old files that were older than 1392626400000 ms: 1392626300000 ms
14/02/17 14:11:40 INFO JobScheduler: Total delay: 0.110 s for time 1392626500000 ms (execution: 0.102 s)


Thanks and Regards,
Suraj Sheth
