spark-user mailing list archives

From Sanjay Awatramani <sanjay_a...@yahoo.com>
Subject Re: Sliding Window operations do not work as documented
Date Mon, 24 Mar 2014 14:45:02 GMT
Hi All,

I found out why this problem occurs. Consider the following scenario:
- A DStream is created from any source (I've checked with file and socket sources).
- No actions are applied to this DStream.
- A sliding window operation is applied to this DStream, and an action is applied to the sliding window.
In this case, Spark does not even read the input stream in batches where the window isn't computed, i.e. batches that don't fall on a slide-interval boundary. Put another way, it won't read the input when it doesn't have to apply the window function. This happens because transformations in Spark are lazy: the input DStream is only computed in a batch if some output operation needs it in that batch.
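To make the explanation above concrete, here is a toy model in plain Java (no Spark involved). It assumes 0-based batch indices t_0, t_1, ... and that the window produces output at batch i when (i + 1) is a multiple of the slide interval, which is the firing pattern visible in the detailed results further down. It only lists the batches in which the input stream would be read under this explanation; the class and method names are hypothetical and this is not how Spark's scheduler is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Spark code) of the explanation above: with lazy evaluation the
// input stream is only read in a batch if some output operation needs it then.
public class LazyReadModel {

    // Returns the 0-based batch indices in which the input stream is read.
    // slide: slide interval in batches.
    // outputOnInput: true if an output operation (e.g. print()) is also applied
    // directly to the input stream, as in the workaround.
    static List<Integer> batchesRead(int slide, boolean outputOnInput, int numBatches) {
        List<Integer> read = new ArrayList<>();
        for (int i = 0; i < numBatches; i++) {
            // Assumption: the window fires when (i + 1) is a multiple of the slide.
            boolean windowFires = (i + 1) % slide == 0;
            if (outputOnInput || windowFires) {
                read.add(i);
            }
        }
        return read;
    }

    public static void main(String[] args) {
        // Slide interval of 2 batches over 6 batches, e.g. the (3,2) case.
        System.out.println("no workaround:   " + batchesRead(2, false, 6));
        System.out.println("with workaround: " + batchesRead(2, true, 6));
    }
}
```

The window length does not appear in this model: under the explanation above, only the slide interval decides in which batches the input is read, unless something else (the extra print()) forces a read every batch.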

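How to work around it (see the inputStream.print() line marked "This is the workaround"; confObj, windowLen and slideInt are defined elsewhere, with the lengths in minutes):
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000)); // 1-minute batches
JavaDStream<String> inputStream = stcObj.textFileStream("/Input");
inputStream.print(); // This is the workaround: an output operation forces the input to be read every batch
JavaDStream<String> objWindow = inputStream.window(new Duration(windowLen * 60 * 1000),
    new Duration(slideInt * 60 * 1000));
objWindow.dstream().saveAsTextFiles("/Output", "");
stcObj.start(); // nothing is read until the context is started
stcObj.awaitTermination();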


The "Window operations" example in the streaming guide implies that Spark reads the input stream in every batch, but because of lazy evaluation this does not happen. In most cases where a sliding window is used, no action is applied to the pre-window stream, so my gut feeling was that Spark Streaming would read every batch as long as some action is applied to the windowed stream.

Regards,
Sanjay



On Friday, 21 March 2014 8:06 PM, Sanjay Awatramani <sanjay_awat@yahoo.com> wrote:
 
Hi,

I want to run a map/reduce process over the last 5 seconds of data, every 4 seconds. This is quite similar to the sliding window diagram in the Window Operations section of http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html.

The RDDs returned by the window transformation are incorrect in my case. To investigate further, I ran a series of examples with varying window lengths and slide intervals. Summary of the test results:
(window length, slide interval) -> result
(3,1) -> success
(4,2) -> success
(3,2) -> fail
(4,3) -> fail
(5,4) -> fail
(5,2) -> fail

The only condition mentioned in the docs is that the two values (5 and 4) must be multiples of the batch interval (1 minute in my case), and, as expected, I get a runtime error if I violate it. Looking at my results, the failures occur exactly when the window length isn't a multiple of the slide interval.
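The rule above can be checked against the summary table with a few lines of plain Java. The (window length, slide interval) pairs and their outcomes are copied from the table; the class and method names are hypothetical, and this only restates the observed pattern rather than explaining it.

```java
// Checks the observed pattern: a run succeeded exactly when the window
// length was a multiple of the slide interval (both measured in batches).
public class WindowRuleCheck {

    static boolean windowIsMultipleOfSlide(int windowLen, int slideInt) {
        return windowLen % slideInt == 0;
    }

    public static void main(String[] args) {
        // {window length, slide interval, 1 = success / 0 = fail}, from the summary table
        int[][] results = {
            {3, 1, 1}, {4, 2, 1}, {3, 2, 0}, {4, 3, 0}, {5, 4, 0}, {5, 2, 0}
        };
        for (int[] r : results) {
            boolean predicted = windowIsMultipleOfSlide(r[0], r[1]);
            boolean observed = r[2] == 1;
            System.out.printf("(%d,%d) predicted=%s observed=%s%n", r[0], r[1],
                    predicted ? "success" : "fail", observed ? "success" : "fail");
        }
    }
}
```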

My code:
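import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000)); // 1-minute batches
JavaDStream<String> inputStream = stcObj.textFileStream("/Input"); // picks up new files in /Input
// windowLen and slideInt are in minutes, so both are multiples of the batch interval
JavaDStream<String> objWindow = inputStream.window(new Duration(windowLen * 60 * 1000),
    new Duration(slideInt * 60 * 1000));
objWindow.dstream().saveAsTextFiles("/Output", "");
stcObj.start();
stcObj.awaitTermination();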

Detailed results:
(3,1) -> success
@t_0: [inputStream's RDD@t_0]
@t_1: [inputStream's RDD@t_0,1]
@t_2: [inputStream's RDD@t_0,1,2]
@t_3: [inputStream's RDD@t_1,2,3]
@t_4: [inputStream's RDD@t_2,3,4]
@t_5: [inputStream's RDD@t_3,4,5]

(4,2) -> success
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: [inputStream's RDD@t_2,3,4,5]

(3,2) -> fail
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_2,3]    //(expected RDD@t_1,2,3)
@t_4: nothing
@t_5: [inputStream's RDD@t_4,5]    //(expected RDD@t_3,4,5)

(4,3) -> fail
@t_0: nothing
@t_1: nothing
@t_2: [inputStream's RDD@t_0,1,2]
@t_3: nothing
@t_4: nothing
@t_5: [inputStream's RDD@t_3,4,5]    //(expected RDD@t_2,3,4,5)

(5,4) -> fail
@t_0: nothing
@t_1: nothing
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: nothing
@t_6: nothing
@t_7: [inputStream's RDD@t_4,5,6,7]    //(expected RDD@t_3,4,5,6,7)

(5,2) -> fail
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: [inputStream's RDD@t_2,3,4,5]    //(expected RDD@t_1,2,3,4,5)
@t_6: nothing
@t_7: [inputStream's RDD@t_4,5,6,7]    //(expected RDD@t_3,4,5,6,7)
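The "expected" annotations above follow a simple rule. Here is a minimal Java sketch of it, assuming 0-based batch indices, that the window outputs at batch i when (i + 1) is a multiple of the slide interval (the firing pattern seen in every run above), and that it should then cover batches max(0, i - windowLen + 1) through i. The class and method names are hypothetical; this is not Spark code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Prints the windows I expected, in the same "@t_i" format as the detailed
// results (window length and slide interval measured in batches).
public class ExpectedWindows {

    // Batch indices the window firing at batch i should contain, or an empty
    // list if the window does not fire at batch i.
    static List<Integer> expectedWindow(int windowLen, int slideInt, int i) {
        List<Integer> batches = new ArrayList<>();
        if ((i + 1) % slideInt != 0) {
            return batches; // shown as "nothing" in the results
        }
        for (int b = Math.max(0, i - windowLen + 1); b <= i; b++) {
            batches.add(b);
        }
        return batches;
    }

    public static void main(String[] args) {
        int windowLen = 5, slideInt = 2; // the (5,2) case
        for (int i = 0; i < 8; i++) {
            List<Integer> w = expectedWindow(windowLen, slideInt, i);
            String joined = w.stream().map(b -> Integer.toString(b)).collect(Collectors.joining(","));
            System.out.println("@t_" + i + ": "
                    + (w.isEmpty() ? "nothing" : "[inputStream's RDD@t_" + joined + "]"));
        }
    }
}
```

For the (5,2) case this reproduces the expected values listed above, e.g. t_1,2,3,4,5 at t_5 and t_3,4,5,6,7 at t_7.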

I have run all the above examples twice to be sure!
I believe either my understanding of the sliding window mechanism is incorrect or there is a problem in the sliding window mechanism.

Regards,
Sanjay