spark-user mailing list archives

From Laeeq Ahmed <laeeqsp...@yahoo.com.INVALID>
Subject Spark Streaming multiple streams problem
Date Wed, 06 Aug 2014 10:08:57 GMT
Hi,

I am reading multiple streams from multiple ports with a single streaming context, and I have
created an array of DStreams. This works with up to 10 streams. But if I go beyond that (I have
checked with 15 and 20 streams), the Spark Streaming task gets stuck and takes a long time. I
waited for 10 minutes (2.2 min in the attached screenshot) and it still did not go through.
The attached streaming UI shows where it gets stuck.

If this is not the right way to read multiple streams, what is the alternative? I don't
want to union the streams; I want to read them simultaneously, in parallel.


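import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}

object StreamAnomalyDetector {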

// Returns (n, (mean, variance, standard deviation)) computed from the running sums
def calculate(sumOfSquare: Double, sumOfN: Double, n: Int): (Int, (Double, Double, Double)) = {
    val mean = sumOfN / n
    val variance = sumOfSquare / n - math.pow(mean, 2)
    (n, (mean, variance, math.sqrt(variance)))
}


def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: StreamAnomalyDetector <master> <hostname> <port> [<port> ...]")
      System.exit(1)
    }
    // Setting system properties
    //System.setProperty("spark.cores.max", "3")
    System.setProperty("spark.executor.memory", "5g")

    // Create the context
    val ssc = new StreamingContext(args(0), "StreamAnomalyDetector", Milliseconds(1000),
    System.getenv("SPARK_HOME"), List("target/scalaad-1.0-SNAPSHOT-jar-with-dependencies.jar"))

    //hdfs path to checkpoint old data
    ssc.checkpoint("hdfs://host-10-20-20-17.novalocal:9000/user/hduser/checkpointing/")

    // Array holding one DStream per port
    val eegStreams = new Array[org.apache.spark.streaming.dstream.DStream[String]](args.length - 2)

    // Create one socket input DStream per port and compute windowed statistics on each
    for (i <- 0 to (args.length - 3)) {
      eegStreams(i) = ssc.socketTextStream(args(1), args(i + 2).toInt, StorageLevel.MEMORY_AND_DISK_SER)
      // Map each record to (x^2, x, 1) and sum over a 4-second window that slides every 4 seconds
      val sums = eegStreams(i)
        .map(x => (math.pow(x.toDouble, 2), x.toDouble, 1))
        .reduceByWindow(
          (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3), // add values entering the window
          (a, b) => (a._1 - b._1, a._2 - b._2, a._3 - b._3), // subtract values leaving the window
          Seconds(4), Seconds(4))
      val meanAndSD = sums.map(x => calculate(x._1, x._2, x._3))
      meanAndSD.saveAsTextFiles("hdfs://host-10-20-20-17.novalocal:9000/user/hduser/output/" + (i + 1))
    }


    ssc.start()
    ssc.awaitTermination()
    }
}
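
To make the per-window arithmetic easier to check separately from the streaming setup, here is a minimal sketch in plain Scala, without Spark. It builds the same (sum of squares, sum, count) triple as the job above and passes it to the same calculate logic. The object name WindowStatsSketch, the helpers toSums, add and subtract, and the sample values are hypothetical and only for illustration.

```scala
object WindowStatsSketch {
  // (sum of squares, sum, count): the triple shape the job builds per record
  type Sums = (Double, Double, Int)

  def toSums(x: Double): Sums = (x * x, x, 1)

  // Forward reduce: fold a new value's triple into the running window sums
  def add(a: Sums, b: Sums): Sums = (a._1 + b._1, a._2 + b._2, a._3 + b._3)

  // Inverse reduce: remove an old value's triple from the running window sums
  def subtract(a: Sums, b: Sums): Sums = (a._1 - b._1, a._2 - b._2, a._3 - b._3)

  // Same arithmetic as calculate in the job above
  def calculate(sumOfSquare: Double, sumOfN: Double, n: Int): (Int, (Double, Double, Double)) = {
    val mean = sumOfN / n
    val variance = sumOfSquare / n - mean * mean
    (n, (mean, variance, math.sqrt(variance)))
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample values standing in for one window of a stream
    val window = Seq(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0)
    val sums = window.map(toSums).reduce(add)
    println(calculate(sums._1, sums._2, sums._3)) // prints (8,(5.0,4.0,2.0))
  }
}
```

This only exercises the statistics; it says nothing about why the job stalls with more than 10 streams.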



Regards,
Laeeq
