spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From praveshjain1991 <praveshjain1...@gmail.com>
Subject Re: Spark Streaming not processing file with particular number of entries
Date Fri, 06 Jun 2014 06:33:19 GMT
Hi,

I am using Spark-1.0.0 on a 3-node cluster with 1 master and 2 slaves. I
am trying to run the logistic regression (LR) algorithm over Spark Streaming,
using the following code:

    package org.apache.spark.examples.streaming;

    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.PrintWriter;
    import java.text.SimpleDateFormat;
    import java.util.Arrays;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.List;
    import java.util.regex.Pattern;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.mllib.classification.LogisticRegressionModel;
    import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
    import org.apache.spark.mllib.regression.LabeledPoint;
    import org.apache.spark.mllib.linalg.Vector;
    import org.apache.spark.mllib.linalg.Vectors;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    
    /**
     * Logistic regression based classification using ML Lib.
     */
    public final class StreamingJavaLR {
    	static int i = 1;
    
    	// static LogisticRegressionModel model;
    
    	// private static final Pattern SPACE = Pattern.compile(" ");
    
    	/**
    	 * Converts one training line of the form {@code label,f1 f2 ... fn}
    	 * into a {@link LabeledPoint}: the text before the first comma is the
    	 * label and the text after it is a space-separated list of features.
    	 */
    	static class ParsePoint implements Function<String, LabeledPoint> {
    		private static final Pattern COMMA = Pattern.compile(",");
    		private static final Pattern SPACE = Pattern.compile(" ");

    		/**
    		 * @param line a single line of the training file
    		 * @return the labeled point described by {@code line}
    		 */
    		@Override
    		public LabeledPoint call(String line) {
    			final String[] fields = COMMA.split(line);
    			final double label = Double.parseDouble(fields[0]);
    			final String[] rawFeatures = SPACE.split(fields[1]);
    			final double[] features = new double[rawFeatures.length];
    			int next = 0;
    			for (String raw : rawFeatures) {
    				features[next++] = Double.parseDouble(raw);
    			}
    			return new LabeledPoint(label, Vectors.dense(features));
    		}
    	}
    
    	// Edited
    	/**
    	 * Converts one prediction-input line, a space-separated list of
    	 * numbers, into an array of doubles.
    	 */
    	static class ParsePointforInput implements Function<String, double[]> {
    		private static final Pattern SPACE = Pattern.compile(" ");

    		/**
    		 * @param line a single line of the prediction input
    		 * @return the parsed values, in the order they appear in {@code line}
    		 */
    		@Override
    		public double[] call(String line) {
    			final String[] rawValues = SPACE.split(line);
    			final double[] values = new double[rawValues.length];
    			int next = 0;
    			for (String raw : rawValues) {
    				values[next++] = Double.parseDouble(raw);
    			}
    			return values;
    		}
    	}
    
    	public static void main(String[] args) {
    
    		if (args.length != 5) {
    			System.err
    					.println("Usage: JavaLR <master> <input_file_for_training>
<step_size> <no_iters> <input_file_for_prediction>");
    			System.exit(1);
    		}
    
    		FileWriter file;
    		PrintWriter outputFile = null;
    		SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
    		Calendar cal=Calendar.getInstance();
    
    		final Date startTime;
    
            System.out.println("<<<<<Let's Print>>>>>");
    
    //		SparkConf conf = new SparkConf()
    //                .setMaster(args[0])
    //				.setAppName("StreamingJavaLR")
    //                .set("spark.cleaner.ttl", "1000")
    //                .set("spark.executor.uri",
"hdfs://192.168.145.191:9000/user/praveshj/spark/spark-0.9.1.tar.gz")
    //				.setJars(JavaSparkContext.jarOfClass(StreamingJavaLR.class));
    //
    //		JavaSparkContext sc = new JavaSparkContext(conf);
    
    		 JavaSparkContext sc = new JavaSparkContext(args[0],
    		 "StreamingJavaLR",
    		 System.getenv("SPARK_HOME"),
    		 JavaSparkContext.jarOfClass(StreamingJavaLR.class));
    
           
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Reading
File");
    		JavaRDD<String> lines = sc.textFile(args[1]);
           
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>File
has
been Read now mapping");
    		JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
           
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Mapping
Done");
    		double stepSize = Double.parseDouble(args[2]);
    		int iterations = Integer.parseInt(args[3]);
           
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Read
the
arguments. stepSize = "+stepSize+" and iterations = "+iterations);
    
    		BufferedReader br = null;
    
           
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Training
the
Model");
    		final LogisticRegressionModel model = LogisticRegressionWithSGD.train(
    				points.rdd(), iterations, stepSize);
           
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Model
Trained");
    
            System.out.println("Final w: " + model.weights());
            //		printWeights(model.weights());
    		System.out.println("Intercept : " + model.intercept());
    
    		final Vector weightVector = model.weights();
    		
    //		double[] weightArray = model.weights();
    //
    //		final DoubleMatrix weightMatrix = new DoubleMatrix(weightArray);
    
    		sc.stop();
    
    		try {
    			Thread.sleep(1000);
    		} catch (InterruptedException ex) {
    			Thread.currentThread().interrupt();
    		}
    
    //		try {
    //			file = new FileWriter(args[5]);
    //			outputFile = new PrintWriter(file);
    //			cal = Calendar.getInstance();
    //			cal.getTime();
    ////			startTime = sdf.format(cal.getTime());
    //			startTime = cal.getTime();
    //			outputFile.println("Start Time : " + startTime);
    //			outputFile.flush();
    //		} catch (IOException E) {
    //			E.printStackTrace();
    //		}
    		
    //		final JavaStreamingContext ssc = new JavaStreamingContext(sc,
    //				new Duration(1000));
    
    		startTime = cal.getTime();
    		
    		final JavaStreamingContext ssc = new JavaStreamingContext(args[0],
    		"StreamingJavaLR", new Duration(1000),
    		System.getenv("SPARK_HOME"),
    		JavaStreamingContext.jarOfClass(StreamingJavaLR.class));
    
    		JavaDStream<String> lines_2 = ssc.textFileStream(args[4]);
    		JavaDStream<double[]> points_2 = lines_2.map(new
ParsePointforInput());
    		// points_2.print();
    
    		
    		// System.out.print(lines_2.count());
    		// System.exit(0);
    		points_2.foreachRDD(new Function<JavaRDD&lt;double[]>, Void>() {
    
    			@Override
    			public Void call(JavaRDD rdd) {
    
    				List<double[]> temp = rdd.collect();
    				
    				//If no more data is left for Prediction, Stop the Program
    //				if (rdd.count() == 0)
    //					ssc.stop();
    				FileWriter newfile = null;
    				BufferedWriter bw = null;
    
    				try {
    					newfile = new FileWriter(
    							"/home/pravesh/data/abc"
    									+ i++ + ".txt");
    					bw = new BufferedWriter(newfile);
    				} catch (IOException e) {
    					e.printStackTrace();
    				}
    				int inpNo = 0;
    				double result;
    				for (double[] dArray : temp) {
    					double[][] dataArray = new double[1][2];
    					for (int i = 0; i < dArray.length; i++)
    						dataArray[0][i] = dArray[i];
    //					DoubleMatrix dataMatrix = new DoubleMatrix(dataArray);
    //					result = model.predictPoint(dataMatrix, weightMatrix,
    //							model.intercept());
    					
    					Vector dataVector = Vectors.dense(dArray);
    					result = model.predictPoint(dataVector, weightVector,
model.intercept());
    
    					try {						
    						Calendar cal2 = Calendar.getInstance();
    //						bw.write("INFO at " + cal2.getTime() + " : " + "Point " + inpNo
+ " (" + dataMatrix.get(0, 0)
    //								+ ", " + dataMatrix.get(0, 1) + ")"
    //								+ " belongs to : " + result + " and Start Time was " +
startTime + "\n");
    						
    						bw.write("INFO at " + cal2.getTime() + " : " + "Point " + inpNo +
" (" + dataVector.toArray()[0]
    								+ ", " + dataVector.toArray()[1] + ")"
    								+ " belongs to : " + result + " and Start Time was " + startTime
+ "\n");
    						
    						bw.flush();
    					} catch (IOException e) {
    						// TODO Auto-generated catch block
    						e.printStackTrace();
    					}
    					// newoutputFile.flush();
    					inpNo++;
    				}
    				try {
    					bw.close();
    					newfile.close();
    				} catch (IOException e) {
    					e.printStackTrace();
    				}
    				Void v = null;
    				return v;
    			}
    		});
    		
    		ssc.start();
    		ssc.awaitTermination();
    		
    //		cal = Calendar.getInstance();
    //		outputFile.println("  End Time : " + cal.getTime());
    //		outputFile.flush();
    		
    		System.exit(0);
    	}
    }

