spark-user mailing list archives

From Alexander Krasheninnikov <>
Subject Nested reduceByKeyAndWindow losing data
Date Fri, 05 Jun 2015 11:25:47 GMT
Hello, everyone!
I've run into a problem when using nested reduceByKeyAndWindow.
My task is to parse JSON-formatted events from textFileStream and
create aggregations for each field.
For example, given this input:

{"type":"EventOne", "attr1":10,"attr2":20}

I produce these projections:

{"type":"EventOne", count:1}

{"type":"EventOne", "attr1":10, count:1}

{"type":"EventOne", "attr2":20, count:1}

{"type":"EventOne", "attr1":10, "attr2":20, count:1}
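
The projection step can be sketched in plain Java (no Spark types), assuming, since MyProjectionFunction is not shown, that each event is expanded into "type" plus every subset of its remaining attributes, each with count 1. ProjectionSketch and project are hypothetical names used only for illustration:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: expands one event into projections that always keep
// "type" plus one subset of the other attributes, each with count 1.
class ProjectionSketch {
    static List<Map<String, Object>> project(Map<String, Object> event) {
        // collect the attributes that can be projected (everything except "type")
        List<String> attrs = new ArrayList<>();
        for (String k : event.keySet()) {
            if (!k.equals("type")) {
                attrs.add(k);
            }
        }
        List<Map<String, Object>> out = new ArrayList<>();
        int n = attrs.size();
        // each bitmask selects one subset of the non-"type" attributes
        for (int mask = 0; mask < (1 << n); mask++) {
            Map<String, Object> p = new LinkedHashMap<>();
            p.put("type", event.get("type"));
            for (int i = 0; i < n; i++) {
                if ((mask & (1 << i)) != 0) {
                    p.put(attrs.get(i), event.get(attrs.get(i)));
                }
            }
            p.put("count", 1);
            out.add(p);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("type", "EventOne");
        event.put("attr1", 10);
        event.put("attr2", 20);
        for (Map<String, Object> p : project(event)) {
            System.out.println(p);
        }
    }
}
```

For the sample event, main prints {type=EventOne, count=1}, {type=EventOne, attr1=10, count=1}, {type=EventOne, attr2=20, count=1} and {type=EventOne, attr1=10, attr2=20, count=1}. With n non-type attributes this sketch yields 2^n projections per event.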

Every 150 seconds (Durations.seconds(150)) I pre-aggregate this data, so
that it can be saved and reused for the larger windows. But the larger
windows receive no data at all, which looks like a bug.
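
As a sanity check on the durations: Spark requires a windowed stream's window and slide durations to be multiples of the slide duration of its parent stream (for the pre-aggregation window that parent slides every 10-second batch; for the nested windows it slides every 150 seconds). The hypothetical WindowCheck class below only does that arithmetic; it is not a Spark API:

```java
// Hypothetical helper that mirrors Spark's rule for windowed streams:
// window and slide durations must be multiples of the parent's slide duration.
class WindowCheck {
    static boolean valid(long parentSlideSec, long windowSec, long slideSec) {
        return windowSec % parentSlideSec == 0 && slideSec % parentSlideSec == 0;
    }

    // number of parent (pre-aggregated) RDDs combined by one outer window
    static long rddsPerWindow(long parentSlideSec, long windowSec) {
        return windowSec / parentSlideSec;
    }

    public static void main(String[] args) {
        long batch = 10, preAgg = 150, slide = 300;
        System.out.println("pre-agg valid: " + valid(batch, preAgg, preAgg));
        for (long minutes : new long[]{60, 1440}) {
            long window = minutes * 60;
            System.out.println(minutes + " min: valid=" + valid(preAgg, window, slide)
                    + ", pre-aggregated RDDs per window=" + rddsPerWindow(preAgg, window));
        }
    }
}
```

With these settings every check passes, and each 60-minute window should combine 24 pre-aggregated RDDs (576 for 1440 minutes). A duration mismatch would make Spark throw when the windowed stream is created rather than silently produce empty windows.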

Here is the code:

// Imports used by this excerpt (Spark Streaming Java API)
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

Duration preAggDuration = Durations.seconds(150);
Duration windowComputationPeriod = Durations.seconds(300);
// 10-second batch interval
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(10));

// read new files appearing in the HDFS directory
JavaDStream<String> lines = streamingContext.textFileStream("hdfs://my_dir");
// parse lines into models
JavaDStream<MyEvent> eventStream = lines.repartition(350).flatMap(new MyEventParser());
// each model is split into projections
JavaPairDStream<MyProjectionKey, MyProjection> projectionStream =
        eventStream.flatMapToPair(new MyProjectionFunction());
// pre-aggregated data: window == slide, so consecutive 150 s windows do not overlap
JavaPairDStream<MyProjectionKey, MyProjection> preAggStream =
        projectionStream.reduceByKeyAndWindow(new MySumFunction(), preAggDuration, preAggDuration);

// computations in large windows (60 and 1440 minutes), recomputed every 300 s
for (int windowSize : new int[]{60, 1440}) {
    JavaPairDStream<MyProjectionKey, MyProjection> windowStream = preAggStream.reduceByKeyAndWindow(
            new MySumFunction(), Durations.minutes(windowSize), windowComputationPeriod);
    windowStream.count().print(); // here I have no data :(
    JavaPairDStream<MyProjectionKey, MyProjection> windowMergedStream = windowStream.transformToPair(
            /* here goes merge of this window with historical data */);
    windowMergedStream.count().print(); // here I have zero :(
}
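
For comparison, here is a single-process model (no Spark) of what the nested windows would be expected to emit, assuming each windowed reduce at time t covers the interval (t - window, t] and fires only at multiples of its slide. NestedWindowSim, simulate and the synthetic one-event-per-batch input are hypothetical; partitioning, file discovery by textFileStream and the merge with historical data are not modeled:

```java
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical model of the nested windows: batches -> tumbling pre-aggregation
// -> sliding outer window, all evaluated locally with per-key sums.
class NestedWindowSim {
    // Sum per-key counts across partial aggregates (plays the role of MySumFunction)
    static Map<String, Long> merge(Iterable<Map<String, Long>> parts) {
        Map<String, Long> out = new TreeMap<>();
        for (Map<String, Long> m : parts) {
            for (Map.Entry<String, Long> e : m.entrySet()) {
                out.merge(e.getKey(), e.getValue(), Long::sum);
            }
        }
        return out;
    }

    static SortedMap<Integer, Map<String, Long>> simulate(
            int batchSec, int preAggSec, int windowSec, int slideSec, int totalSec) {
        // one batch every batchSec seconds, each holding a single "EventOne" projection
        SortedMap<Integer, Map<String, Long>> batches = new TreeMap<>();
        for (int t = batchSec; t <= totalSec; t += batchSec) {
            Map<String, Long> batch = new TreeMap<>();
            batch.put("EventOne", 1L);
            batches.put(t, batch);
        }
        // inner window: at each multiple of preAggSec, reduce batches in (t - preAggSec, t]
        SortedMap<Integer, Map<String, Long>> preAgg = new TreeMap<>();
        for (int t = preAggSec; t <= totalSec; t += preAggSec) {
            preAgg.put(t, merge(batches.subMap(t - preAggSec + 1, t + 1).values()));
        }
        // outer window: at each multiple of slideSec, reduce pre-aggregates in (t - windowSec, t]
        SortedMap<Integer, Map<String, Long>> outer = new TreeMap<>();
        for (int t = slideSec; t <= totalSec; t += slideSec) {
            outer.put(t, merge(preAgg.subMap(t - windowSec + 1, t + 1).values()));
        }
        return outer;
    }

    public static void main(String[] args) {
        // 10 s batches, 150 s pre-aggregation, 60-minute window sliding every 300 s, one hour of input
        simulate(10, 150, 3600, 300, 3600)
                .forEach((t, counts) -> System.out.println("t=" + t + "s " + counts));
    }
}
```

With one event per 10-second batch over an hour, the model's 60-minute window reports EventOne=30 at t=300s and grows to EventOne=360 at t=3600s, so under these assumptions every input event should reach the outer window.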

