spark-user mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: Spark Streaming multiple streams problem
Date Wed, 06 Aug 2014 22:09:59 GMT
You probably have only 10 cores in the cluster on which you are executing
your job. Since each DStream / receiver takes one core, the system is
not able to start all of them, and so everything is blocked.
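
As a rough sketch of that rule: the Spark Streaming guide advises allocating
more cores than there are receivers, so that at least one core is left to
process the received data. The helper names below (ReceiverCoreCheck,
coresNeeded, canMakeProgress) are hypothetical and not part of the Spark API,
and the core count of 11 is purely illustrative, not a measurement of the
cluster in question.

```scala
// Hypothetical helper, not part of the Spark API: checks whether a cluster
// has enough cores to run every receiver and still process the data.
object ReceiverCoreCheck {
  // Each receiver-based input DStream occupies one core for the lifetime
  // of the job, and at least one more core is needed for processing.
  def coresNeeded(numReceivers: Int): Int = numReceivers + 1

  def canMakeProgress(totalCores: Int, numReceivers: Int): Boolean =
    totalCores >= coresNeeded(numReceivers)

  def main(args: Array[String]): Unit = {
    val totalCores = 11 // illustrative value only (assumption)
    for (n <- Seq(10, 15, 20)) {
      val ok = canMakeProgress(totalCores, n)
      println(s"$n receivers on $totalCores cores: enough cores = $ok")
    }
  }
}
```

If the check fails for your setup, the usual options are to give the
application more cores or to reduce the number of receivers.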


On Wed, Aug 6, 2014 at 3:08 AM, Laeeq Ahmed <laeeqspark@yahoo.com.invalid>
wrote:

> Hi,
>
> I am reading multiple streams from multiple ports with a single streaming
> context. I have created an array of DStreams. This works for up to 10
> streams, but if I go over that (I have checked with 15 and 20 streams), the
> Spark Streaming task gets stuck and takes a long time. I waited for 10
> minutes (2.2 min in the attached screenshot) and it still did not go
> through. The attached streaming UI shows where it gets stuck.
>
> If this is not the right way to read multiple streams, what is the
> alternative? I don't want to union the streams. I want to read them
> simultaneously, in parallel.
>
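> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
>
> object StreamAnomalyDetector {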
>
> // Returns (n, (mean, variance, standard deviation)) from the running sums.
> def calculate(sumOfSquare: Double, sumOfN: Double, n: Int): (Int, (Double, Double, Double)) = {
>         val mean = sumOfN / n
>         val variance = sumOfSquare / n - math.pow(mean, 2)
>         (n, (mean, variance, math.sqrt(variance)))
> }
>
> def main(args: Array[String]) {
>     if (args.length < 3) {
>       System.err.println("Usage: StreamAnomalyDetector <master> <hostname> <port> [<port> ...]")
>       System.exit(1)
>     }
>         //Setting system properties
>         //System.setProperty("spark.cores.max", "3")
>         System.setProperty("spark.executor.memory", "5g")
>
>     // Create the context
>     val ssc = new StreamingContext(args(0), "StreamAnomalyDetector", Milliseconds(1000),
>       System.getenv("SPARK_HOME"),
>       List("target/scalaad-1.0-SNAPSHOT-jar-with-dependencies.jar"))
>
>     // HDFS path to checkpoint old data
>     ssc.checkpoint("hdfs://host-10-20-20-17.novalocal:9000/user/hduser/checkpointing/")
>
>     // Array for multiple streams, one per port argument
>     val eegStreams =
>       new Array[org.apache.spark.streaming.dstream.DStream[String]](args.length - 2)
>
>     // Create one socket input DStream per port and process each independently
>     for (a <- 0 to (args.length - 3)) {
>       // Multiple DStreams into Array
>       eegStreams(a) = ssc.socketTextStream(args(1), args(a + 2).toInt,
>         StorageLevel.MEMORY_AND_DISK_SER)
>       // Per 4-second window: (sum of squares, sum, count).
>       // p and q are used so the lambdas do not shadow the loop variable a.
>       val sums = eegStreams(a)
>         .map(x => (math.pow(x.toDouble, 2), x.toDouble, 1))
>         .reduceByWindow(
>           (p, q) => (p._1 + q._1, p._2 + q._2, p._3 + q._3),
>           (p, q) => (p._1 - q._1, p._2 - q._2, p._3 - q._3),
>           Seconds(4), Seconds(4))
>       val meanAndSD = sums.map(x => calculate(x._1, x._2, x._3))
>       meanAndSD.saveAsTextFiles(
>         "hdfs://host-10-20-20-17.novalocal:9000/user/hduser/output/" + (a + 1))
>     }
>
>
>     ssc.start()
>     ssc.awaitTermination()
>     }
> }
>
>
> Regards,
> Laeeq
>