As you can see, I take input from files for training the model with
JavaSparkContext and for testing the model with JavaStreamingContext.

I have used the data given in $SPARK_HOME/mllib/data/lr-data/random.data for
training and testing. To obtain larger data sets, I replicated this data
several times. The code works fine for every data set in local mode. On the
cluster, however, it is not able to process the file containing 0.4 million
entries.

For every other data set (here, a file with 0.8 million entries), the output
looks like this (output after the StreamingContext is started):

    14/06/06 11:36:09 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20140606113609-0001/0 on hostPort host-DSRV05.host.co.in:55206 with 8
cores, 512.0 MB RAM
    14/06/06 11:36:09 INFO AppClient$ClientActor: Executor added:
app-20140606113609-0001/1 on
worker-20140606114445-host-DSRV04.host.co.in-39342
(host-DSRV04.host.co.in:39342) with 8 cores
    14/06/06 11:36:09 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20140606113609-0001/1 on hostPort host-DSRV04.host.co.in:39342 with 8
cores, 512.0 MB RAM
    14/06/06 11:36:09 INFO AppClient$ClientActor: Executor updated:
app-20140606113609-0001/0 is now RUNNING
    14/06/06 11:36:09 INFO AppClient$ClientActor: Executor updated:
app-20140606113609-0001/1 is now RUNNING
    14/06/06 11:36:09 INFO RecurringTimer: Started timer for JobGenerator at
