spark-dev mailing list archives

From Jeniba Johnson <Jeniba.John...@lntinfotech.com>
Subject Reading Data Using TextFileStream
Date Wed, 07 Jan 2015 03:46:48 GMT

Hi Hari,

I am trying to read data from a file stored in HDFS. The data is tailed by Flume and written into HDFS, and I now want to read it using textFileStream. With the code below I am not able to fetch the data from the file in HDFS. Can anyone help me with this issue?

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import com.google.common.collect.Lists;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class Test1 {
  public static void main(String[] args) throws Exception {

    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local[4]");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(20000));

    // Data directory path in HDFS
    JavaDStream<String> textStream = ssc.textFileStream("user/huser/user/huser/flume");


    JavaDStream<String> suspectedStream = textStream.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String line) throws Exception {
        // Wrap each line in a single-element list
        // (the redundant toString() calls on a String have been dropped).
        return Lists.newArrayList(line);
      }
    });


    // foreachRDD is the supported way to apply an action to each batch;
    // foreach(Function<JavaRDD<T>, Void>) is deprecated in Spark 1.x.
    suspectedStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
      public Void call(JavaRDD<String> rdd) throws Exception {
        List<String> output = rdd.collect();
        System.out.println("Sentences Collected from Flume " + output);
        return null;
      }
    });

    suspectedStream.print();

    System.out.println("Welcome TO Flume Streaming");
    ssc.start();
    ssc.awaitTermination();
  }

}
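
As an aside, the unused Pattern and Arrays imports suggest the flatMap was meant to split each line into words, as in Spark's JavaWordCount example. A minimal plain-Java sketch of that tokenization, independent of Spark (the helper name splitLine is hypothetical, not from your code):

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class SplitSketch {
    // Same SPACE pattern used by Spark's JavaWordCount example
    private static final Pattern SPACE = Pattern.compile(" ");

    // Hypothetical helper: what the flatMap's call(line) could return
    // instead of wrapping the whole line in a single-element list.
    static List<String> splitLine(String line) {
        return Arrays.asList(SPACE.split(line));
    }

    public static void main(String[] args) {
        System.out.println(splitLine("Welcome TO Flume Streaming"));
        // prints [Welcome, TO, Flume, Streaming]
    }
}
```

Returning this list from the flatMap would give you a DStream of individual words rather than whole lines.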

The command I use is:
./bin/spark-submit --verbose --jars lib/spark-examples-1.1.0-hadoop1.0.4.jar,lib/mysql.jar
--master local[*] --deploy-mode client --class xyz.Test1 bin/filestream3.jar





Regards,
Jeniba Johnson


