storm-user mailing list archives

From Ankur Garg <ankurga...@gmail.com>
Subject Re: Sending Some Context when Failing a tuple.
Date Mon, 04 Jan 2016 17:14:01 GMT
My bad, Ravi, I could not explain my point properly.

As you said, when you call outputCollector.fail(tuple) in the bolt,
ISpout.fail(Object messageId) gets invoked on the spout.

Maybe you are passing a String as this messageId, but the parameter type
itself is Object.

So what I meant is that you can pass your same object as the messageId
(perhaps naming it better for clarity).

To elaborate, let's say I pass a simple Java bean from the spout to the bolt.

Here is my sample bean:

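import java.io.Serializable;

public class Bean implements Serializable {

    private static final long serialVersionUID = 1L;

    // Your custom attributes; all of them must be serializable.
    private String a;
    private int b;
    private int c;
    private String msgId;
    private String failureReason; // populated inside the bolt when the tuple fails

    public String getA() { return a; }
    public void setA(String a) { this.a = a; }
    public int getB() { return b; }
    public void setB(int b) { this.b = b; }
    public int getC() { return c; }
    public void setC(int c) { this.c = c; }
    public String getMsgId() { return msgId; }
    public void setMsgId(String msgId) { this.msgId = msgId; }
    public String getFailureReason() { return failureReason; }
    public void setFailureReason(String failureReason) { this.failureReason = failureReason; }
}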
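Because the bean travels inside a tuple, it has to be serializable. A quick way to check that a class like this survives serialization is a plain Java round trip. The sketch below is standalone (no Storm dependency) and uses a trimmed-down copy of the bean; roundTrip is a hypothetical helper. Storm itself normally serializes tuple values with Kryo (possibly falling back to Java serialization, depending on configuration), so treat this only as an approximation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class BeanCopyDemo {

    // Trimmed-down stand-in for the Bean above.
    static class Bean implements Serializable {
        private static final long serialVersionUID = 1L;
        private String a;
        private String failureReason;
        String getA() { return a; }
        void setA(String a) { this.a = a; }
        String getFailureReason() { return failureReason; }
        void setFailureReason(String failureReason) { this.failureReason = failureReason; }
    }

    // Hypothetical helper: Java-serialize an object and deserialize it again,
    // roughly what happens when a tuple value is sent to another worker.
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Bean original = new Bean();          // the instance the spout keeps
        original.setA("String A");
        Bean copy = roundTrip(original);     // what a remote bolt would receive
        copy.setFailureReason("It failed");
        System.out.println(copy.getA());                  // String A
        System.out.println(copy == original);             // false
        System.out.println(original.getFailureReason());  // null
    }
}
```

The data survives, but the deserialized object is a different instance: setting failureReason on it leaves the original untouched. That matters for the approach in this mail, because a bolt running in a different worker from the spout works on such a copy.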

In your spout, inside nextTuple (taking the example from the word count
topology in the storm-starter project):


public void nextTuple() {
    try {
        final String[] words = new String[] { "nathan", "mike", "jackson", "golda", "bertels" };
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        String msgId = word + UUID.randomUUID().toString();

        Bean b = new Bean();
        b.setA("String A");
        b.setB(123);
        b.setC(456);
        b.setMsgId(msgId); // optional

        // Emit the bean as a tuple value AND pass the same instance as the messageId
        _collector.emit(new Values(word, b), b);

        LOG.info("Exit nextTuple method");
    } catch (Exception ex) {
        LOG.error("Error in nextTuple", ex);
    }
    LOG.info("Final exit nextTuple method");
}


Look at the _collector.emit call: I am passing the same bean object as the
messageId object.

*And the declareOutputFields method:*

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word", "correlationObject"));
}
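The index 1 used in the bolt refers to the position of "correlationObject" in this declaration. Storm's Tuple also offers getValueByField(String), which avoids depending on the position. The standalone model below (SimpleTuple is a hypothetical class, not Storm's) shows how declared field names line up with positional values.

```java
import java.util.Arrays;
import java.util.List;

public class FieldLookupDemo {

    // Hypothetical stand-in for a tuple: declared field names plus values.
    static final class SimpleTuple {
        private final List<String> fields;
        private final List<Object> values;

        SimpleTuple(List<String> fields, List<Object> values) {
            if (fields.size() != values.size()) {
                throw new IllegalArgumentException("fields and values must have the same length");
            }
            this.fields = fields;
            this.values = values;
        }

        // Positional access, like tuple.getValue(1) in the bolt.
        Object getValue(int i) {
            return values.get(i);
        }

        // Name-based access: look up the declared position, then read the value.
        Object getValueByField(String name) {
            int i = fields.indexOf(name);
            if (i < 0) {
                throw new IllegalArgumentException("unknown field: " + name);
            }
            return values.get(i);
        }
    }

    public static void main(String[] args) {
        Object bean = new Object(); // stands in for the Bean
        SimpleTuple t = new SimpleTuple(
                Arrays.asList("word", "correlationObject"),
                Arrays.<Object>asList("nathan", bean));
        System.out.println(t.getValue(1) == t.getValueByField("correlationObject")); // true
    }
}
```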


Now, in my bolt's execute method:

@Override
public void execute(Tuple tuple) {
    // Index 1 is the "correlationObject" field declared by the spout
    Bean b1 = (Bean) tuple.getValue(1);
    b1.setFailureReason("It failed");
    _collector.fail(tuple);
}


Now, when _collector.fail is called, the spout's fail method gets
invoked:

public void fail(Object msgId) {
    Bean b1 = (Bean) msgId;
    String failureReason = b1.getFailureReason();
    // Decide here whether to replay the tuple or park it for admins
}


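*You will see the failureReason you set inside your bolt here, in the fail
method.* Note that this relies on the bolt modifying the very same Bean
instance the spout holds as its messageId, which as far as I know only
happens when the spout and bolt run in the same worker; if the tuple goes to
another worker, the bolt receives a deserialized copy and the spout never
sees the change.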

*Again, I am not saying this is the best way to achieve what you want;
I am just proposing one way it can be done.*
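To make the flow concrete without a running cluster, here is a plain Java model of the two halves: the bolt sets failureReason on the shared bean, and the spout's fail inspects it to choose between replaying and parking the message for admins, which is what you originally asked for. SpoutModel, replayQueue, adminQueue and the "no reason recorded means replay" policy are all hypothetical, and the model assumes spout and bolt share the same Bean instance (same worker JVM).

```java
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class FailContextDemo {

    static class Bean implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String msgId;
        // volatile as a defensive choice: the bolt writes it, the spout reads it,
        // and they run on different threads
        private volatile String failureReason;

        Bean(String msgId) { this.msgId = msgId; }
        String getMsgId() { return msgId; }
        String getFailureReason() { return failureReason; }
        void setFailureReason(String failureReason) { this.failureReason = failureReason; }
    }

    // Hypothetical spout-side logic; not Storm's API.
    static class SpoutModel {
        final Deque<Bean> replayQueue = new ArrayDeque<>();
        final List<Bean> adminQueue = new ArrayList<>();

        // Same shape as ISpout.fail(Object msgId).
        void fail(Object msgId) {
            if (!(msgId instanceof Bean)) {
                return; // some other messageId type: nothing to inspect
            }
            Bean b = (Bean) msgId;
            if (b.getFailureReason() == null) {
                replayQueue.add(b); // e.g. a timeout: no bolt recorded a reason
            } else {
                adminQueue.add(b);  // a bolt explained the failure: park it for admins
            }
        }
    }

    public static void main(String[] args) {
        SpoutModel spout = new SpoutModel();
        Bean b = new Bean("nathan-1");
        b.setFailureReason("It failed"); // bolt side, same object reference
        spout.fail(b);                   // spout side
        System.out.println(spout.adminQueue.size());  // 1
        System.out.println(spout.replayQueue.size()); // 0
    }
}
```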

Hope this helps.


Thanks

Ankur


On Mon, Jan 4, 2016 at 2:24 PM, Ravi Sharma <ping2ravi@gmail.com> wrote:

> Hi Ankur,
> The relevant Storm APIs work like this:
>
> Bolts receive a Tuple, which is an immutable object.
> Once something fails in a bolt, we call outputCollector.fail(tuple),
> which in turn invokes the spout's ISpout.fail(Object messageId) method.
>
> Now the spout gets only the messageId back (which I created when processing
> started in that spout), so I am not sure how any other info, whether an
> object, boolean, integer or String, can be passed from bolts to the spout
> after failing.
>
> Thanks
> Ravi
>
> On Sun, Jan 3, 2016 at 7:00 PM, Ankur Garg <ankurgarg9@gmail.com> wrote:
>
>> Hi Ravi,
>>
>> Maybe a very naive answer, but I'll post it anyway:
>>
>> I am assuming that once you catch the failed tuple inside the fail method
>> of your spout, all you need to decide is whether this tuple should be
>> replayed or not.
>>
>> I am also assuming the objects you are sending between spout and bolts are
>> already serializable. How about adding this extra information for
>> replaying the tuple to the same object? (Since you are already thinking of
>> storing it in some external storage, I am assuming this info is
>> serializable too.) It may even be a simple boolean flag.
>>
>> For example:
>>
>> The original tuple you are sending may be
>>
>> class OrigTuple implements Serializable {
>>     ObjectA a;
>>     ObjectB b;
>> }
>>
>> I am assuming a and b are serializable or marked transient.
>>
>> Now, in case of failure, you can also attach an ObjectC which contains the
>> failure information, or a simple boolean flag which tells the spout that the
>> tuple needs to be replayed. For the ones which don't need to be replayed, it
>> keeps its default value of false.
>>
>>
>> Like I said before, it is a very simple thought, but based on the info you
>> provided and the assumptions I made, I think this may work.
>>
>> Thanks
>> Ankur
>>
>>
>>
>> On Sun, Jan 3, 2016 at 3:59 PM, Ravi Sharma <ping2ravi@gmail.com> wrote:
>>
>>> Hi All,
>>> I would like to send some extra information back to the spout when a tuple
>>> is failed in some bolt, so that the spout can decide whether to replay it or
>>> just put the message into a queue outside Storm for admins to view.
>>>
>>> So is there any way I can attach some more information when sending a
>>> failed tuple back to the spout?
>>>
>>> One way I can think of is keeping such information outside Storm in some
>>> datastore, keyed by tuple id, which the spout can look up, but I am looking
>>> for a way to do it via Storm without bringing in another integration/datastore.
>>>
>>>
>>> Thanks
>>> Ravi.
>>>
>>
>>
>
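For comparison, Ravi's original alternative (keeping failure details outside the tuple, keyed by message id) might look roughly like this. FailureStore and InMemoryFailureStore are hypothetical names; the ConcurrentHashMap only stands in for a real external datastore and works only inside one JVM. The sketch also assumes the spout copies its messageId into a tuple field (as the msgId field of the Bean does) so the bolt knows which key to write.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class FailureStoreDemo {

    // Hypothetical interface; a real deployment would back this with an
    // external datastore reachable from every worker.
    interface FailureStore {
        void recordFailure(String msgId, String reason);
        Optional<String> takeFailure(String msgId);
    }

    // In-memory stand-in, only meaningful when spout and bolt share a JVM.
    static final class InMemoryFailureStore implements FailureStore {
        private final Map<String, String> reasons = new ConcurrentHashMap<>();

        @Override
        public void recordFailure(String msgId, String reason) {
            reasons.put(msgId, reason);
        }

        @Override
        public Optional<String> takeFailure(String msgId) {
            // remove() so entries do not pile up once the spout has handled them
            return Optional.ofNullable(reasons.remove(msgId));
        }
    }

    public static void main(String[] args) {
        FailureStore store = new InMemoryFailureStore();
        // Bolt side, just before calling collector.fail(tuple):
        store.recordFailure("nathan-42", "invalid payload");
        // Spout side, inside fail(Object msgId), with msgId being a String:
        String reason = store.takeFailure("nathan-42").orElse("no reason recorded");
        System.out.println(reason); // invalid payload
    }
}
```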
