spark-user mailing list archives

From Tathagata Das <tathagata.das1...@gmail.com>
Subject Re: streaming window not behaving as advertised (v1.0.1)
Date Sun, 27 Jul 2014 02:10:46 GMT
Yeah, maybe I should bump the issue to major. Now that I have thought about
it more while giving my previous answer, this should be easy to fix just by
doing a foreachRDD on all the input streams within the system (rather than
explicitly doing it like I asked you to do).

Thanks, Alan, for testing this out and confirming that this was the
same issue. I was worried that this was a totally new issue that we did
not know of.

TD

On Wed, Jul 23, 2014 at 12:37 AM, Alan Ngai <alan@opsclarity.com> wrote:
> TD, it looks like your instincts were correct.  I misunderstood what you
> meant.  If I force an eval on the input stream using foreachRDD, the
> windowing works correctly.  If I don't do that, lazy eval somehow screws
> with the window batches I eventually receive.  Any reason the bug is
> categorized as minor?  It seems that anyone who uses the windowing
> functionality would run into this bug.  I imagine this would include anyone
> who wants to use Spark Streaming to aggregate data in fixed time batches,
> which seems like a fairly common use case.
>
> Alan
>
>
>
> On Jul 22, 2014, at 11:30 PM, Alan Ngai <alan@opsclarity.com> wrote:
>
> foreachRDD is how I extracted values in the first place, so that's not going
> to make a difference.  I don't think it's related to SPARK-1312 because I'm
> already generating data every second, and I'm using foreachRDD right after
> the window operation.  The code looks something like this:
>
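> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.{Seconds, Time}
>
> // Intervals in seconds; window and slide must be multiples of the batch interval
> val batchInterval = 5
> val windowInterval = 25
> val slideInterval = 15
>
> // inputStream is a DStream[MetricEvent] created elsewhere in the application
> val windowedStream = inputStream.window(Seconds(windowInterval),
>   Seconds(slideInterval))
>
> val outputFunc = (r: RDD[MetricEvent], t: Time) => {
>   println("======================================== %s".format(t.milliseconds / 1000))
>   r.foreach { metric =>
>     // Round each event timestamp down to the start of its batch-sized bucket
>     val timeKey = metric.timeStamp / batchInterval * batchInterval
>     println("%s %s %s %s".format(timeKey, metric.timeStamp, metric.name,
>       metric.value))
>   }
> }
> windowedStream.foreachRDD(outputFunc)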
>
> On Jul 22, 2014, at 10:13 PM, Tathagata Das <tathagata.das1565@gmail.com> wrote:
>
> It could be related to this bug that is currently open:
> https://issues.apache.org/jira/browse/SPARK-1312
>
> Here is a workaround. Can you add an inputStream.foreachRDD(rdd => { }) and
> try these combos again?
>
> TD
>
>
> On Tue, Jul 22, 2014 at 6:01 PM, Alan Ngai <alan@opsclarity.com> wrote:
>>
>> I have a sample application pumping out records at 1 per second.  The batch
>> interval is set to 5 seconds.  Here's a list of “observed window intervals”
>> vs. what was actually set (all values in seconds):
>>
>> window=25, slide=25 : observed-window=25, overlapped-batches=0
>> window=25, slide=20 : observed-window=20, overlapped-batches=0
>> window=25, slide=15 : observed-window=15, overlapped-batches=0
>> window=25, slide=10 : observed-window=20, overlapped-batches=2
>> window=25, slide=5 : observed-window=25, overlapped-batches=3
>>
>> Can someone explain this behavior to me?  I'm trying to aggregate metrics
>> by time batches, but want to skip partial batches.  Therefore, I'm trying to
>> find a combination that results in 1 overlapped batch, but no combination I
>> tried gets me there.
>>
>> Alan
>>
>
>
>
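For comparison with the observed table, here is a minimal pure-Scala sketch (no Spark dependency) of what the window arithmetic should produce. It assumes Spark Streaming's documented semantics: a window ending at time t covers every batch in (t - window, t], and window and slide are multiples of the batch interval. `WindowMath`, `batchStart`, `batchesInWindow` and `overlappedBatches` are hypothetical helpers for illustration, not Spark API.

```scala
// Hypothetical helpers modelling Spark Streaming window coverage in plain Scala.
// All times are in seconds. Assumes window and slide are multiples of the batch
// interval, and that a window ending at time t covers the batches in (t - window, t].
object WindowMath {

  // Same rounding as timeKey in outputFunc: round a timestamp down to the start
  // of its batch-sized bucket (integer division, so assumes non-negative values).
  def batchStart(timestamp: Long, batch: Long): Long = timestamp / batch * batch

  // Batch times covered by the window that ends at windowEnd.
  def batchesInWindow(batch: Int, window: Int, windowEnd: Int): Seq[Int] =
    (windowEnd - window + batch) to windowEnd by batch

  // Closed form for the number of batches shared by two consecutive windows.
  def overlappedBatches(batch: Int, window: Int, slide: Int): Int = {
    require(window % batch == 0 && slide % batch == 0,
      "window and slide must be multiples of the batch interval")
    math.max(0, (window - slide) / batch)
  }

  def main(args: Array[String]): Unit = {
    val batch = 5
    val window = 25
    for (slide <- Seq(25, 20, 15, 10, 5)) {
      // Two consecutive windows; window end times are multiples of the slide.
      val first = batchesInWindow(batch, window, windowEnd = slide * 10)
      val next = batchesInWindow(batch, window, windowEnd = slide * 11)
      val shared = first.intersect(next).size
      println(s"window=$window, slide=$slide : expected-window=${first.size * batch}, " +
        s"expected-overlapped-batches=$shared")
    }
  }
}
```

Running `main` prints expected-window=25 for every slide, with 0, 1, 2, 3 and 4 overlapped batches for slides of 25, 20, 15, 10 and 5. Under these assumptions, window=25, slide=20 is the combination that should yield exactly one overlapped batch; the gap between this and the observed table is what the thread attributes to SPARK-1312.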
