storm-user mailing list archives

From Ankur Garg <ankurga...@gmail.com>
Subject Re: Sending Some Context when Failing a tuple.
Date Tue, 05 Jan 2016 18:35:35 GMT
Hi Ravi,

I have tested in local mode only, so I am not sure whether
(Bean)tuple.getValue(1) != messageIdObject holds on a remote cluster.
Unfortunately, I no longer have a cluster set up to try it out :( . My guess
is that serialising the same object twice should not be a problem in this
case. Please let me know your findings if you get a chance to test this on a
cluster.
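
To make the concern concrete, here is a minimal sketch of what a
serialization round trip does to object identity. It uses plain java.io
serialization as a stand-in, which is an assumption: Storm's own serialization
path is not exercised here, and the Bean class and roundTrip helper are
hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class IdentityAfterSerialization {

    // Hypothetical stand-in for the bean discussed in this thread.
    static class Bean implements Serializable {
        private static final long serialVersionUID = 1L;
        String failureReason;
    }

    // Serialize the bean to bytes and read it back as a new instance.
    static Bean roundTrip(Bean original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(buffer.toByteArray()))) {
                return (Bean) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Bean messageId = new Bean();
        // Assumption: this copy plays the role of the value a remote bolt receives.
        Bean tupleValue = roundTrip(messageId);
        tupleValue.failureReason = "It failed";

        System.out.println(tupleValue == messageId);   // false: distinct instances
        System.out.println(messageId.failureReason);   // null: the change is not visible
    }
}
```

If a cluster behaves like this round trip, a failure reason set on the tuple
value inside a bolt would not show up on the message ID object held by the
spout.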

Having said that, if the above doesn't work, I believe you can use a
different object to carry your failure message, not necessarily the same
bean I quoted in my example, since Storm internally maintains the
mapping between the msgId and the failed tuple.

So let's say you were previously using a plain string or GUID (say "xyz")
as the message ID:

String msgId = "xyz";

You may now replace it with a POJO carrying whatever attributes you want to
set for the failure.

To elaborate, your message ID object can be:

public class Message
{
    String id;
    String failureReason;
    // or simply
    boolean toBeReplayed; // true if you want to replay again; to be populated inside bolts
}

Then, based on failureReason alone, you can decide whether to replay the tuple.

I suggested using the same bean so that you would not have to keep the
details of the tuple you passed to the bolts anywhere else. But assuming
you are already doing that, the message ID object above should get the job
done for you.
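
As a rough sketch of how the spout side could act on such an object, the code
below routes a failed message either to a replay queue or to an admin queue.
It does not depend on Storm: the fail method only mirrors the shape of
ISpout.fail(Object msgId), and FailureRoutingSketch, replayQueue and
adminQueue are hypothetical names. It also assumes the flag has already
reached the spout-side object, which this thread leaves untested on a
cluster.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class FailureRoutingSketch {

    // Message ID object along the lines of the one in this mail
    // (the constructor is added only for brevity).
    static class Message {
        final String id;
        String failureReason;
        boolean toBeReplayed; // true if the tuple should be replayed

        Message(String id) {
            this.id = id;
        }
    }

    // Hypothetical destinations; a real spout would re-emit or publish elsewhere.
    final Queue<Message> replayQueue = new ArrayDeque<>();
    final Queue<Message> adminQueue = new ArrayDeque<>();

    // Same shape as ISpout.fail(Object msgId): the message ID object comes back.
    void fail(Object msgId) {
        Message m = (Message) msgId;
        if (m.toBeReplayed) {
            replayQueue.add(m);
        } else {
            adminQueue.add(m);
        }
    }

    public static void main(String[] args) {
        FailureRoutingSketch spout = new FailureRoutingSketch();

        Message transientFailure = new Message("xyz-1");
        transientFailure.failureReason = "timeout";
        transientFailure.toBeReplayed = true;

        Message permanentFailure = new Message("xyz-2");
        permanentFailure.failureReason = "malformed payload";

        spout.fail(transientFailure);
        spout.fail(permanentFailure);

        System.out.println("replay: " + spout.replayQueue.size());
        System.out.println("admin: " + spout.adminQueue.size());
    }
}
```

The boolean defaults to false, so anything not explicitly marked for replay
ends up in the admin queue.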

On your second point, I will assume that future releases will be
backward compatible :P . On a serious note, there may be a better way to
solve this, but based on my current understanding of Storm I am not sure
whether Storm provides anything out of the box for it.


Thanks
Ankur








On Tue, Jan 5, 2016 at 1:53 PM, Ravi Sharma <ping2ravi@gmail.com> wrote:

> Hi Ankur,
>
> Thanks for the detailed reply. I haven't tried the code on a cluster yet; I
> am just running it in my mind.
>
> Let's say a single instance of Bean b was created for a message and passed
> both as part of the tuple and as the messageId.
>
> I don't know the implementation, but I will assume the messageId and the
> tuple are serialized separately and deserialized separately on another
> node. So the same object is serialized twice, and on deserialization two
> new objects are created with exactly the same state/content.
>
> So basically
>
> (Bean)tuple.getValue(1) != messageIdObject, comparing only the memory
> address after deserializing.
>
>
> So when I change Bean b in the tuple, the change will not be reflected in
> the messageId.
>
> It may work in dev mode, as there is no serialization/deserialization, but
> in cluster mode it may not. I will try to run it in cluster mode and check
> whether it actually behaves as I have just described.
>
>
> Also, the messageId and the Tuple are supposed to be read-only objects, which
> means that even if this works now, it may not work in a future version if
> some kind of caching comes into the picture, e.g. the spout keeps a cache,
> and on ack the bolt sends only the key of the tuple and the spout gets it
> from the cache.
>
>
> Thanks
>
> Ravi.
>
>
>
>
>
>
> On 4 Jan 2016 5:14 pm, "Ankur Garg" <ankurgarg9@gmail.com> wrote:
>
>> My bad, Ravi, that I could not explain my point properly.
>>
>> As you said, when you call outputCollector.fail(tuple),
>> ISpout.fail(Object messageId) gets invoked.
>>
>> You may be passing a String as this messageId, but the argument type itself
>> is Object.
>>
>> So what I meant is that you can pass the same object as the messageId
>> (perhaps naming it better for clarity).
>>
>> To elaborate, let's say I pass a simple Java bean from the spout to the bolt.
>>
>> Here is my sample bean:
>>
>> public class Bean implements Serializable {
>>
>>     private String a;     // your custom attributes, which are serializable
>>     private int b;
>>     private int c;
>>     private String msgId;
>>
>>     // Failure reason, to be populated inside the bolt when the tuple fails
>>     private String failureReason;
>>
>>     // getter and setter methods
>> }
>>
>> In your spout, inside nextTuple (taking the word count example from the
>> Storm Starter project):
>>
>>
>> public void nextTuple() {
>>     try {
>>         final String[] words = new String[] { "nathan", "mike", "jackson",
>>                 "golda", "bertels" };
>>         final Random rand = new Random();
>>         final String word = words[rand.nextInt(words.length)];
>>         String msgId = word + UUID.randomUUID().toString();
>>
>>         Bean b = new Bean();
>>         b.setA("String A");
>>         b.setB(123);
>>         b.setC(456);
>>         b.setMsgId(msgId); // not necessary to do
>>
>>         // The same bean is used both as a tuple value and as the message ID.
>>         _collector.emit(new Values(word, b), b);
>>         LOG.info("Exit nextTuple Method ");
>>     } catch (Exception ex) {
>>         ex.printStackTrace();
>>     }
>>     LOG.info("Final Exit nextTuple method ");
>> }
>>
>>
>> See the _collector.emit call: I am passing the same bean object as the
>> messageId object.
>>
>> And the declareOutputFields method is:
>>
>> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>     declarer.declare(new Fields("word", "correlationObject"));
>> }
>>
>>
>> Now, in my bolt's execute method:
>>
>> @Override
>> public void execute(Tuple tuple) {
>>     Bean b1 = (Bean) tuple.getValue(1);
>>     b1.setFailureReason("It failed ");
>>     _collector.fail(tuple);
>> }
>>
>>
>> Now, when _collector.fail is called, the spout's fail method gets
>> invoked:
>>
>> public void fail(Object msgId) {
>>     Bean b1 = (Bean) msgId;
>>     String failureReason = b1.getFailureReason();
>> }
>>
>>
>> You will see the failureReason you set inside your bolt arrive here, inside
>> the fail method.
>>
>> Again, I am not saying that this is the best way to achieve what you want,
>> just proposing one way it can be done.
>>
>> Hope this helps.
>>
>>
>> Thanks
>>
>> Ankur
>>
>>
>> On Mon, Jan 4, 2016 at 2:24 PM, Ravi Sharma <ping2ravi@gmail.com> wrote:
>>
>>> Hi Ankur,
>>> The relevant Storm APIs work like this:
>>>
>>> Bolts receive a Tuple, which is an immutable object.
>>> Once something fails in a bolt, we call outputCollector.fail(tuple),
>>>
>>> which in turn invokes the spout's ISpout.fail(Object messageId) method.
>>>
>>>
>>>
>>> Now the spout gets back only the messageId (which I created when processing
>>> started from that spout), so I am not sure how any other info, whether an
>>> object, boolean, integer or String, can be passed from the bolts to the
>>> spout after a failure.
>>>
>>> Thanks
>>> Ravi
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Sun, Jan 3, 2016 at 7:00 PM, Ankur Garg <ankurgarg9@gmail.com> wrote:
>>>
>>>> Hi Ravi,
>>>>
>>>> This may be a very naive answer, but I am posting it anyway:
>>>>
>>>> I am assuming that once you catch the failed tuple inside the fail method
>>>> of your spout, all you need to decide is whether this tuple should be
>>>> replayed or not.
>>>>
>>>> I am assuming the objects you are sending between the spout and bolts are
>>>> already serializable. How about adding the extra information for
>>>> replaying the tuple to the same object? (Since you are already thinking of
>>>> storing it in some external storage, I am assuming this info is
>>>> serializable too.) It may be as simple as a boolean flag.
>>>>
>>>> For example, the original tuple you are sending may be
>>>>
>>>> class OrigTuple implements Serializable
>>>> {
>>>>     ObjectA a;
>>>>     ObjectB b;
>>>> }
>>>>
>>>> I am assuming a and b are serializable or marked transient.
>>>>
>>>> Now, in case of failure, you can also attach an Object c that contains
>>>> failure information, or a simple boolean flag that tells the spout the
>>>> tuple needs to be replayed. For tuples that don't need to be replayed, it
>>>> keeps its default value of false.
>>>>
>>>>
>>>> As I said, it is a very simple thought, but I think it may work based on
>>>> the info you provided and the assumptions I made.
>>>>
>>>> Thanks
>>>> Ankur
>>>>
>>>>
>>>>
>>>> On Sun, Jan 3, 2016 at 3:59 PM, Ravi Sharma <ping2ravi@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>> I would like to send some extra information back to the spout when a
>>>>> tuple fails in some bolt, so that the spout can decide whether to replay
>>>>> it or just put the message into a queue outside Storm for admins to view.
>>>>>
>>>>> So is there any way I can attach more information when sending a
>>>>> failed tuple back to the spout?
>>>>>
>>>>> One way I can think of is keeping such information outside Storm in
>>>>> some datastore, keyed by tuple ID, where the spout can look it up, but I
>>>>> am looking for a way to do it via Storm without bringing in another
>>>>> integration/datastore.
>>>>>
>>>>>
>>>>> Thanks
>>>>> Ravi.
>>>>>
>>>>
>>>>
>>>
>>
