Hi Ravi,

I have tested this in local mode only, so I am not sure whether (Bean)tuple.getValue(1) != messageIdObject holds in a remote cluster. Unfortunately, I no longer have a cluster set up to try it out :( . My guess, though, is that serialising the same object twice should not be a problem in this case. Please let me know your findings if you get a chance to test this on a cluster.

Having said that, in case the above doesn't work, I believe you can use a different object to store your failure message, not necessarily the same bean I quoted in my example, as Storm internally maintains the mapping between the msgId and the failed tuple.

So let's say that earlier you were storing this messageId as some string or GUID (say "xyz").

Your messageId was String msgId = "xyz".

Now you may replace it with a POJO with whatever attributes you want to set for failure.

To elaborate, your messageId object can be:

public class Message
{
    String id;
    String failureReason;
    // or simply
    boolean toBeReplayed; // true if you want to replay again; to be populated inside bolts
}

And then, based on failureReason itself, you can decide whether to replay the tuple.
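The spout-side decision could be sketched roughly as below. This is only an illustration in plain Java with no Storm classes: ReplayDecider, onFail, replayQueue and deadLetters are hypothetical names, and the sketch assumes the flag on the message has already been set by the time fail is called. The spout's fail(Object msgId) method would simply delegate to onFail.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical message-id POJO, mirroring the Message class above.
class Message {
    final String id;
    String failureReason;   // stays null if nothing was recorded
    boolean toBeReplayed;   // true if the tuple should be emitted again

    Message(String id) {
        this.id = id;
    }
}

// Hypothetical helper that a spout could call from fail(Object msgId).
class ReplayDecider {
    final Deque<Message> replayQueue = new ArrayDeque<>();
    final List<Message> deadLetters = new ArrayList<>();

    // Returns true when the message was queued for replay,
    // false when it was set aside for an admin to inspect.
    boolean onFail(Object msgId) {
        Message m = (Message) msgId;
        if (m.toBeReplayed) {
            replayQueue.addLast(m);
            return true;
        }
        deadLetters.add(m);
        return false;
    }
}
```

nextTuple could then drain replayQueue before emitting new messages.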

The reason I suggested the same bean was that you then don't have to maintain the info of the tuple you passed to the bolts anywhere. But assuming you might already be doing that, the above messageId object should get the job done for you.

For your second point, I will assume that future releases should be backward compatible :P . On a serious note, there may be a better way to solve this, but based on my current understanding of Storm, I am not sure whether Storm provides anything out of the box for this.


Thanks
Ankur








On Tue, Jan 5, 2016 at 1:53 PM, Ravi Sharma <ping2ravi@gmail.com> wrote:

Hi Ankur,

Thanks for the detailed reply. I haven't tried the code on a cluster yet; I am just running it in my mind.

Let's say a single instance of Bean b was created for a message and passed both as part of the tuple and as the messageId.

I don't know the implementation, but I will assume the messageId and the tuple will be serialized separately and deserialized separately on another node. So while serializing, the same object will be serialized twice, but when deserializing, two new objects will be created with exactly the same state/content.

So basically

(Bean)tuple.getValue(1) != messageIdObject, since after deserializing this compares only the memory addresses.


So when I change Bean b in the tuple, it will not be reflected in the messageId.

It may work in dev mode as there is no serialization/deserialization, but in cluster mode it may not work. I will try to run it in cluster mode and see whether it actually does what I have just said.
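The identity concern above can be illustrated outside Storm with a plain serialization round trip. This is only a sketch: it uses java.io serialization as a stand-in (Storm serializes tuple values with Kryo by default), and Payload and RoundTripDemo are hypothetical names. It says nothing about how Storm itself handles the messageId, only what happens to an object that is serialized and read back.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the Bean carried in the tuple.
class Payload implements Serializable {
    private static final long serialVersionUID = 1L;
    String failureReason;
}

class RoundTripDemo {
    // Serializes the object to bytes and reads it back, which yields a new instance.
    static Payload roundTrip(Payload in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream ois = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Payload) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Payload atSpout = new Payload();          // instance kept on the sending side
        Payload atBolt = roundTrip(atSpout);      // copy seen on the receiving side
        atBolt.failureReason = "It failed";       // mutation made on the copy only

        System.out.println(atBolt == atSpout);        // false
        System.out.println(atSpout.failureReason);    // null
    }
}
```

Without the round trip, both variables would refer to the same instance and the change would be visible on both sides, which matches the dev mode behaviour described above.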


Also, the messageId and Tuple are supposed to be read-only objects, which means that even if it works now, it may not work in a future version if some kind of caching comes into the picture, e.g. the spout keeps a cache, on ack the bolt sends only a key for the tuple, and the spout gets it from the cache, etc.


Thanks

Ravi.






On 4 Jan 2016 5:14 pm, "Ankur Garg" <ankurgarg9@gmail.com> wrote:
My bad, Ravi, that I could not explain my point properly.

Like you said, when you call outputCollector.fail(tuple), ISpout.fail(Object messageId) gets invoked.

Maybe you are passing a String as this messageId, but the argument itself is declared as Object.

So what I meant to say is that you can pass your same object as the messageId (perhaps name it better for understanding).

To elaborate, let's say I pass a simple Java bean from the spout to the bolt.

Here is my sample bean:

public class Bean implements Serializable {

    private String a;             // your custom attributes, which are serializable
    private int b;                // your custom attributes, which are serializable
    private int c;                // your custom attributes, which are serializable
    private String msgId;         // your custom attributes, which are serializable
    private String failureReason; // failure reason, to be populated inside the bolt when the tuple fails

    // getter and setter methods
}

In your spout, inside nextTuple (taking the example from the word count in the Storm Starter project):


public void nextTuple() {
    try {
        final String[] words = new String[] { "nathan", "mike", "jackson", "golda", "bertels" };
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        String msgId = word + UUID.randomUUID().toString();

        Bean b = new Bean();
        b.setA("String A");
        b.setB(123);
        b.setC(456);
        b.setMsgId(msgId); // not necessary to do

        // The same bean is passed both as a tuple value and as the message ID.
        _collector.emit(new Values(word, b), b);
        LOG.info("Exit nextTuple Method ");
    } catch (Exception ex) {
        ex.printStackTrace();
    }
    LOG.info("Final Exit nextTuple method ");
}


See the _collector.emit call: I am passing the same bean object as the messageId object.

And the declareOutputFields method is:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word", "correlationObject"));
}


Now in my bolt, in the execute method:

@Override
public void execute(Tuple tuple) {
    // Field 1 is the "correlationObject" declared by the spout.
    Bean b1 = (Bean) tuple.getValue(1);
    b1.setFailureReason("It failed ");
    _collector.fail(tuple);
}


Now, when _collector.fail is called, the spout's fail method gets invoked:

public void fail(Object msgId) {
    // The message ID is the Bean that was passed to emit in nextTuple.
    Bean b1 = (Bean) msgId;
    String failureReason = b1.getFailureReason();
}


You will see the failureReason you set inside your bolt received here inside the fail method.

Again, I am not saying that this is the best way to achieve what you want; I am just proposing a way it can be done.

Hope this helps.


Thanks

Ankur


On Mon, Jan 4, 2016 at 2:24 PM, Ravi Sharma <ping2ravi@gmail.com> wrote:
Hi Ankur,
The relevant Storm APIs work like this:

Bolts receive a Tuple, which is an immutable object.
Once something fails in a bolt, we call outputCollector.fail(tuple),

which in turn invokes the spout's ISpout.fail(Object messageId) method.



Now the spout gets back only the messageId (which I created when processing started from that spout), so I am not sure how any other info, whether an object, a boolean, an integer or a String, can be passed from the bolts to the spout after failing.

Thanks
Ravi









On Sun, Jan 3, 2016 at 7:00 PM, Ankur Garg <ankurgarg9@gmail.com> wrote:
Hi Ravi,

This may be a very naive answer, but I am posting it anyway:

I am assuming that once you catch the failed tuple inside the fail method of your spout, all you need to decide is whether this tuple should be replayed or not.

I am assuming the objects you are sending between the spout and bolts are already serializable. How about adding this extra information for replaying the tuple to the same object? (Since you are already thinking of storing it in some external storage, I am assuming this info is serializable too.) It may be a simple boolean flag too.

For example:

The original tuple you are sending may be

class OrigTuple implements Serializable
{
    ObjectA a;
    ObjectB b;
}

I am assuming a and b are both serializable or marked transient.

Now in case of failure you can also attach an object C which contains the failure information, or a simple boolean flag which tells the spout that the tuple needs to be replayed. For the ones which don't need to be replayed, it takes the default value of false.


Like I said before, it is a very simple thought, but I think it may work based on the info you provided and the assumptions I made.

Thanks
Ankur



On Sun, Jan 3, 2016 at 3:59 PM, Ravi Sharma <ping2ravi@gmail.com> wrote:
Hi All,
I would like to send some extra information back to the spout when a tuple fails in some bolt, so that the spout can decide whether to replay it or just put the message into a queue outside Storm for admins to view.

So is there any way I can attach some more information when sending a failed tuple back to the spout?

One way I can think of is keeping such information outside Storm in some datastore, keyed by tuple ID, so that the spout can look it up, but I am looking for some way to do it via Storm without bringing in another integration/datastore.
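For reference, the datastore fallback mentioned above could look roughly like the sketch below. It is only an illustration: FailureStore, recordFailure and takeFailure are hypothetical names, and the in-memory map is a stand-in for a real external store, since a map like this is only visible inside a single JVM.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for an external datastore keyed by message ID.
// A real deployment would need a store that every worker can reach.
class FailureStore {
    private final Map<String, String> reasons = new ConcurrentHashMap<>();

    // Would be called by a bolt just before it fails the tuple.
    void recordFailure(String messageId, String reason) {
        reasons.put(messageId, reason);
    }

    // Would be called by the spout from fail(msgId).
    // Removes the entry so the store does not keep growing.
    Optional<String> takeFailure(String messageId) {
        return Optional.ofNullable(reasons.remove(messageId));
    }
}
```

The bolt would need the message ID as a tuple field to use as the key, and the spout would treat an empty result as "no extra information recorded".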


Thanks
Ravi.