How about the following code? I'm not quite sure what you were doing inside the flatMap and foreach.

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public final class Test1 {
  public static void main(String[] args) throws Exception {

    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local[4]");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(20000));

    JavaDStream<String> textStream = ssc.textFileStream("/user/huser/user/huser/flume"); // data directory path in HDFS


    textStream.print();

    System.out.println("Welcome TO Flume Streaming");
    ssc.start();
    ssc.awaitTermination();
  }

}
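
If you do want to look at the contents of each batch yourself, something along the lines of the sketch below (dropped in before ssc.start(), and reusing the JavaRDD, Function and List imports from your original code) is safer than calling first() on every RDD: a batch interval in which no new files arrive produces an empty RDD, which is where the "empty collection" error below comes from. This is only a rough sketch against the 1.x Java API.

    textStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
        public Void call(JavaRDD<String> rdd) throws Exception {
            // take(1) returns an empty list for an empty RDD, unlike first(),
            // which throws "empty collection" when the batch has no new data.
            if (!rdd.take(1).isEmpty()) {
                List<String> lines = rdd.collect();
                System.out.println("Sentences collected from Flume " + lines);
            } else {
                System.out.println("No new files in this batch");
            }
            return null;
        }
    });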

Thanks
Best Regards

On Wed, Jan 7, 2015 at 4:06 PM, Jeniba Johnson <Jeniba.Johnson@lntinfotech.com> wrote:

Hi Akhil,

I had missed the forward slash in the directory path. After correcting it, I am now facing the error mentioned below.

Can anyone help me with this issue?

15/01/07 21:55:20 INFO dstream.FileInputDStream: Finding new files took 360 ms
15/01/07 21:55:20 INFO dstream.FileInputDStream: New files at time 1420647920000 ms:
15/01/07 21:55:20 INFO scheduler.JobScheduler: Added jobs for time 1420647920000 ms
15/01/07 21:55:20 INFO scheduler.JobScheduler: Starting job streaming job 1420647920000 ms.0 from job set of time 1420647920000 ms
-------------------------------------------
Time: 1420647920000 ms
-------------------------------------------

15/01/07 21:55:20 INFO scheduler.JobScheduler: Finished job streaming job 1420647920000 ms.0 from job set of time 1420647920000 ms
15/01/07 21:55:20 INFO scheduler.JobScheduler: Starting job streaming job 1420647920000 ms.1 from job set of time 1420647920000 ms
15/01/07 21:55:20 ERROR scheduler.JobScheduler: Error running job streaming job 1420647920000 ms.1
java.lang.UnsupportedOperationException: empty collection
        at org.apache.spark.rdd.RDD.first(RDD.scala:1094)
        at org.apache.spark.api.java.JavaRDDLike$class.first(JavaRDDLike.scala:433)
        at org.apache.spark.api.java.JavaRDD.first(JavaRDD.scala:32)
        at xyz.Test1$2.call(Test1.java:67)
        at xyz.Test1$2.call(Test1.java:1)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:527)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)


Regards,

Jeniba Johnson

 

From: Akhil Das [mailto:akhil@sigmoidanalytics.com]
Sent: Wednesday, January 07, 2015 12:11 PM
To: Jeniba Johnson
Cc: Hari Shreedharan (hshreedharan@cloudera.com); dev@spark.apache.org
Subject: Re: Reading Data Using TextFileStream

 

I think you need to start your streaming job first and then put the files into the directory to get them read; textFileStream doesn't pick up files that already exist there, I believe.
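
For a quick test, once the job is up you can move a new file into the monitored directory and it should show up in the next batch. A rough sketch using the Hadoop FileSystem API (the staging path is just an example):

    // Uses org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.FileSystem and org.apache.hadoop.fs.Path.
    // Run this after ssc.start(); a rename is atomic in HDFS, so the file shows up
    // in the monitored directory as a complete, newly created file.
    FileSystem fs = FileSystem.get(new Configuration());
    fs.rename(new Path("/user/huser/staging/events.log"),            // example staging location
              new Path("/user/huser/user/huser/flume/events.log"));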

 

Also, are you sure the path isn't missing a / at the beginning? Shouldn't it be the following?

 

JavaDStream<String> textStream = ssc.textFileStream("/user/huser/user/huser/flume");


Thanks

Best Regards

 

On Wed, Jan 7, 2015 at 9:16 AM, Jeniba Johnson <Jeniba.Johnson@lntinfotech.com> wrote:


Hi Hari,

I am trying to read data from a file stored in HDFS. Flume tails the source and writes the data into HDFS.
Now I want to read this data using textFileStream. With the code mentioned below I am not able to fetch the
data from the file in HDFS. Can anyone help me with this issue?

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.collect.Lists;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class Test1 {
  public static void main(String[] args) throws Exception {

    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext("local[4]","JavaWordCount",  new Duration(20000));

    JavaDStream<String> textStream = ssc.textFileStream("user/huser/user/huser/flume");//Data Directory Path in HDFS


    JavaDStream<String> suspectedStream = textStream.flatMap(new FlatMapFunction<String, String>() {
        public Iterable<String> call(String line) throws Exception {
            //return Arrays.asList(line.toString().toString());
            return Lists.newArrayList(line.toString().toString());
        }
    });

    suspectedStream.foreach(new Function<JavaRDD<String>, Void>() {
        public Void call(JavaRDD<String> rdd) throws Exception {
            List<String> output = rdd.collect();
            System.out.println("Sentences Collected from Flume " + output);
            return null;
        }
    });

    suspectedStream.print();

    System.out.println("Welcome TO Flume Streaming");
    ssc.start();
    ssc.awaitTermination();
  }

}

The command I use is:
./bin/spark-submit --verbose --jars lib/spark-examples-1.1.0-hadoop1.0.4.jar,lib/mysql.jar --master local[*] --deploy-mode client --class xyz.Test1 bin/filestream3.jar





Regards,
Jeniba Johnson

