It seems you want to send tuples back to the specific task of the upstream bolt from which the current bolt received them.

The initial bolt can include its own task ID (`org.apache.storm.task.TopologyContext#getThisTaskId()`) as one of the tuple fields. A subsequent bolt can then use that value to emit tuples directly to the earlier bolt's task with `OutputCollector#emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple)`. For this to work, the stream has to be declared as a direct stream and the receiving bolt has to subscribe to it with direct grouping.

Hope it helps,

On Fri, Jun 10, 2016 at 7:47 PM, Jason Kania <jason.kania@ymail.com> wrote:
Thanks for the response and the code reference.

To explain the use case, we would like to use shuffle grouping for initial load balancing, but then use fields grouping with some generated token to get back to the same bolt after passing messages to a different bolt that does other processing. We need to return to the same box because a very large media file has to be pulled and processed there, so we want to stay sticky to that box.

We have tried using fieldsGrouping on its own but we end up with asymmetries in the load across the boxes simply because we only have 5 boxes doing this specific processing and hashing does not allow it to be evenly balanced.

Right now, we are generating random token values and then sending them out to be collected by the different bolts. The bolts then use the tokens that have arrived as the fieldsGrouping key for subsequent operations. The ideal would be if we could directly get a token that would allow return to the same bolt instance to complete the job.

I am wondering if something like this would be worthy of an enhancement request?


From: Matthias J. Sax <mjsax@apache.org>
To: user@storm.apache.org
Sent: Wednesday, June 8, 2016 1:46 PM
Subject: Re: Access to storm fieldsGrouping hashing method in bolt/spout

I cannot completely follow your use case and what you want to accomplish...

However, even though there is no official API to call the hash function,
it is actually pretty simple. Storm internally creates a List of the
fieldsGrouping attributes and calls .hashCode() on it (and of course
applies a modulo afterwards).
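
To make the description above concrete, here is a minimal plain-Java sketch of that computation, plus the trial-and-error token search mentioned in the original question. It does not call Storm at all: the class `FieldsGroupingSketch` and the methods `targetIndex` and `findToken` are hypothetical names made up for illustration, and the non-negative modulo (`Math.floorMod`) is an assumption. The exact hashing and the mapping from index to task ID may differ between Storm versions, so treat this as an approximation to check against the version you run.

```java
import java.util.Collections;
import java.util.List;

public class FieldsGroupingSketch {

    // Index of the receiving task among numTasks targets, following the
    // description above: hash the list of grouping values, then take a modulo.
    // Assumption: the modulo is non-negative (floorMod), so a negative
    // hashCode still yields a valid index.
    public static int targetIndex(List<Object> groupingValues, int numTasks) {
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    // Trial-and-error search for a numeric token that, used as the only
    // grouping field, lands on wantedIndex. Returns -1 if no token below
    // maxTries matches.
    public static long findToken(int wantedIndex, int numTasks, long maxTries) {
        for (long token = 0; token < maxTries; token++) {
            List<Object> key = Collections.<Object>singletonList(Long.valueOf(token));
            if (targetIndex(key, numTasks) == wantedIndex) {
                return token;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int numTasks = 5; // e.g. five tasks doing the media processing
        for (int index = 0; index < numTasks; index++) {
            long token = findToken(index, numTasks, 1000);
            System.out.println("index " + index + " <- token " + token);
        }
    }
}
```

Note that the result is only an index into the list of receiver tasks. Turning it into an actual task ID still needs the receiver's task list from the topology context, which is what the wrapper described below takes care of.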

I once wrote a wrapper to call the internal hash function (it
works for 0.9.3 -- not sure if it is compatible with newer versions).
You can find the code here:

The project assembles a Java class with a static method that exposes a
simple-to-use Java API, which calls internal Storm core code to get the
receiver task ID.

> StormConnector.getFieldsGroupingReceiverTaskId(
    TopologyContext ctx,
    String producer-component-id,
    String output-stream-id,
    String receiver-component-id,
    List<Object> tuple);


On 06/08/2016 07:02 PM, Jason Kania wrote:
> Hello,
> I am wondering if there is a means to access the hashing method/function
> that storm applies for the fieldsGrouping method. I would like to
> generate a token that will hash back to the current node so that
> subsequent processing can come back to the same node. I realize that
> generating an applicable token would be trial and error, but I would
> like to take advantage of shuffleGrouping to assign tasks but then use
> fieldsGrouping to ensure that the rest of the work for the task comes
> back to the same node.
> Thanks,
> Jason