time 1402034770000
    14/06/06 11:36:09 INFO JobGenerator: Started JobGenerator at
1402034770000 ms
    14/06/06 11:36:09 INFO JobScheduler: Started JobScheduler
    14/06/06 11:36:10 INFO FileInputDStream: Finding new files took 29 ms
    14/06/06 11:36:10 INFO FileInputDStream: New files at time 1402034770000
ms:
    file:/newdisk1/praveshj/pravesh/data/input/testing8lk.txt
    14/06/06 11:36:10 INFO MemoryStore: ensureFreeSpace(33216) called with
curMem=0, maxMem=309225062
    14/06/06 11:36:10 INFO MemoryStore: Block broadcast_0 stored as values
to memory (estimated size 32.4 KB, free 294.9 MB)
    14/06/06 11:36:10 INFO FileInputFormat: Total input paths to process : 1
    14/06/06 11:36:10 INFO JobScheduler: Added jobs for time 1402034770000
ms
    14/06/06 11:36:10 INFO JobScheduler: Starting job streaming job
1402034770000 ms.0 from job set of time 1402034770000 ms
    14/06/06 11:36:10 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
    14/06/06 11:36:10 INFO DAGScheduler: Got job 0 (collect at
StreamingJavaLR.java:170) with 1 output partitions (allowLocal=false)
    14/06/06 11:36:10 INFO DAGScheduler: Final stage: Stage 0(collect at
StreamingJavaLR.java:170)
    14/06/06 11:36:10 INFO DAGScheduler: Parents of final stage: List()
    14/06/06 11:36:10 INFO DAGScheduler: Missing parents: List()
    14/06/06 11:36:10 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at
map at MappedDStream.scala:35), which has no missing parents
    14/06/06 11:36:10 INFO DAGScheduler: Submitting 1 missing tasks from
Stage 0 (MappedRDD[3] at map at MappedDStream.scala:35)
    14/06/06 11:36:10 INFO TaskSchedulerImpl: Adding task set 0.0 with 1
tasks
    14/06/06 11:36:10 INFO SparkDeploySchedulerBackend: Registered executor:
