TD, it looks like your instincts were correct. I misunderstood what you meant. If I force an eval on inputStream using foreachRDD, the windowing works correctly. If I don't do that, lazy evaluation somehow interferes with the window batches I eventually receive. Any reason the bug is categorized as minor? It seems that anyone who uses the windowing functionality would run into this bug. I imagine this would include anyone who wants to use Spark Streaming to aggregate data in fixed time batches, which seems like a fairly common use case.

Alan


On Jul 22, 2014, at 11:30 PM, Alan Ngai <alan@opsclarity.com> wrote:

foreachRDD is how I extracted values in the first place, so that's not going to make a difference. I don't think it's related to SPARK-1312 because I'm generating data every second in the first place and I'm using foreachRDD right after the window operation. The code looks something like:

val batchInterval = 5
val windowInterval = 25
val slideInterval = 15

val windowedStream = inputStream.window(Seconds(windowInterval), Seconds(slideInterval))

val outputFunc = (r: RDD[MetricEvent], t: Time) => {
  println("======================================== %s".format(t.milliseconds / 1000))
  r.foreach{metric =>
    val timeKey = metric.timeStamp / batchInterval * batchInterval
    println("%s %s %s %s".format(timeKey, metric.timeStamp, metric.name, metric.value))
  }
}
windowedStream.foreachRDD(outputFunc)

On Jul 22, 2014, at 10:13 PM, Tathagata Das <tathagata.das1565@gmail.com> wrote:

It could be related to this bug that is currently open. 

Here is a workaround. Can you add an inputStream.foreachRDD(rdd => { }) call and try these combos again?
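A minimal sketch of how that no-op call could sit next to the windowed output, assuming Spark Streaming is on the classpath. The object name WindowWorkaround and the helper windowWithForcedEval are hypothetical and only illustrate where the call goes; the window and slide values are whatever the application already uses.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, Time}
import org.apache.spark.streaming.dstream.DStream

object WindowWorkaround {
  // Hypothetical helper (not part of Spark): registers a no-op output
  // operation on the input stream, then attaches the real output
  // function to the windowed stream.
  def windowWithForcedEval[T](inputStream: DStream[T],
                              windowSecs: Int,
                              slideSecs: Int)
                             (output: (RDD[T], Time) => Unit): Unit = {
    // The workaround: an empty foreachRDD directly on the input stream.
    inputStream.foreachRDD((rdd: RDD[T]) => { })

    // The windowed stream is then consumed as before.
    inputStream
      .window(Seconds(windowSecs), Seconds(slideSecs))
      .foreachRDD(output)
  }
}
```

With the values from the snippet quoted in this thread, a call might look like WindowWorkaround.windowWithForcedEval(inputStream, 25, 15)(outputFunc).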

TD


On Tue, Jul 22, 2014 at 6:01 PM, Alan Ngai <alan@opsclarity.com> wrote:
I have a sample application pumping out one record per second. The batch interval is set to 5 seconds. Here's a list of "observed window intervals" vs. what was actually set:

window=25, slide=25 : observed-window=25, overlapped-batches=0
window=25, slide=20 : observed-window=20, overlapped-batches=0
window=25, slide=15 : observed-window=15, overlapped-batches=0
window=25, slide=10 : observed-window=20, overlapped-batches=2
window=25, slide=5 : observed-window=25, overlapped-batches=3
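For comparison, here is a small sketch of the overlap these settings would nominally imply, assuming consecutive windows share (window - slide) seconds of data and both durations are multiples of the batch interval. The object WindowOverlap and the function expectedOverlappedBatches are hypothetical names, not part of Spark.

```scala
object WindowOverlap {
  // Hypothetical helper: number of batches that two consecutive windows
  // would share if the window and slide settings were honored exactly.
  def expectedOverlappedBatches(windowSecs: Int, slideSecs: Int, batchSecs: Int): Int = {
    require(batchSecs > 0, "batch interval must be positive")
    require(windowSecs % batchSecs == 0 && slideSecs % batchSecs == 0,
      "window and slide must be multiples of the batch interval")
    // Consecutive windows share (window - slide) seconds, never less than zero.
    math.max(0, (windowSecs - slideSecs) / batchSecs)
  }

  def main(args: Array[String]): Unit = {
    val batchInterval = 5
    val windowInterval = 25
    for (slide <- Seq(25, 20, 15, 10, 5)) {
      println("window=%d, slide=%d : expected-overlapped-batches=%d".format(
        windowInterval, slide,
        expectedOverlappedBatches(windowInterval, slide, batchInterval)))
    }
  }
}
```

Under that assumption, window=25 with slide=20 is the combination that should give exactly one overlapped batch, which is not what the observed values above show.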

Can someone explain this behavior to me? I'm trying to aggregate metrics by time batches, but want to skip partial batches. Therefore, I'm trying to find a combination which results in 1 overlapped batch, but no combination I tried gets me there.

Alan