spark-user mailing list archives

From Tathagata Das <>
Subject Re: Spark streaming Processing time keeps increasing
Date Fri, 17 Jul 2015 07:39:21 GMT
Responses inline.

On Thu, Jul 16, 2015 at 9:27 PM, N B <> wrote:

> Hi TD,
> Yes, we do have the invertible function provided. However, I am not sure I
> understood how to use the filterFunction. Is there an example somewhere
> showing its usage?
> The header comment on the function says :
> * @param filterFunc     function to filter expired key-value pairs;
> *                       only pairs that satisfy the function are retained
> *                       set this to null if you do not want to filter
> These are the questions I am confused about:
> 1. The code comment seems to imply that the filterFunc is only used to figure out which
> key-value pairs are used to form the window, but how does it actually help expire the old data?

It applies the filter and retains only the keys that pass through it.
Underneath, it's all RDDs, so only the filtered K, V pairs are retained (and
cached) for future batches.

> 2. Shouldn't the values that are "falling off" of the window period automatically be
> removed without the need for an additional filter function?

It cannot figure out the "falling off" in this incremental version.
For example, if you are counting over the window by adding (reduceFunc) and
subtracting (invReduceFunc), unless you provide the concept of a "zero",
it will not know when to throw away the keys that have become 0. Over a
window, the count may increase from "nothing" to 10, and then reduce to "0"
when the window moves forward, but it does not know "0" means "don't track
it any more". The filter function introduces that concept of zero.
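
To make the "zero" idea concrete, here is a minimal plain-Python simulation of an incremental windowed count. It is only a sketch of the behavior described above, not Spark's implementation: the `run` helper, the batch contents and the window length are all made up for illustration, and the filter takes a (key, value) pair as an assumption modeled on the filterFunc described in the header comment.

```python
# Plain-Python simulation (no Spark needed) of an incremental windowed count.
# Everything here is illustrative; it only mimics add / subtract / filter.
from collections import deque


def run(batches, window_len, filter_func=None):
    """Return the number of tracked keys after each batch."""
    state = {}          # key -> running count over the current window
    window = deque()    # batches that are currently inside the window
    tracked = []
    for batch in batches:
        window.append(batch)
        for key in batch:                      # "reduceFunc": add new data
            state[key] = state.get(key, 0) + 1
        if len(window) > window_len:
            for key in window.popleft():       # "invReduceFunc": subtract old data
                state[key] = state[key] - 1
        if filter_func is not None:            # "filterFunc": drop the zeros
            state = {k: v for k, v in state.items() if filter_func((k, v))}
        tracked.append(len(state))
    return tracked


# Every batch brings one brand-new key; the window spans 3 batches.
batches = [["user-%d" % i] for i in range(10)]

without_filter = run(batches, window_len=3)
with_filter = run(batches, window_len=3, filter_func=lambda kv: kv[1] > 0)

print(without_filter)  # grows with every batch
print(with_filter)     # levels off once the window is full
```

Without the filter, keys whose count has dropped back to 0 stay in the tracked state forever, so the state grows with every batch. With the filter, the state stops growing once the window is full.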

> 3. Which side of the key-value pairs are passed to this function? The ones that are coming
> in or the ones that are going out of window or both?

All of the k,v pairs that are being tracked.

> 4. The key-value pairs in use in a particular reduceByKeyAndWindow operation may not
> have the requisite info (such as a timestamp or similar, e.g. if it's aggregated data) to help
> determine whether to return true or false. What is the semantic expected here?

I am not sure I get your question. It is up to you to provide sufficient
information as part of the "value" so that you can make that decision in
the filter function.
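
As one possible way to do that, the value can carry an event count next to the aggregate, so the filter does not have to guess from the aggregate alone. The sketch below is plain Python with a hypothetical (total, n_events) value layout; the function names are illustrative and it is not taken from any Spark example.

```python
# Hypothetical value layout: (total, n_events). The event count rides along
# with the aggregate so the filter can tell "sum happens to be 0" apart from
# "nothing for this key is left in the window".

def reduce_func(a, b):
    # Combine two partial aggregates.
    return (a[0] + b[0], a[1] + b[1])


def inv_reduce_func(a, b):
    # Remove an old partial aggregate that has left the window.
    return (a[0] - b[0], a[1] - b[1])


def filter_func(kv):
    # Keep a key only while at least one of its events is still in the window.
    _key, (_total, n_events) = kv
    return n_events > 0


# Two readings, +5 and -5, are in the window: the total is 0 but the key is live.
in_window = reduce_func((5, 1), (-5, 1))
keep_live = filter_func(("sensor-a", in_window))

# Both readings leave the window: the count drops to 0 and the key can go.
emptied = inv_reduce_func(inv_reduce_func(in_window, (5, 1)), (-5, 1))
keep_emptied = filter_func(("sensor-a", emptied))

print(in_window, keep_live)
print(emptied, keep_emptied)
```

Filtering on the total alone would have dropped the key while two of its readings were still inside the window, which is why the count is kept separately here.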

> As always, thanks for your help
> Nikunj
> On Thu, Jul 16, 2015 at 1:16 AM, Tathagata Das <>
> wrote:
>> Make sure you provide the filterFunction with the invertible
>> reduceByKeyAndWindow. Otherwise none of the keys will get removed, and the
>> key space will continue to increase. This is what is leading to the lag. So
>> use the filtering function to filter out the keys that are not needed any
>> more.
>> On Thu, Jul 16, 2015 at 12:44 AM, Akhil Das <>
>> wrote:
>>> What is your data volume? Do you have checkpointing/WAL enabled? In
>>> that case make sure you are using SSD disks, as this behavior is mainly due
>>> to the IO wait.
>>> Thanks
>>> Best Regards
>>> On Thu, Jul 16, 2015 at 8:43 AM, N B <> wrote:
>>>> Hello,
>>>> We have a Spark streaming application and the problem that we are
>>>> encountering is that the batch processing time keeps on increasing and
>>>> eventually causes the application to start lagging. I am hoping that
>>>> someone here can point me to any underlying cause of why this might happen.
>>>> The batch interval is 1 minute as of now and the app does some maps,
>>>> filters, joins and reduceByKeyAndWindow operations. All the reduces are
>>>> invertible functions and so we do provide the inverse-reduce functions in
>>>> all those. The largest window size we have is 1 hour right now. When the
>>>> app is started, we see that the batch processing time is between 20 and 30
>>>> seconds. It keeps creeping up slowly and by the time it hits the 1 hour
>>>> mark, it is somewhere around 35-40 seconds. Somewhat expected and still not
>>>> bad!
>>>> I would expect that since the largest window we have is 1 hour long,
>>>> the application should stabilize around the 1 hour mark and start
>>>> processing subsequent batches within that 35-40 second zone. However, that
>>>> is not what is happening. The processing time still keeps increasing and
>>>> eventually in a few hours it exceeds the 1 minute mark and then starts lagging.
>>>> Eventually the lag builds up and grows to minutes, at which point we have
>>>> to restart the system.
>>>> Any pointers on why this could be happening and what we can do to
>>>> troubleshoot further?
>>>> Thanks
>>>> Nikunj