Actor[akka.tcp://sparkExecutor@host-DSRV05.host.co.in:47657/user/Executor#-1277914179]
with ID 0
    14/06/06 11:36:10 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on
executor 0: host-DSRV05.host.co.in (PROCESS_LOCAL)
    14/06/06 11:36:10 INFO TaskSetManager: Serialized task 0.0:0 as 3544
bytes in 1 ms
    14/06/06 11:36:10 INFO SparkDeploySchedulerBackend: Registered executor:
Actor[akka.tcp://sparkExecutor@host-DSRV04.host.co.in:46975/user/Executor#1659982546]
with ID 1
    14/06/06 11:36:10 INFO BlockManagerInfo: Registering block manager
host-DSRV05.host.co.in:52786 with 294.9 MB RAM
    14/06/06 11:36:10 INFO BlockManagerInfo: Registering block manager
host-DSRV04.host.co.in:42008 with 294.9 MB RAM
    14/06/06 11:36:11 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:36:11 INFO FileInputDStream: New files at time 1402034771000
ms:
    
    14/06/06 11:36:11 INFO JobScheduler: Added jobs for time 1402034771000
ms
    14/06/06 11:36:12 INFO FileInputDStream: Finding new files took 1 ms
    14/06/06 11:36:12 INFO FileInputDStream: New files at time 1402034772000
ms:
    
    14/06/06 11:36:12 INFO JobScheduler: Added jobs for time 1402034772000
ms
    14/06/06 11:36:13 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:36:13 INFO FileInputDStream: New files at time 1402034773000
ms:
    
    14/06/06 11:36:13 INFO JobScheduler: Added jobs for time 1402034773000
ms
    14/06/06 11:36:14 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:36:14 INFO FileInputDStream: New files at time 1402034774000
ms:
    
    14/06/06 11:36:14 INFO JobScheduler: Added jobs for time 1402034774000
ms
    14/06/06 11:36:15 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:36:15 INFO FileInputDStream: New files at time 1402034775000
ms:
    
    14/06/06 11:36:15 INFO JobScheduler: Added jobs for time 1402034775000
ms
    14/06/06 11:36:15 INFO BlockManagerInfo: Added taskresult_0 in memory on
host-DSRV05.host.co.in:52786 (size: 19.9 MB, free: 275.0 MB)
    14/06/06 11:36:15 INFO SendingConnection: Initiating connection to
[host-DSRV05.host.co.in/192.168.145.195:52786]
    14/06/06 11:36:15 INFO SendingConnection: Connected to
[host-DSRV05.host.co.in/192.168.145.195:52786], 1 messages pending
    14/06/06 11:36:15 INFO ConnectionManager: Accepted connection from
[host-DSRV05.host.co.in/192.168.145.195]
    14/06/06 11:36:15 INFO BlockManagerInfo: Removed taskresult_0 on
host-DSRV05.host.co.in:52786 in memory (size: 19.9 MB, free: 294.9 MB)
    14/06/06 11:36:15 INFO DAGScheduler: Completed ResultTask(0, 0)
    14/06/06 11:36:15 INFO TaskSetManager: Finished TID 0 in 4961 ms on
host-DSRV05.host.co.in (progress: 1/1)
    14/06/06 11:36:15 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose
tasks have all completed, from pool 
    14/06/06 11:36:15 INFO DAGScheduler: Stage 0 (collect at
StreamingJavaLR.java:170) finished in 5.533 s
    14/06/06 11:36:15 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 5.548644244 s
    14/06/06 11:36:16 INFO FileInputDStream: Finding new files took 1 ms
    14/06/06 11:36:16 INFO FileInputDStream: New files at time 1402034776000
ms:
    
    14/06/06 11:36:16 INFO JobScheduler: Added jobs for time 1402034776000
ms
    14/06/06 11:36:17 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:36:17 INFO FileInputDStream: New files at time 1402034777000
ms:
    
    14/06/06 11:36:17 INFO JobScheduler: Added jobs for time 1402034777000
ms
    14/06/06 11:36:18 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:36:18 INFO FileInputDStream: New files at time 1402034778000
ms:
    
    14/06/06 11:36:18 INFO JobScheduler: Added jobs for time 1402034778000
ms
    14/06/06 11:36:19 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:36:19 INFO FileInputDStream: New files at time 1402034779000
ms:
    
    14/06/06 11:36:19 INFO JobScheduler: Added jobs for time 1402034779000
ms
    14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034770000 ms.0 from job set of time 1402034770000 ms
    14/06/06 11:36:19 INFO JobScheduler: Total delay: 9.331 s for time
1402034770000 ms (execution: 9.274 s)
    14/06/06 11:36:19 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
    14/06/06 11:36:19 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 2.7293E-5 s
    14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job
1402034771000 ms.0 from job set of time 1402034771000 ms
    14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034771000 ms.0 from job set of time 1402034771000 ms
    14/06/06 11:36:19 INFO JobScheduler: Total delay: 8.333 s for time
1402034771000 ms (execution: 0.000 s)
    14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job
1402034772000 ms.0 from job set of time 1402034772000 ms
    14/06/06 11:36:19 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
    14/06/06 11:36:19 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 1.4859E-5 s
    14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034772000 ms.0 from job set of time 1402034772000 ms
    14/06/06 11:36:19 INFO JobScheduler: Total delay: 7.335 s for time
1402034772000 ms (execution: 0.002 s)
    14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job
1402034773000 ms.0 from job set of time 1402034773000 ms
    14/06/06 11:36:19 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
    14/06/06 11:36:19 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 1.5294E-5 s
    14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034773000 ms.0 from job set of time 1402034773000 ms
    14/06/06 11:36:19 INFO JobScheduler: Total delay: 6.336 s for time
1402034773000 ms (execution: 0.001 s)
    14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job
1402034774000 ms.0 from job set of time 1402034774000 ms
    14/06/06 11:36:19 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
    14/06/06 11:36:19 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 1.117E-5 s
    14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034774000 ms.0 from job set of time 1402034774000 ms
    14/06/06 11:36:19 INFO JobScheduler: Total delay: 5.337 s for time
1402034774000 ms (execution: 0.001 s)
    14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job
1402034775000 ms.0 from job set of time 1402034775000 ms
    14/06/06 11:36:19 INFO FileInputDStream: Cleared 0 old files that were
older than 1402034769000 ms: 
    14/06/06 11:36:19 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
    14/06/06 11:36:19 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 1.1414E-5 s
    14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034775000 ms.0 from job set of time 1402034775000 ms
    14/06/06 11:36:19 INFO JobScheduler: Total delay: 4.338 s for time
1402034775000 ms (execution: 0.001 s)
    14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job
1402034776000 ms.0 from job set of time 1402034776000 ms
    14/06/06 11:36:19 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
    14/06/06 11:36:19 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 4.2422E-5 s
    14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034776000 ms.0 from job set of time 1402034776000 ms
    14/06/06 11:36:19 INFO JobScheduler: Total delay: 3.338 s for time
1402034776000 ms (execution: 0.000 s)
    14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job
1402034777000 ms.0 from job set of time 1402034777000 ms
    14/06/06 11:36:19 INFO MappedRDD: Removing RDD 3 from persistence list
    14/06/06 11:36:19 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
    14/06/06 11:36:19 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 1.1133E-5 s
    14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034777000 ms.0 from job set of time 1402034777000 ms
    14/06/06 11:36:19 INFO JobScheduler: Total delay: 2.339 s for time
1402034777000 ms (execution: 0.000 s)
    14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job
1402034778000 ms.0 from job set of time 1402034778000 ms
    14/06/06 11:36:19 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
    14/06/06 11:36:19 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 1.124E-5 s
    14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034778000 ms.0 from job set of time 1402034778000 ms
    14/06/06 11:36:19 INFO JobScheduler: Total delay: 1.340 s for time
1402034778000 ms (execution: 0.001 s)
    14/06/06 11:36:19 INFO JobScheduler: Starting job streaming job
1402034779000 ms.0 from job set of time 1402034779000 ms
    14/06/06 11:36:19 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
    14/06/06 11:36:19 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 1.2101E-5 s
    14/06/06 11:36:19 INFO JobScheduler: Finished job streaming job
1402034779000 ms.0 from job set of time 1402034779000 ms
    14/06/06 11:36:19 INFO JobScheduler: Total delay: 0.341 s for time
1402034779000 ms (execution: 0.001 s)
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 3
    14/06/06 11:36:19 INFO MappedRDD: Removing RDD 2 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 2
    14/06/06 11:36:19 INFO UnionRDD: Removing RDD 1 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 1
    14/06/06 11:36:19 INFO FileInputDStream: Cleared 0 old files that were
older than 1402034770000 ms: 
    14/06/06 11:36:19 INFO MappedRDD: Removing RDD 6 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 6
    14/06/06 11:36:19 INFO MappedRDD: Removing RDD 5 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 5
    14/06/06 11:36:19 INFO UnionRDD: Removing RDD 4 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 4
    14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034771000 ms: 1402034770000 ms
    14/06/06 11:36:19 INFO MappedRDD: Removing RDD 9 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 9
    14/06/06 11:36:19 INFO MappedRDD: Removing RDD 8 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 8
    14/06/06 11:36:19 INFO UnionRDD: Removing RDD 7 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 7
    14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034772000 ms: 1402034771000 ms
    14/06/06 11:36:19 INFO MappedRDD: Removing RDD 12 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 12
    14/06/06 11:36:19 INFO MappedRDD: Removing RDD 11 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 11
    14/06/06 11:36:19 INFO UnionRDD: Removing RDD 10 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 10
    14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034773000 ms: 1402034772000 ms
    14/06/06 11:36:19 INFO MappedRDD: Removing RDD 15 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 15
    14/06/06 11:36:19 INFO MappedRDD: Removing RDD 14 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 14
    14/06/06 11:36:19 INFO UnionRDD: Removing RDD 13 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 13
    14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034774000 ms: 1402034773000 ms
    14/06/06 11:36:19 INFO MappedRDD: Removing RDD 18 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 18
    14/06/06 11:36:19 INFO MappedRDD: Removing RDD 17 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 17
    14/06/06 11:36:19 INFO UnionRDD: Removing RDD 16 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 16
    14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034775000 ms: 1402034774000 ms
    14/06/06 11:36:19 INFO MappedRDD: Removing RDD 21 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 21
    14/06/06 11:36:19 INFO MappedRDD: Removing RDD 20 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 20
    14/06/06 11:36:19 INFO UnionRDD: Removing RDD 19 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 19
    14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034776000 ms: 1402034775000 ms
    14/06/06 11:36:19 INFO MappedRDD: Removing RDD 24 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 24
    14/06/06 11:36:19 INFO MappedRDD: Removing RDD 23 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 23
    14/06/06 11:36:19 INFO UnionRDD: Removing RDD 22 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 22
    14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034777000 ms: 1402034776000 ms
    14/06/06 11:36:19 INFO MappedRDD: Removing RDD 27 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 27
    14/06/06 11:36:19 INFO MappedRDD: Removing RDD 26 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 26
    14/06/06 11:36:19 INFO UnionRDD: Removing RDD 25 from persistence list
    14/06/06 11:36:19 INFO BlockManager: Removing RDD 25
    14/06/06 11:36:19 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034778000 ms: 1402034777000 ms
    14/06/06 11:36:20 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:36:20 INFO FileInputDStream: New files at time 1402034780000
ms:
    
    14/06/06 11:36:20 INFO JobScheduler: Added jobs for time 1402034780000
ms
    14/06/06 11:36:20 INFO JobScheduler: Starting job streaming job
1402034780000 ms.0 from job set of time 1402034780000 ms
    14/06/06 11:36:20 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
    14/06/06 11:36:20 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 2.8574E-5 s
    14/06/06 11:36:20 INFO JobScheduler: Finished job streaming job
1402034780000 ms.0 from job set of time 1402034780000 ms
    14/06/06 11:36:20 INFO MappedRDD: Removing RDD 30 from persistence list
    14/06/06 11:36:20 INFO JobScheduler: Total delay: 0.006 s for time
1402034780000 ms (execution: 0.002 s)
    14/06/06 11:36:20 INFO BlockManager: Removing RDD 30
    14/06/06 11:36:20 INFO MappedRDD: Removing RDD 29 from persistence list
    14/06/06 11:36:20 INFO BlockManager: Removing RDD 29
    14/06/06 11:36:20 INFO UnionRDD: Removing RDD 28 from persistence list
    14/06/06 11:36:20 INFO BlockManager: Removing RDD 28
    14/06/06 11:36:20 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034779000 ms: 1402034778000 ms
    14/06/06 11:36:21 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:36:21 INFO FileInputDStream: New files at time 1402034781000
ms:
    
    14/06/06 11:36:21 INFO JobScheduler: Added jobs for time 1402034781000
ms
    14/06/06 11:36:21 INFO JobScheduler: Starting job streaming job
1402034781000 ms.0 from job set of time 1402034781000 ms
    14/06/06 11:36:21 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
    14/06/06 11:36:21 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 3.023E-5 s
    14/06/06 11:36:21 INFO JobScheduler: Finished job streaming job
1402034781000 ms.0 from job set of time 1402034781000 ms
    14/06/06 11:36:21 INFO MappedRDD: Removing RDD 33 from persistence list
    14/06/06 11:36:21 INFO JobScheduler: Total delay: 0.006 s for time
1402034781000 ms (execution: 0.002 s)
    14/06/06 11:36:21 INFO BlockManager: Removing RDD 33
    14/06/06 11:36:21 INFO MappedRDD: Removing RDD 32 from persistence list
    14/06/06 11:36:21 INFO BlockManager: Removing RDD 32
    14/06/06 11:36:21 INFO UnionRDD: Removing RDD 31 from persistence list
    14/06/06 11:36:21 INFO BlockManager: Removing RDD 31
    14/06/06 11:36:21 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034780000 ms: 1402034779000 ms
    14/06/06 11:36:22 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:36:22 INFO FileInputDStream: New files at time 1402034782000
ms:
    
    14/06/06 11:36:22 INFO JobScheduler: Added jobs for time 1402034782000
ms
    14/06/06 11:36:22 INFO JobScheduler: Starting job streaming job
1402034782000 ms.0 from job set of time 1402034782000 ms
    14/06/06 11:36:22 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
    14/06/06 11:36:22 INFO SparkContext: Job finished: collect at
StreamingJavaLR.java:170, took 3.9897E-5 s
    14/06/06 11:36:22 INFO JobScheduler: Finished job streaming job
1402034782000 ms.0 from job set of time 1402034782000 ms
    14/06/06 11:36:22 INFO MappedRDD: Removing RDD 36 from persistence list
    14/06/06 11:36:22 INFO JobScheduler: Total delay: 0.006 s for time
1402034782000 ms (execution: 0.002 s)
    14/06/06 11:36:22 INFO BlockManager: Removing RDD 36
    14/06/06 11:36:22 INFO MappedRDD: Removing RDD 35 from persistence list
    14/06/06 11:36:22 INFO BlockManager: Removing RDD 35
    14/06/06 11:36:22 INFO UnionRDD: Removing RDD 34 from persistence list
    14/06/06 11:36:22 INFO BlockManager: Removing RDD 34
    14/06/06 11:36:22 INFO FileInputDStream: Cleared 1 old files that were
older than 1402034781000 ms: 1402034780000 ms




For the file with 0.4 million entries, the output is (output after the
StreamingContext is started):




    14/06/06 11:38:55 INFO AppClient$ClientActor: Executor added:
app-20140606113855-0003/0 on
worker-20140606114445-host-DSRV05.host.co.in-55206
(host-DSRV05.host.co.in:55206) with 8 cores
    14/06/06 11:38:55 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20140606113855-0003/0 on hostPort host-DSRV05.host.co.in:55206 with 8
cores, 512.0 MB RAM
    14/06/06 11:38:55 INFO AppClient$ClientActor: Executor added:
app-20140606113855-0003/1 on
worker-20140606114445-host-DSRV04.host.co.in-39342
(host-DSRV04.host.co.in:39342) with 8 cores
    14/06/06 11:38:55 INFO SparkDeploySchedulerBackend: Granted executor ID
app-20140606113855-0003/1 on hostPort host-DSRV04.host.co.in:39342 with 8
cores, 512.0 MB RAM
    14/06/06 11:38:55 INFO AppClient$ClientActor: Executor updated:
app-20140606113855-0003/0 is now RUNNING
    14/06/06 11:38:55 INFO AppClient$ClientActor: Executor updated:
app-20140606113855-0003/1 is now RUNNING
    14/06/06 11:38:55 INFO RecurringTimer: Started timer for JobGenerator at
time 1402034936000
    14/06/06 11:38:55 INFO JobGenerator: Started JobGenerator at
1402034936000 ms
    14/06/06 11:38:55 INFO JobScheduler: Started JobScheduler
    14/06/06 11:38:56 INFO FileInputDStream: Finding new files took 31 ms
    14/06/06 11:38:56 INFO FileInputDStream: New files at time 1402034936000
ms:
    file:/newdisk1/praveshj/pravesh/data/input/testing4lk.txt
    14/06/06 11:38:56 INFO MemoryStore: ensureFreeSpace(33216) called with
curMem=0, maxMem=309225062
    14/06/06 11:38:56 INFO MemoryStore: Block broadcast_0 stored as values
to memory (estimated size 32.4 KB, free 294.9 MB)
    14/06/06 11:38:56 INFO FileInputFormat: Total input paths to process : 1
    14/06/06 11:38:56 INFO JobScheduler: Added jobs for time 1402034936000
ms
    14/06/06 11:38:56 INFO JobScheduler: Starting job streaming job
1402034936000 ms.0 from job set of time 1402034936000 ms
    14/06/06 11:38:56 INFO SparkContext: Starting job: collect at
StreamingJavaLR.java:170
    14/06/06 11:38:56 INFO DAGScheduler: Got job 0 (collect at
StreamingJavaLR.java:170) with 1 output partitions (allowLocal=false)
    14/06/06 11:38:56 INFO DAGScheduler: Final stage: Stage 0(collect at
StreamingJavaLR.java:170)
    14/06/06 11:38:56 INFO DAGScheduler: Parents of final stage: List()
    14/06/06 11:38:56 INFO DAGScheduler: Missing parents: List()
    14/06/06 11:38:56 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at
map at MappedDStream.scala:35), which has no missing parents
    14/06/06 11:38:56 INFO DAGScheduler: Submitting 1 missing tasks from
Stage 0 (MappedRDD[3] at map at MappedDStream.scala:35)
    14/06/06 11:38:56 INFO TaskSchedulerImpl: Adding task set 0.0 with 1
tasks
    14/06/06 11:38:57 INFO FileInputDStream: Finding new files took 1 ms
    14/06/06 11:38:57 INFO FileInputDStream: New files at time 1402034937000
ms:
    
    14/06/06 11:38:57 INFO JobScheduler: Added jobs for time 1402034937000
ms
    14/06/06 11:38:57 INFO SparkDeploySchedulerBackend: Registered executor:
Actor[akka.tcp://sparkExecutor@host-DSRV05.host.co.in:39424/user/Executor#-500165450]
with ID 0
    14/06/06 11:38:57 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on
executor 0: host-DSRV05.host.co.in (PROCESS_LOCAL)
    14/06/06 11:38:57 INFO TaskSetManager: Serialized task 0.0:0 as 3544
bytes in 0 ms
    14/06/06 11:38:57 INFO SparkDeploySchedulerBackend: Registered executor:
Actor[akka.tcp://sparkExecutor@host-DSRV04.host.co.in:45532/user/Executor#1654371091]
with ID 1
    14/06/06 11:38:57 INFO BlockManagerInfo: Registering block manager
host-DSRV05.host.co.in:53857 with 294.9 MB RAM
    14/06/06 11:38:57 INFO BlockManagerInfo: Registering block manager
host-DSRV04.host.co.in:38057 with 294.9 MB RAM
    14/06/06 11:38:58 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:38:58 INFO FileInputDStream: New files at time 1402034938000
ms:
    
    14/06/06 11:38:58 INFO JobScheduler: Added jobs for time 1402034938000
ms
    14/06/06 11:38:59 INFO FileInputDStream: Finding new files took 1 ms
    14/06/06 11:38:59 INFO FileInputDStream: New files at time 1402034939000
ms:
    
    14/06/06 11:38:59 INFO JobScheduler: Added jobs for time 1402034939000
ms
    14/06/06 11:39:00 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:39:00 INFO FileInputDStream: New files at time 1402034940000
ms:
    
    14/06/06 11:39:00 INFO JobScheduler: Added jobs for time 1402034940000
ms
    14/06/06 11:39:01 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:39:01 INFO FileInputDStream: New files at time 1402034941000
ms:
    
    14/06/06 11:39:01 INFO JobScheduler: Added jobs for time 1402034941000
ms
    14/06/06 11:39:02 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:39:02 INFO FileInputDStream: New files at time 1402034942000
ms:
    
    14/06/06 11:39:02 INFO JobScheduler: Added jobs for time 1402034942000
ms
    14/06/06 11:39:03 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:39:03 INFO FileInputDStream: New files at time 1402034943000
ms:
    
    14/06/06 11:39:03 INFO JobScheduler: Added jobs for time 1402034943000
ms
    14/06/06 11:39:04 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:39:04 INFO FileInputDStream: New files at time 1402034944000
ms:
    
    14/06/06 11:39:04 INFO JobScheduler: Added jobs for time 1402034944000
ms
    14/06/06 11:39:05 INFO FileInputDStream: Finding new files took 1 ms
    14/06/06 11:39:05 INFO FileInputDStream: New files at time 1402034945000
ms:
    
    14/06/06 11:39:05 INFO JobScheduler: Added jobs for time 1402034945000
ms
    14/06/06 11:39:06 INFO FileInputDStream: Finding new files took 1 ms
    14/06/06 11:39:06 INFO FileInputDStream: New files at time 1402034946000
ms:
    
    14/06/06 11:39:06 INFO JobScheduler: Added jobs for time 1402034946000
ms
    14/06/06 11:39:07 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:39:07 INFO FileInputDStream: New files at time 1402034947000
ms:
    
    14/06/06 11:39:07 INFO JobScheduler: Added jobs for time 1402034947000
ms
    14/06/06 11:39:08 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:39:08 INFO FileInputDStream: New files at time 1402034948000
ms:
    
    14/06/06 11:39:08 INFO JobScheduler: Added jobs for time 1402034948000
ms
    14/06/06 11:39:09 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:39:09 INFO FileInputDStream: New files at time 1402034949000
ms:
    
    14/06/06 11:39:09 INFO JobScheduler: Added jobs for time 1402034949000
ms
    14/06/06 11:39:10 INFO FileInputDStream: Finding new files took 0 ms
    14/06/06 11:39:10 INFO FileInputDStream: New files at time 1402034950000
ms:

and this goes on forever. It never writes the output to the file it is
supposed to.
The worker logs don't show anything different.

Any idea what might be the issue?

--

Thanks



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-not-processing-file-with-particular-number-of-entries-tp6694p7110.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Mime
View raw message