storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Satish Duggana <>
Subject Re: Access to storm fieldsGrouping hashing method in bolt/spout
Date Sat, 11 Jun 2016 17:10:42 GMT

It seems you wanted to send tuples to a bolt's task from which those tuples
were received in the current bolt.

Initial bolt can send the current
task-id(org.apache.storm.task.TopologyContext#getThisTaskId()) as part of
tuple fields and this can be used by subsequent bolt to emit tuples
directly to the earlier bolt's task using `OutputCollector#emiteDirect(int
taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) `

Hope it helps,

On Fri, Jun 10, 2016 at 7:47 PM, Jason Kania <> wrote:

> Thanks for the response and the code reference.
> To explain the use case, we would like to be able to use shuffle grouping
> for initial load balancing but then use fields grouping and some generated
> token to be able to get back to the same bolt after passing messages to a
> different bolt doing other processing. The reason to get back to the same
> box is because of a very large media file that needs to be pulled and
> processed on that box causing us to want to be sticky to that box.
> We have tried using fieldsGrouping on its own but we end up with
> asymmetries in the load across the boxes simply because we only have 5
> boxes doing this specific processing and hashing does not allow it to be
> evenly balanced.
> Right now, we are generating random token values and then sending them out
> to be collected by the different bolts. The bolts then use the tokens that
> have arrived as the fieldsGrouping key for subsequent operations. The ideal
> would be if we could directly get a token that would allow return to the
> same bolt instance to complete the job.
> I am wondering if something like this would be worthy of an enhancement
> request?
> Jason
> ------------------------------
> *From:* Matthias J. Sax <>
> *To:*
> *Sent:* Wednesday, June 8, 2016 1:46 PM
> *Subject:* Re: Access to storm fieldsGrouping hashing method in bolt/spout
> I cannot completely follow you use case and what you want to accomplish...
> However, even as there is no official API to call the hash function, it
> is actually pretty simple. Storm internally creates a List of the
> fieldsGrouping attributes and call .hashCode() on it (and of course
> applies a modulo afterwards).
> I actually wrote a wrapper to call the internal hash function once (it
> works for 0.9.3 -- not sure if it is compatible with newer versions).
> You can find the code here:
> The project assembles a Java class with a static method to expose a
> simple to use Java API that call internal Storm core stuff to get the
> receiver task ID.
> > StormConnector.getFieldsGroupingReceiverTaskId(
>     TopologyContext ctx,
>     String producer-component-id,
>     String output-stream-id,
>     String receiver-component-id,
>     List<Object> tuple);
> -Matthias
> On 06/08/2016 07:02 PM, Jason Kania wrote:
> > Hello,
> >
> > I am wondering if there is a means to access the hashing method/function
> > that storm applies for the fieldsGrouping method. I would like to
> > generate a token that will hash back to the current node so that
> > subsequent processing can come back to the same node . I realize that
> > generating an applicable token would be trial and an error, but I would
> > like to take advantage of shufflegrouping to assign tasks but then use
> > fieldsGrouping to ensure that the rest of the work for the task comes
> > back to the same node.
> >
> > Thanks,
> >
> > Jason

View raw message