Hello, everyone!
I've run into a problem when using nested reduceByKeyAndWindow calls.
My task is to parse JSON-formatted events from textFileStream and create aggregations for each field.
For example, given this input:
{"type":"EventOne", "attr1":10,"attr2":20}
I have projections:
{"type":"EventOne", count:1}
{"type":"EventOne", "attr1":10, count:1}
{"type":"EventOne", "attr2":20, count:1}
{"type":"EventOne", "attr1":10, "attr2":20, count:1}
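
To make the projection step concrete, here is a minimal sketch in plain Java, assuming that every subset of an event's attributes (always together with "type") becomes one projection. The class name ProjectionSketch and the method projections are hypothetical and only for illustration; this is not the actual MyProjectionFunction, which is not shown in this post.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ProjectionSketch {

    // Hypothetical helper: builds one projection key per subset of the attributes.
    // Every key keeps "type"; each projection would carry an initial count of 1.
    public static List<Map<String, Object>> projections(String type, Map<String, Integer> attrs) {
        List<String> names = new ArrayList<>(attrs.keySet());
        List<Map<String, Object>> result = new ArrayList<>();
        // Bit i of mask decides whether the attribute at index i is kept.
        for (int mask = 0; mask < (1 << names.size()); mask++) {
            Map<String, Object> key = new LinkedHashMap<>();
            key.put("type", type);
            for (int i = 0; i < names.size(); i++) {
                if ((mask & (1 << i)) != 0) {
                    key.put(names.get(i), attrs.get(names.get(i)));
                }
            }
            result.add(key);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> attrs = new LinkedHashMap<>();
        attrs.put("attr1", 10);
        attrs.put("attr2", 20);
        // Prints one line per projection key for the sample event above.
        for (Map<String, Object> key : projections("EventOne", attrs)) {
            System.out.println(key + " count=1");
        }
    }
}
```

With two attributes this yields four projection keys, matching the four lines listed above.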
Every 150 seconds (Durations.seconds(150)) I pre-aggregate this data, both to save it and to feed the larger windows. But the larger windows receive no data at all, which looks like a bug.

Here is the code:
Duration preAggDuration = Durations.seconds(150);
Duration windowComputationPeriod = Durations.seconds(300);
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10));
// read lines
JavaDStream<String> lines = streamingContext.textFileStream("hdfs://my_dir");
// parse lines into models
JavaDStream<MyEvent> eventStream = lines.repartition(350).flatMap(new MyEventParser());
// each model is split into projections
JavaPairDStream<MyProjectionKey, MyProjection> projectionStream = eventStream.flatMapToPair(new MyProjectionFunction());
// pre-aggregated data
JavaPairDStream<MyProjectionKey, MyProjection> preAggStream = projectionStream.reduceByKeyAndWindow(new MySumFunction(), preAggDuration, preAggDuration);
preAggStream.foreachRDD(SAVE_AS_OBJECT_FILE);


// computations in large windows
for(int windowSize : new int[]{60,1440})
{
	JavaPairDStream<MyProjectionKey, MyProjection> windowStream = preAggStream.reduceByKeyAndWindow(new MySumFunction(), Durations.minutes(windowSize), windowComputationPeriod);
	windowStream.count().print(); // here I have no data :(
	JavaPairDStream<MyProjectionKey, MyProjection> windowMergedStream = windowStream.transformToPair(/* here goes merge of this window with historical data */);
	windowMergedStream.count().print(); // here I have zero :(

}
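
For reference, Spark Streaming requires the window and slide durations of a windowed DStream to be multiples of the parent DStream's slide duration. The sketch below is plain Java arithmetic (no Spark dependency) that checks the durations used above against that rule and counts how many 150-second pre-aggregated batches each large window spans. The class and method names are hypothetical, chosen only for this illustration.

```java
public class WindowAlignmentSketch {

    // True when child is a positive whole multiple of parent (both in seconds).
    public static boolean isMultiple(long childSeconds, long parentSeconds) {
        return childSeconds > 0 && childSeconds % parentSeconds == 0;
    }

    // Number of parent batches that fit into one window.
    public static long batchesPerWindow(long windowSeconds, long parentSlideSeconds) {
        return windowSeconds / parentSlideSeconds;
    }

    public static void main(String[] args) {
        long batch = 10;         // Durations.seconds(10), the batch interval
        long preAgg = 150;       // preAggDuration, window and slide of preAggStream
        long windowSlide = 300;  // windowComputationPeriod

        System.out.println("preAgg aligned with batch: " + isMultiple(preAgg, batch));
        System.out.println("window slide aligned with preAgg: " + isMultiple(windowSlide, preAgg));

        for (int windowMinutes : new int[]{60, 1440}) {
            long window = windowMinutes * 60L;
            System.out.println(windowMinutes + " min window aligned with preAgg: "
                    + isMultiple(window, preAgg)
                    + ", pre-aggregated batches per window: "
                    + batchesPerWindow(window, preAgg));
        }
    }
}
```

All durations in the code above pass this check, so misaligned durations alone would not seem to explain the empty windows.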