spark-user mailing list archives

From Guillermo Ortiz <konstt2...@gmail.com>
Subject Re: Spark Streaming and Windows, it always counts the logs during all the windows. Why?
Date Fri, 26 Dec 2014 13:17:52 GMT
Oh, I didn't understand what I was doing, my fault (too many parties
this Christmas). I thought windows worked in some other weird way. Sorry
for the questions.
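
For the record, a minimal sketch of the overlap that confused me (plain
Scala, no Spark needed; the object and helper names are just for
illustration). With groupByKeyAndWindow(Seconds(60), Seconds(15)), each
window computed at time t covers the last 60 seconds, so every batch of
logs lands in 60 / 15 = 4 consecutive windows:

  object WindowOverlap {
    val windowSecs = 60 // window length given to groupByKeyAndWindow
    val slideSecs  = 15 // slide interval (= the batch interval here)

    // End times of the windows whose (end - windowSecs, end] range covers
    // a batch that finished at `batchEnd` seconds.
    def windowsContaining(batchEnd: Int): Seq[Int] =
      batchEnd until (batchEnd + windowSecs) by slideSecs

    def main(args: Array[String]): Unit =
      // Logs ingested in the batch ending at t=15 show up in the windows
      // computed at t = 15, 30, 45 and 60: four intervals, one full minute.
      println(windowsContaining(15).mkString(", "))
  }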

2014-12-26 13:42 GMT+01:00 Guillermo Ortiz <konstt2000@gmail.com>:
> I'm trying to understand why it's not working, so I added some println
> statements to check what code was actually being executed...
>
>   def ruleSqlInjection(lines: ReceiverInputDStream[String]) = {
>     println("1"); //******************** Just one time, when I start the program
>     val filterSql = lines.filter(line => line.contains("SQL"))
>     val jsonSql = filterSql.map(line =>
>       JsonUtil.read(line.getBytes(), classOf[Akamai]))
>     val groupSql = jsonSql.map {
>       json =>
>         val srcIp = json.getMessage().getCliIP()
>         val srcURL = json.getMessage().getReqHost()
>         (srcIp + "_" + srcURL, json)
>     }
>     println("2"); //******************** Just one time, when I start the program
>
>     val errorLinesValueReduce =
>       groupSql.groupByKeyAndWindow(Seconds(60), Seconds(15))
>
>     println("3"); //******************** Just one time, when I start the program
>     errorLinesValueReduce.foreachRDD {
>       rdd =>
>         rdd.foreach { elem1 =>
>           println("4 " + elem1); //******************** Every time
>           if (elem1._2.size > 0) {
>             println("do something")
>           }
>         }
>     }
>     println("fin foreachRdd"); //******************** Just one time, when I start the program
>   }
>
>
> Why is it only re-executing the println("4 ...")? Shouldn't it execute
> all of the code every 15 seconds, since that's what is defined on the
> context (val ssc = new StreamingContext(sparkConf, Seconds(15)))?
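>
> Stripped down to its shape, this is what I observe (a sketch; `rule` is
> a hypothetical stand-in for ruleSqlInjection and `lines` for my real
> stream):
>
>   import org.apache.spark.streaming.Seconds
>   import org.apache.spark.streaming.dstream.DStream
>
>   def rule(lines: DStream[String]): Unit = {
>     println("1")                       // printed once, at startup
>     val windowed = lines.window(Seconds(60), Seconds(15))
>     windowed.foreachRDD { rdd =>
>       rdd.foreach { elem =>
>         println("4 " + elem)           // printed every batch, per element
>       }
>     }
>     println("fin")                     // printed once, at startup
>   }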
>
> 2014-12-26 10:56 GMT+01:00 Guillermo Ortiz <konstt2000@gmail.com>:
>> I'm trying to do some operations with windows and intervals.
>>
>> I get data every 15 seconds and want a window of 60 seconds with a
>> batch interval of 15 seconds.
>> I'm injecting data with ncat. If I inject 3 logs in the same interval,
>> I get into the "do something" branch every 15 seconds for one minute.
>> I understand getting into "do something" in the first interval, but the
>> logs shouldn't appear in the next interval. Why do I get into "do
>> something" in all the intervals for a minute? What am I doing wrong?
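>>
>> To make the timeline concrete, this is a sketch of what I observe,
>> assuming all 3 logs land in the first 15-second batch:
>>
>>   batch end    window covers    logs in window    "do something"?
>>   t = 15s      (-45s,  15s]     3                 yes
>>   t = 30s      (-30s,  30s]     3                 yes
>>   t = 45s      (-15s,  45s]     3                 yes
>>   t = 60s      (  0s,  60s]     3                 yes
>>   t = 75s      ( 15s,  75s]     0                 no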
>>
>>
>>     val sparkConf = new SparkConf().setMaster(sparkMode).setAppName("MiApp");
>>     val ssc = new StreamingContext(sparkConf, Seconds(15));
>>     val lines = ssc.socketTextStream("localhost", sparkPort.toInt);
>>     ssc.checkpoint(sparkCheckPoint)
>>
>>     ruleSqlInjection(lines)
>>
>>     ssc.start()
>>     ssc.awaitTermination()
>>
>>
>>
>>   def ruleSqlInjection(lines: ReceiverInputDStream[String]) = {
>>     val filterSql = lines.filter(line => line.contains("SQL"))
>>     val jsonSql = filterSql.map(line =>
>>       JsonUtil.read(line.getBytes(), classOf[Model]))
>>     val groupSql = jsonSql.map {
>>       json =>
>>         val srcIp = json.getMessage().getCliIP()
>>         val srcURL = json.getMessage().getReqHost()
>>         (srcIp + "_" + srcURL, json)
>>     }
>>     val errorLinesValueReduce =
>>       groupSql.groupByKeyAndWindow(Seconds(60), Seconds(15))
>>     errorLinesValueReduce.foreachRDD {
>>       rdd =>
>>         val elem1 = rdd.take(1)
>>         if (elem1.size > 0) { // guard before indexing into take(1)
>>           println("take1 ->" + elem1(0)._1)
>>           println("take2 ->" + elem1(0)._2) // it always shows the logs from
>>                                             // the first 15 seconds for one
>>                                             // minute..
>>           val alerts = elem1(0)._2
>>           if (alerts.size > 2) {
>>             println("do something") // I don't understand why it gets in
>>                                     // here for 4 intervals
>>           }
>>         }
>>     }
>>   }

