spark-user mailing list archives

From Akhil Das <ak...@sigmoidanalytics.com>
Subject Re: Reading Data Using TextFileStream
Date Wed, 07 Jan 2015 11:19:21 GMT
You need to put some files in that location (/user/huser/user/huser/flume)
after the job starts running; only then will it print anything. Also note
that I missed the leading / in the code I sent earlier.
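
As a concrete illustration of "putting files there after the job starts" (not from the thread): a minimal sketch that writes a file outside the monitored directory and then renames it in, so textFileStream sees a complete new file appear after ssc.start(). The class name and the /tmp staging path are hypothetical; it assumes the Hadoop client API is on the classpath and reuses the directory path from this thread.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper, not part of the original thread.
public final class DropTestFile {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());

    // Stage the file outside the directory that textFileStream is watching.
    Path staging = new Path("/tmp/flume-test/part-0001");
    FSDataOutputStream out = fs.create(staging, true);
    out.writeBytes("a test line for the streaming job\n");
    out.close();

    // Rename (move) it into the monitored directory so the streaming job
    // only ever sees a complete file.
    fs.rename(staging, new Path("/user/huser/user/huser/flume/part-0001"));
    fs.close();
  }
}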

Thanks
Best Regards

On Wed, Jan 7, 2015 at 4:42 PM, Jeniba Johnson <
Jeniba.Johnson@lntinfotech.com> wrote:

> Hi Akhil,
>
>
>
> I had used the flatMap method so that the lines from a file would be printed
> as soon as they were tailed from Flume into HDFS.
>
> With the code mentioned below, the lines from the file are not being printed.
>
>
>
> Output
>
> Welcome TO Flume Streaming
> 15/01/07 22:32:46 INFO dstream.ForEachDStream: metadataCleanupDelay = -1
> 15/01/07 22:32:46 INFO dstream.MappedDStream: metadataCleanupDelay = -1
> 15/01/07 22:32:46 INFO dstream.FileInputDStream: metadataCleanupDelay = -1
> 15/01/07 22:32:46 INFO dstream.FileInputDStream: Slide time = 20000 ms
> 15/01/07 22:32:46 INFO dstream.FileInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 15/01/07 22:32:46 INFO dstream.FileInputDStream: Checkpoint interval = null
> 15/01/07 22:32:46 INFO dstream.FileInputDStream: Remember duration = 20000 ms
> 15/01/07 22:32:46 INFO dstream.FileInputDStream: Initialized and validated org.apache.spark.streaming.dstream.FileInputDStream@6c8185d3
> 15/01/07 22:32:46 INFO dstream.MappedDStream: Slide time = 20000 ms
> 15/01/07 22:32:46 INFO dstream.MappedDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 15/01/07 22:32:46 INFO dstream.MappedDStream: Checkpoint interval = null
> 15/01/07 22:32:46 INFO dstream.MappedDStream: Remember duration = 20000 ms
> 15/01/07 22:32:46 INFO dstream.MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@2b79174c
> 15/01/07 22:32:46 INFO dstream.ForEachDStream: Slide time = 20000 ms
> 15/01/07 22:32:46 INFO dstream.ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
> 15/01/07 22:32:46 INFO dstream.ForEachDStream: Checkpoint interval = null
> 15/01/07 22:32:46 INFO dstream.ForEachDStream: Remember duration = 20000 ms
> 15/01/07 22:32:46 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1ae894e0
> 15/01/07 22:32:46 INFO util.RecurringTimer: Started timer for JobGenerator at time 1420650180000
> 15/01/07 22:32:46 INFO scheduler.JobGenerator: Started JobGenerator at 1420650180000 ms
> 15/01/07 22:32:46 INFO scheduler.JobScheduler: Started JobScheduler
> 15/01/07 22:33:00 INFO dstream.FileInputDStream: Finding new files took 347 ms
> 15/01/07 22:33:00 INFO dstream.FileInputDStream: New files at time 1420650180000 ms:
>
> 15/01/07 22:33:00 INFO scheduler.JobScheduler: Added jobs for time 1420650180000 ms
> 15/01/07 22:33:00 INFO scheduler.JobScheduler: Starting job streaming job 1420650180000 ms.0 from job set of time 1420650180000 ms
> -------------------------------------------
> Time: 1420650180000 ms
> -------------------------------------------
>
> 15/01/07 22:33:00 INFO scheduler.JobScheduler: Finished job streaming job 1420650180000 ms.0 from job set of time 1420650180000 ms
> 15/01/07 22:33:00 INFO scheduler.JobScheduler: Total delay: 0.424 s for time 1420650180000 ms (execution: 0.017 s)
> 15/01/07 22:33:00 INFO dstream.FileInputDStream: Cleared 0 old files that were older than 1420650160000 ms:
> 15/01/07 22:33:20 INFO dstream.FileInputDStream: Finding new files took 9 ms
> 15/01/07 22:33:20 INFO dstream.FileInputDStream: New files at time 1420650200000 ms:
>
> -------------------------------------------
> 15/01/07 22:33:20 INFO scheduler.JobScheduler: Starting job streaming job 1420650200000 ms.0 from job set of time 1420650200000 ms
> 15/01/07 22:33:20 INFO scheduler.JobScheduler: Added jobs for time 1420650200000 ms
> Time: 1420650200000 ms
>
>
>
> Regards,
>
> Jeniba Johnson
>
>
>
> *From:* Akhil Das [mailto:akhil@sigmoidanalytics.com]
> *Sent:* Wednesday, January 07, 2015 4:17 PM
> *To:* Jeniba Johnson; user@spark.apache.org
>
> *Subject:* Re: Reading Data Using TextFileStream
>
>
>
> How about the following code? I'm not quite sure what you were doing
> inside the flatMap and foreach.
>
>
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.FlatMapFunction;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> import com.google.common.collect.Lists;
>
> import java.util.Arrays;
> import java.util.List;
> import java.util.regex.Pattern;
>
> public final class Test1 {
>   public static void main(String[] args) throws Exception {
>
>     SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
>     JavaStreamingContext ssc = new JavaStreamingContext("local[4]", "JavaWordCount", new Duration(20000));
>
>     // Data directory path in HDFS
>     JavaDStream<String> textStream = ssc.textFileStream("user/huser/user/huser/flume");
>
>     textStream.print();
>
>     System.out.println("Welcome TO Flume Streaming");
>     ssc.start();
>     ssc.awaitTermination();
>   }
> }
>
>
> Thanks
>
> Best Regards
>
>
>
> On Wed, Jan 7, 2015 at 4:06 PM, Jeniba Johnson <
> Jeniba.Johnson@lntinfotech.com> wrote:
>
> Hi Akhil,
>
>
>
> I had missed the forward slash in the directory path. After correcting it,
> I am now facing the error mentioned below.
>
> Can anyone help me with this issue?
>
>
>
> 15/01/07 21:55:20 INFO dstream.FileInputDStream: Finding new files took 360 ms
> 15/01/07 21:55:20 INFO dstream.FileInputDStream: New files at time 1420647920000 ms:
>
> 15/01/07 21:55:20 INFO scheduler.JobScheduler: Added jobs for time 1420647920000 ms
> 15/01/07 21:55:20 INFO scheduler.JobScheduler: Starting job streaming job 1420647920000 ms.0 from job set of time 1420647920000 ms
> -------------------------------------------
> Time: 1420647920000 ms
> -------------------------------------------
>
> 15/01/07 21:55:20 INFO scheduler.JobScheduler: Finished job streaming job 1420647920000 ms.0 from job set of time 1420647920000 ms
> 15/01/07 21:55:20 INFO scheduler.JobScheduler: Starting job streaming job 1420647920000 ms.1 from job set of time 1420647920000 ms
> 15/01/07 21:55:20 ERROR scheduler.JobScheduler: Error running job streaming job 1420647920000 ms.1
> java.lang.UnsupportedOperationException: empty collection
>         at org.apache.spark.rdd.RDD.first(RDD.scala:1094)
>         at org.apache.spark.api.java.JavaRDDLike$class.first(JavaRDDLike.scala:433)
>         at org.apache.spark.api.java.JavaRDD.first(JavaRDD.scala:32)
>         at xyz.Test1$2.call(Test1.java:67)
>         at xyz.Test1$2.call(Test1.java:1)
>         at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
>         at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>         at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>         at scala.util.Try$.apply(Try.scala:161)
>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>         at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:722)
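
Not from the thread, but a note on the stack trace above: the exception comes from calling first() on the RDD of a batch in which no new files were found, so the collection is empty. A minimal sketch of one way to guard against that, assuming the Spark 1.1 Java API and the suspectedStream variable from the Test1 class quoted later in this thread:

    suspectedStream.foreach(new Function<JavaRDD<String>, Void>() {
      public Void call(JavaRDD<String> rdd) throws Exception {
        // take(1) is safe on an empty RDD, unlike first()
        List<String> head = rdd.take(1);
        if (!head.isEmpty()) {
          System.out.println("First line of this batch: " + head.get(0));
        }
        return null;
      }
    });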
>
>
>
>
>
> Regards,
>
> Jeniba Johnson
>
>
>
> *From:* Akhil Das [mailto:akhil@sigmoidanalytics.com]
> *Sent:* Wednesday, January 07, 2015 12:11 PM
> *To:* Jeniba Johnson
> *Cc:* Hari Shreedharan (hshreedharan@cloudera.com); dev@spark.apache.org
> *Subject:* Re: Reading Data Using TextFileStream
>
>
>
> I think you need to start your streaming job first and then put the files
> there to get them read; I believe textFileStream doesn't read pre-existing
> files.
>
>
>
> Also, are you sure the path shouldn't be the following (i.e. that the / at
> the beginning isn't missing)?
>
>
>
> JavaDStream<String> textStream = ssc.textFileStream("/user/huser/user/huser/flume");
>
>
> Thanks
>
> Best Regards
>
>
>
> On Wed, Jan 7, 2015 at 9:16 AM, Jeniba Johnson <
> Jeniba.Johnson@lntinfotech.com> wrote:
>
>
> Hi Hari,
>
> I am trying to read data from a file stored in HDFS. Using Flume, the data
> is tailed and written to HDFS. Now I want to read this data using
> textFileStream. With the code mentioned below, I am not able to fetch the
> data from the file stored in HDFS. Can anyone help me with this issue?
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.FlatMapFunction;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> import com.google.common.collect.Lists;
>
> import java.util.Arrays;
> import java.util.List;
> import java.util.regex.Pattern;
>
> public final class Test1 {
>   public static void main(String[] args) throws Exception {
>
>     SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
>     JavaStreamingContext ssc = new JavaStreamingContext("local[4]", "JavaWordCount", new Duration(20000));
>
>     // Data directory path in HDFS
>     JavaDStream<String> textStream = ssc.textFileStream("user/huser/user/huser/flume");
>
>     JavaDStream<String> suspectedStream = textStream.flatMap(new FlatMapFunction<String, String>() {
>       public Iterable<String> call(String line) throws Exception {
>         // return Arrays.asList(line.toString());
>         return Lists.newArrayList(line.toString());
>       }
>     });
>
>     suspectedStream.foreach(new Function<JavaRDD<String>, Void>() {
>       public Void call(JavaRDD<String> rdd) throws Exception {
>         List<String> output = rdd.collect();
>         System.out.println("Sentences Collected from Flume " + output);
>         return null;
>       }
>     });
>
>     suspectedStream.print();
>
>     System.out.println("Welcome TO Flume Streaming");
>     ssc.start();
>     ssc.awaitTermination();
>   }
>
> }
>
> The command I use is:
> ./bin/spark-submit --verbose --jars lib/spark-examples-1.1.0-hadoop1.0.4.jar,lib/mysql.jar --master local[*] --deploy-mode client --class xyz.Test1 bin/filestream3.jar
>
>
>
>
>
> Regards,
> Jeniba Johnson
>
>
>
>
>
>
>
