storm-user mailing list archives

From Ravi Sharma <ping2r...@gmail.com>
Subject Re: Sending Some Context when Failing a tuple.
Date Tue, 05 Jan 2016 08:23:39 GMT
Hi Ankur,

Thanks for the detailed reply. I haven't tried the code on a cluster yet; I
am just running it through in my mind.

Let's say a single instance of Bean b was created for a message and passed
both as part of the tuple and as the messageId.

I don't know the implementation, but I will assume the messageId and the
tuple are serialized separately and deserialized separately on another node.
So the same object will be serialized twice, and on deserialization two new
objects will be created with exactly the same state/content.

so basically

(Bean) tuple.getValue(1) != messageIdObject

when comparing only the memory address (reference identity) after
deserialization.


So when I change Bean b in the tuple, the change will not be reflected in
the messageId.
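
A minimal sketch of what I mean, using plain Java serialization rather than
Storm's own serializer. The class and method names here are made up for
illustration, and the round trip only simulates a hop to another JVM:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationIdentityDemo {

    // Hypothetical stand-in for the Bean in the quoted message, reduced to one field.
    public static class Bean implements Serializable {
        private static final long serialVersionUID = 1L;
        private String failureReason;

        public String getFailureReason() { return failureReason; }
        public void setFailureReason(String reason) { this.failureReason = reason; }
    }

    // Serializes and then deserializes the bean, which always yields a new object.
    public static Bean roundTrip(Bean original) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Bean) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Bean messageId = new Bean();          // the instance the spout keeps
        Bean inTuple = roundTrip(messageId);  // the copy a remote bolt would see

        inTuple.setFailureReason("It failed"); // the bolt mutates its copy only

        System.out.println(inTuple == messageId);         // prints "false"
        System.out.println(messageId.getFailureReason()); // prints "null"
    }
}
```

If the cluster behaves like this round trip, the spout's copy never sees the
failure reason that the bolt set.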

It may work in dev mode, as there is no serialization/deserialization, but
in cluster mode it may not work. I will try to run it in cluster mode and
see whether it actually behaves the way I have just described.


Also, messageId and Tuple are supposed to be read-only objects, which means
that even if it works now, it may not work in a future version if some kind
of caching comes into the picture, e.g. the spout keeps a cache, on ack the
bolt sends back only a key for the tuple, and the spout gets the object from
its cache.
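
The caching scenario can be sketched roughly like this. This is only a
hypothetical illustration, not how Storm is implemented; the class name,
method names, and the String key/payload types are my assumptions:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical spout-side cache: only a key travels back on ack/fail.
public class PendingMessageCache {

    private final Map<String, String> pending = new ConcurrentHashMap<>();

    // Called when the spout emits: remember the payload under its key.
    public void onEmit(String key, String payload) {
        pending.put(key, payload);
    }

    // Called on fail: the spout looks up its own copy of the payload,
    // so anything a bolt changed on its copy is not visible here.
    public String onFail(String key) {
        return pending.get(key);
    }

    // Called on ack: the message is done, so drop it from the cache.
    public void onAck(String key) {
        pending.remove(key);
    }

    public static void main(String[] args) {
        PendingMessageCache cache = new PendingMessageCache();
        cache.onEmit("msg-1", "nathan");
        System.out.println(cache.onFail("msg-1")); // prints "nathan"
        cache.onAck("msg-1");
        System.out.println(cache.onFail("msg-1")); // prints "null"
    }
}
```

With a design like this, mutating the object inside the bolt could never
carry information back to the spout.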


Thanks

Ravi.






On 4 Jan 2016 5:14 pm, "Ankur Garg" <ankurgarg9@gmail.com> wrote:

> My Bad Ravi that I could not explain my point properly .
>
> Like you said in the fail method  when u call outputCollector.fail(tuple)
> , ISpout.fail(Object messageId) gets invoked.
>
> May be u r passing String as this messageId but the argument itself says
> Object .
>
> So what I meant to say that you can pass your same object as MessageId
> (perhaps name it better for understanding) .
>
> To Elaborate , Lets say I pass along simple Java Bean from Spout to Bolt .
>
> Here is my sample Bean
>
> public class Bean implements Serializable {
>
> private String a; // your custom attributes which are serializable .
>
> private int b; //// your custom attributes which are serializable .
>
> private int c; // your custom attributes which are serializable .
>
> private String msgId; //your custom attributes which are serializable .
>
> private String failureReason; // failure reason, to be populated inside the bolt when the tuple fails
>
>
> //getter setter Methods
>
> }
>
> In your Spout inside nextTuple . Taking example from word count of Storm
> Starter project
>
>
> public void nextTuple() {
>
> try{
>
> final String[] words = new String[] { "nathan", "mike", "jackson",
>
> "golda", "bertels" };
>
> final Random rand = new Random();
>
> final String word = words[rand.nextInt(words.length)];
>
> String msgId = word +  UUID.randomUUID().toString();
>
> Bean b = new Bean();
>
> b.setA("String A");
>
> b.setB(123);
>
> b.setC(456);
>
> b.setMsgId(msgId); // not necessary to do
>
> _collector.emit(new Values(word, b), b);
>
> LOG.info("Exit nextTuple Method ");
>
> }
>
>     catch(Exception ex)
>
>     {
>
>       ex.printStackTrace();
>
>     }
>
> LOG.info("Final Exit nextTuple method ");
>
> }
>
>
> See the _collector.emit . I am passing the same bean object as MessageId
> Object .
>
> *And declareOutputFields method as *
>
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>
> declarer.declare(new Fields("word" , "correlationObject"));
>
> }
>
>
> Now in my Bolt
>
> In the execute Method
>
> @Override
>
> public void execute(Tuple tuple) {
>
>   Bean b1 = (Bean)tuple.getValue(1);
>
>   b1.setFailureReason("It failed ");
>
> _collector.fail(tuple);
>
> }
>
>
> Now , when _collector.fail method is called Spout's fail method gets
> invoked
>
> public void fail(Object msgId) {
>
>  Bean b1 = (Bean) msgId;
>
>  String failureReason = b1.getFailureReason();
>
> }
>
>
> *You will see the failureReason u set inside ur bolt received here inside
> fail method . *
>
> *Again , I am not saying that this is the best way to achieve what u want
> , but just proposing a way it can be done.*
>
> Hope this helps.
>
>
> Thanks
>
> Ankur
>
>
> On Mon, Jan 4, 2016 at 2:24 PM, Ravi Sharma <ping2ravi@gmail.com> wrote:
>
>> Hi Ankur,
>> Various Storm API for this are like this
>>
>> Bolts recieve Tuple, which is immutable object.
>> Once something fails in Bolt we call outputCollector.fail(tuple)
>>
>> which in turn invoke Spout's ISpout.fail(Object messageId) method.
>>
>>
>>
>> now spout gets only the messageId back(which i created when processing
>> started from that spout), so not sure how some other info either some
>> object or boolean or integer or String can be passed from Bolts to Spout
>> after failing.
>>
>> Thanks
>> Ravi
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Sun, Jan 3, 2016 at 7:00 PM, Ankur Garg <ankurgarg9@gmail.com> wrote:
>>
>>> Hi Ravi,
>>>
>>> May be a very naive answer here but still posting it  :
>>>
>>> I am assuming that once u catch the failed Tuple inside the fail method
>>> of your spout  all you need to decide is whether this tuple should be
>>> replayed or not .
>>>
>>> I am assuming the object you are sending between spout and bolts are
>>> already serializable .  How about adding this extra information fore
>>> replaying the tuple to the same Object  (Since you already thinking of
>>> storing it in some external storage I am assuming this info too is
>>> serializable) . It may be a simple boolean flag too .
>>>
>>> For ex :
>>>
>>> Original Tuple u  r sending may be
>>>
>>> OrigTuple implements Serializable
>>> {
>>>     ObjectA a;
>>>     ObjectB b;
>>> }
>>>
>>> I am assuming a , b are all serializable or marked transient .
>>>
>>> Now in case of failure you can attach Object C too which contains
>>> failure information or simple boolean Flag which implies to the spout that
>>> it needs to be played . For the ones which dont need to be played it takes
>>> default value as false .
>>>
>>>
>>> Like I said before , it is a very simple thought  but I could think of
>>> this may work based on info u provided and assumptions I made.
>>>
>>> Thanks
>>> Ankur
>>>
>>>
>>>
>>> On Sun, Jan 3, 2016 at 3:59 PM, Ravi Sharma <ping2ravi@gmail.com> wrote:
>>>
>>>> Hi All,
>>>> I would like to send some extra information back to spout when a tuple
>>>> is failed in some Bolt, so that Spout can decide if it want it to replay or
>>>> just put the message into queue outside storm for admins to view.
>>>>
>>>> So is there any way i can attach some more information when sending
>>>> back failed tuple to spout.?
>>>>
>>>> One way i can think of is keeping such information outside storm in
>>>> some datastore, with Tuple id and spout can lookup that, but looking for
>>>> some way to do it via storm without bringing in other integration/datastore.
>>>>
>>>>
>>>> Thanks
>>>> Ravi.
>>>>
>>>
>>>
>>
>
