spark-user mailing list archives

From "Shixiong(Ryan) Zhu" <shixi...@databricks.com>
Subject Re: Stateful Operation on JavaPairDStream Help Needed !!
Date Tue, 01 Mar 2016 00:58:38 GMT
Could you post the screenshot of the Streaming DAG and also the driver log?
It would be great if you have a simple producer for us to debug.

On Mon, Feb 29, 2016 at 1:39 AM, Abhishek Anand <abhis.anan007@gmail.com>
wrote:

> Hi Ryan,
>
> It's not working even after removing the reduceByKey.
>
> So, basically I am doing the following
> - reading from kafka
> - flatmap inside transform
> - mapWithState
> - rdd.count on output of mapWithState
>
> But to my surprise I still don't see checkpointing taking place.
>
> Is there any restriction on the type of operations that we can perform
> inside mapWithState ?
>
> I really need to resolve this one, as currently if my application is
> restarted from the checkpoint it has to repartition 120 previous stages,
> which takes a very long time.
>
> Thanks !!
> Abhi
>
> On Mon, Feb 29, 2016 at 3:42 AM, Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
>> Sorry, I forgot to tell you that you should call `rdd.count()` for
>> "reduceByKey" as well. Could you try it and see if it works?
>>
>> On Sat, Feb 27, 2016 at 1:17 PM, Abhishek Anand <abhis.anan007@gmail.com>
>> wrote:
>>
>>> Hi Ryan,
>>>
>>> I am using mapWithState after doing reduceByKey.
>>>
>>> I am right now using mapWithState as you suggested and triggering the
>>> count manually.
>>>
>>> But I am still unable to see any checkpointing taking place. In the DAG I
>>> can see that the reduceByKey operations for the previous batches are also
>>> being computed.
>>>
>>>
>>> Thanks
>>> Abhi
>>>
>>>
>>> On Tue, Feb 23, 2016 at 2:36 AM, Shixiong(Ryan) Zhu <
>>> shixiong@databricks.com> wrote:
>>>
>>>> Hey Abhi,
>>>>
>>>> Using reduceByKeyAndWindow and mapWithState together will trigger the bug
>>>> in SPARK-6847. Here is a workaround to trigger the checkpoint manually:
>>>>
>>>>     JavaMapWithStateDStream<...> stateDStream =
>>>>         myPairDstream.mapWithState(StateSpec.function(mappingFunc));
>>>>     stateDStream.foreachRDD(new VoidFunction<JavaRDD<...>>() {
>>>>       @Override
>>>>       public void call(JavaRDD<...> rdd) throws Exception {
>>>>         rdd.count(); // action that forces the state RDD to be computed
>>>>       }
>>>>     });
>>>>     return stateDStream.stateSnapshots();
>>>>
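The workaround above works because DStream output is lazy: until an action such as `rdd.count()` consumes the state RDD, the mapWithState lineage is never materialized, so there is nothing to checkpoint. The same lazy-until-terminal pattern can be seen with plain Java streams; the sketch below is Spark-free and the `doubledStream` helper is invented purely for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyDemo {
    // Builds a lazy pipeline; 'evaluated' counts how many elements have
    // actually been processed. Nothing runs until a terminal operation.
    static Stream<Integer> doubledStream(List<Integer> in, AtomicInteger evaluated) {
        return in.stream().map(x -> {
            evaluated.incrementAndGet();
            return x * 2;
        });
    }

    public static void main(String[] args) {
        AtomicInteger evaluated = new AtomicInteger();
        Stream<Integer> pipeline = doubledStream(Arrays.asList(1, 2, 3), evaluated);
        System.out.println("before terminal op: " + evaluated.get()); // 0
        List<Integer> out = pipeline.collect(Collectors.toList());    // forces evaluation
        System.out.println("after terminal op: " + evaluated.get());  // 3
        System.out.println(out);
    }
}
```

In the same way, without the `count()` the mapWithState stream that only feeds `stateSnapshots()` may never be forced on its own schedule, which is the gap the workaround fills.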
>>>>
>>>> On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand <
>>>> abhis.anan007@gmail.com> wrote:
>>>>
>>>>> Hi Ryan,
>>>>>
>>>>> Reposting the code.
>>>>>
>>>>> Basically my use case is something like this: I am receiving the web
>>>>> impression logs, and may get the notify (listening from Kafka) for those
>>>>> impressions in the same interval (for me it's 1 min) or in any later
>>>>> interval (up to 2 hours). When I receive the notify for a particular
>>>>> impression, I need to swap the date field in the impression with the
>>>>> date field in the notify log. The notify for an impression has the same
>>>>> key as the impression.
>>>>>
>>>>> static Function3<String, Optional<MyClass>, State<MyClass>,
>>>>>     Tuple2<String, MyClass>> mappingFunc =
>>>>>         new Function3<String, Optional<MyClass>, State<MyClass>,
>>>>>             Tuple2<String, MyClass>>() {
>>>>>     @Override
>>>>>     public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>>>>         State<MyClass> state) {
>>>>>       MyClass nullObj = new MyClass();
>>>>>       nullObj.setImprLog(null);
>>>>>       nullObj.setNotifyLog(null);
>>>>>       MyClass current = one.or(nullObj);
>>>>>
>>>>>       if (current != null && current.getImprLog() != null
>>>>>           && current.getMyClassType() == 1 /* this is an impression */) {
>>>>>         return new Tuple2<>(key, null);
>>>>>       } else if (current.getNotifyLog() != null
>>>>>           && current.getMyClassType() == 3 /* notify for the impression */) {
>>>>>         MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>>>         if (oldState != null && oldState.getNotifyLog() != null) {
>>>>>           // swapping the dates
>>>>>           oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>>>           return new Tuple2<>(key, oldState);
>>>>>         } else {
>>>>>           return new Tuple2<>(key, null);
>>>>>         }
>>>>>       } else {
>>>>>         return new Tuple2<>(key, null);
>>>>>       }
>>>>>     }
>>>>>   };
>>>>>
>>>>>
>>>>> return myPairDstream.mapWithState(StateSpec.function(mappingFunc))
>>>>>     .stateSnapshots();
>>>>>
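The swap described here can be exercised without Spark. The sketch below is only an illustration of the use case, not the code above: MyClass is reduced to its crtd strings, a HashMap stands in for State<MyClass>, and the impression branch stores its value explicitly (note that the function as posted never calls state.update(), so its state would stay empty):

```java
import java.util.HashMap;
import java.util.Map;

public class SwapSketch {
    static final int IMPRESSION = 1;
    static final int NOTIFY = 3;

    // 'state' maps key -> the impression's crtd date, waiting for its notify.
    // Returns the crtd to emit (the notify's date swapped in), or null.
    static String onEvent(Map<String, String> state, String key, int type, String crtd) {
        if (type == IMPRESSION) {
            state.put(key, crtd);   // remember the impression until the notify arrives
            return null;
        }
        if (type == NOTIFY && state.remove(key) != null) {
            return crtd;            // impression found: emit with the notify's date
        }
        return null;                // notify without a stored impression
    }

    public static void main(String[] args) {
        Map<String, String> state = new HashMap<>();
        System.out.println(onEvent(state, "imp42", IMPRESSION, "2016-02-22T10:00"));
        System.out.println(onEvent(state, "imp42", NOTIFY, "2016-02-22T10:03"));
    }
}
```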
>>>>>
>>>>> Currently I am using reduceByKeyAndWindow without the inverse function
>>>>> and I am able to get the correct data. But the issue that might arise is
>>>>> when I have to restart my application from the checkpoint: it
>>>>> repartitions and computes the previous 120 partitions, which delays the
>>>>> incoming batches.
>>>>>
>>>>>
>>>>> Thanks !!
>>>>> Abhi
>>>>>
>>>>> On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <
>>>>> shixiong@databricks.com> wrote:
>>>>>
>>>>>> Hey Abhi,
>>>>>>
>>>>>> Could you post how you use mapWithState? By default, it should do
>>>>>> checkpointing every 10 batches.
>>>>>> However, there is a known issue that prevents mapWithState from
>>>>>> checkpointing in some special cases:
>>>>>> https://issues.apache.org/jira/browse/SPARK-6847
>>>>>>
>>>>>> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <
>>>>>> abhis.anan007@gmail.com> wrote:
>>>>>>
>>>>>>> Any Insights on this one ?
>>>>>>>
>>>>>>>
>>>>>>> Thanks !!!
>>>>>>> Abhi
>>>>>>>
>>>>>>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <
>>>>>>> abhis.anan007@gmail.com> wrote:
>>>>>>>
>>>>>>>> I am now trying to use mapWithState in the following way, using some
>>>>>>>> example code. But, looking at the DAG, it does not seem to checkpoint
>>>>>>>> the state, and when restarting the application from the checkpoint it
>>>>>>>> re-partitions all the previous batches' data from Kafka.
>>>>>>>>
>>>>>>>> static Function3<String, Optional<MyClass>, State<MyClass>,
>>>>>>>>     Tuple2<String, MyClass>> mappingFunc =
>>>>>>>>         new Function3<String, Optional<MyClass>, State<MyClass>,
>>>>>>>>             Tuple2<String, MyClass>>() {
>>>>>>>>     @Override
>>>>>>>>     public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>>>>>>>         State<MyClass> state) {
>>>>>>>>       MyClass nullObj = new MyClass();
>>>>>>>>       nullObj.setImprLog(null);
>>>>>>>>       nullObj.setNotifyLog(null);
>>>>>>>>       MyClass current = one.or(nullObj);
>>>>>>>>
>>>>>>>>       if (current != null && current.getImprLog() != null
>>>>>>>>           && current.getMyClassType() == 1) {
>>>>>>>>         return new Tuple2<>(key, null);
>>>>>>>>       } else if (current.getNotifyLog() != null
>>>>>>>>           && current.getMyClassType() == 3) {
>>>>>>>>         MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>>>>>>         if (oldState != null && oldState.getNotifyLog() != null) {
>>>>>>>>           oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>>>>>>           return new Tuple2<>(key, oldState);
>>>>>>>>         } else {
>>>>>>>>           return new Tuple2<>(key, null);
>>>>>>>>         }
>>>>>>>>       } else {
>>>>>>>>         return new Tuple2<>(key, null);
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>   };
>>>>>>>>
>>>>>>>>
>>>>>>>> Please suggest whether this is the proper way or if I am doing
>>>>>>>> something wrong.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks !!
>>>>>>>> Abhi
>>>>>>>>
>>>>>>>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <
>>>>>>>> sebastian.piu@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> If you don't want to upgrade, then your only option will be
>>>>>>>>> updateStateByKey.
>>>>>>>>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhihong@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> mapWithState supports checkpointing.
>>>>>>>>>>
>>>>>>>>>> There have been some bug fixes since the release of 1.6.0, e.g.
>>>>>>>>>>   SPARK-12591 NullPointerException using checkpointed
>>>>>>>>>> mapWithState with KryoSerializer
>>>>>>>>>>
>>>>>>>>>> which is in the upcoming 1.6.1
>>>>>>>>>>
>>>>>>>>>> Cheers
>>>>>>>>>>
>>>>>>>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <
>>>>>>>>>> abhis.anan007@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Does mapWithState checkpoint the data ?
>>>>>>>>>>>
>>>>>>>>>>> When my application goes down and is restarted from the checkpoint,
>>>>>>>>>>> will mapWithState need to recompute the previous batches' data ?
>>>>>>>>>>>
>>>>>>>>>>> Also, to use mapWithState I will need to upgrade my application, as
>>>>>>>>>>> I am using version 1.4.0 and mapWithState isn't supported there. Is
>>>>>>>>>>> there any other workaround ?
>>>>>>>>>>>
>>>>>>>>>>> Cheers!!
>>>>>>>>>>> Abhi
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <
>>>>>>>>>>> sebastian.piu@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Looks like mapWithState could help you?
>>>>>>>>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <
>>>>>>>>>>>> abhis.anan007@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a use case as follows in my production environment, where
>>>>>>>>>>>>> I am listening from Kafka with a slideInterval of 1 min and a
>>>>>>>>>>>>> windowLength of 2 hours.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a JavaPairDStream where for each key I am getting the same
>>>>>>>>>>>>> key again but with a different value, which might appear in the
>>>>>>>>>>>>> same batch or some later batch.
>>>>>>>>>>>>>
>>>>>>>>>>>>> When the key appears a second time, I need to update a field in
>>>>>>>>>>>>> the value of the previous key with a field from the later value.
>>>>>>>>>>>>> Keys for which the matching value never arrives should be
>>>>>>>>>>>>> discarded after 2 hours.
>>>>>>>>>>>>>
>>>>>>>>>>>>> At the end of each second I need to output the result to an
>>>>>>>>>>>>> external database.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For example :
>>>>>>>>>>>>>
>>>>>>>>>>>>> Suppose valueX is an object of MyClass with fields int a, String b.
>>>>>>>>>>>>> At t=1 sec I am getting
>>>>>>>>>>>>> key0,value0(0,"prev0")
>>>>>>>>>>>>> key1,value1 (1, "prev1")
>>>>>>>>>>>>> key2,value2 (2,"prev2")
>>>>>>>>>>>>> key2,value3 (3, "next2")
>>>>>>>>>>>>>
>>>>>>>>>>>>> Output to database after 1 sec
>>>>>>>>>>>>> key2, newValue (2,"next2")
>>>>>>>>>>>>>
>>>>>>>>>>>>> At t=2 sec getting
>>>>>>>>>>>>> key3,value4(4,"prev3")
>>>>>>>>>>>>> key1,value5(5,"next1")
>>>>>>>>>>>>>
>>>>>>>>>>>>> Output to database after 2 sec
>>>>>>>>>>>>> key1,newValue(1,"next1")
>>>>>>>>>>>>>
>>>>>>>>>>>>> At t=3 sec
>>>>>>>>>>>>> key4,value6(6,"prev4")
>>>>>>>>>>>>> key3,value7(7,"next3")
>>>>>>>>>>>>> key5,value5(8,"prev5")
>>>>>>>>>>>>> key5,value5(9,"next5")
>>>>>>>>>>>>> key0,value0(10,"next0")
>>>>>>>>>>>>>
>>>>>>>>>>>>> Output to database after 3 sec
>>>>>>>>>>>>> key0,newValue(0,"next0")
>>>>>>>>>>>>> key3,newValue(4,"next3")
>>>>>>>>>>>>> key5,newValue(8,"next5")
>>>>>>>>>>>>>
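The per-batch outputs above follow a simple rule: park the first value seen for a key, and when the key reappears emit the parked `a` together with the later `b`. A small Spark-free simulation of that rule (the names `PairingDemo`, `run`, and `pending` are made up for illustration; a HashMap plays the role of the keyed state) reproduces the t=1 batch:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PairingDemo {
    // Each event is {key, a, b}. The first sighting of a key is parked in
    // 'pending'; the second sighting emits (storedA, laterB) for that key.
    static List<String> run(List<String[]> batch, Map<String, Integer> pending) {
        List<String> out = new ArrayList<>();
        for (String[] e : batch) {
            String key = e[0];
            int a = Integer.parseInt(e[1]);
            String b = e[2];
            Integer prevA = pending.remove(key);
            if (prevA == null) {
                pending.put(key, a);            // first value: wait for its pair
            } else {
                out.add(key + ",(" + prevA + ",\"" + b + "\")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> pending = new HashMap<>();
        // the t=1 sec batch from the example above
        List<String> out = run(Arrays.asList(
                new String[]{"key0", "0", "prev0"},
                new String[]{"key1", "1", "prev1"},
                new String[]{"key2", "2", "prev2"},
                new String[]{"key2", "3", "next2"}), pending);
        System.out.println(out); // key2 is paired within the batch
    }
}
```

Feeding the t=3 batch with key0 and key3 still pending from the earlier batches yields the three t=3 outputs listed above; the 2-hour expiry of unmatched keys is not modeled in this sketch.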
>>>>>>>>>>>>>
>>>>>>>>>>>>> Please suggest how this can be achieved.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks a lot !!!!
>>>>>>>>>>>>> Abhi
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
