storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "J.R. Pauley" <jrpau...@gmail.com>
Subject Re: stream .each confusion
Date Thu, 03 Aug 2017 15:54:52 GMT
Bobby,

sorry if I am misreading you, but if I don't explicitly declare "args" then
Fn2 execute has only the single parameter "B" tuple.getString(0) as its
input. I am not able to call tuple.getString(1) and tuple.size() =1, not 2.
So below can't be right. How do I access "args" in Fn2?

tridentTopology
        .newDRPCStream("crmc", null)
        .each(new Fields("args"), new Fn1(), new Fields("B"))
        .each(new Fields("B"), new Fn2(), new Fields("C"))
        .each(new Fields("C"), new Fn3(), new Fields("D"));

On Thu, Aug 3, 2017 at 11:36 AM, Bobby Evans <evans@yahoo-inc.com> wrote:

> With trident the original args going into the each never go away, so you
> don't have to output them.
>
>      tridentTopology
>         .newDRPCStream("crmc", null)
>         .each(new Fields("args"), new Fn1(), new Fields("B"))
>         .each(new Fields("args","B"), new Fn2(), new Fields("C"))
>         .each(new Fields("args","C"), new Fn3(), new Fields("D"));
>
> By removing the "args" being output from Fn2() it should work for you.  I
> know it is a bit odd but that is how it works.
>
> - Bobby
>
>
>
> On Thursday, August 3, 2017, 10:32:17 AM CDT, J.R. Pauley <
> jrpauley@gmail.com> wrote:
>
>
> I'm scratching my head trying to produce a simple drpc stream that does
> what I want. Logically I think I want this:
>
>      tridentTopology
>         .newDRPCStream("crmc", null)
>         .each(new Fields("args"), new Fn1(), new Fields("B"))
>         .each(new Fields("args","B"), new Fn2(), new Fields("args", "C"))
>         .each(new Fields("args","C"), new Fn3(), new Fields("D"));
>
> However I get a runtime exception for duplicated args, so I renamed the
> args to give them all unique names, like so:
>
>       tridentTopology
>         .newDRPCStream("crmc", null)
>         .each(new Fields("args"), new NormalizeFn(), new Fields("dpc"))
>         .each(new Fields("args2","B"), new Fn2(), new Fields("args3", "C"))
>         .each(new Fields("args3","C"), new Fn3(), new Fields("D"));
>
> This works (sort of). The problem is I end up with output stream of lots
> of duplicate args. I don't want the original "args" emitted in the output
> stream at all. But if I emit D I still see args2,args3,D in the output.
>
> All I am trying to do is
> 1)make args available to all 3 named functions, and
> 2)supply additional arg B as input to Fn2, and
> 3)supply additional arg C as input to Fn3 which outputs D and D only as
> result
>
> I've read that I don't need to define new "args" as they are passed to all
> functions. However if I try to access tuple.getString(1) in Fn2 I get an
> ArrayOutOfBounds unless I am explicityly passing in 2 named parameters.
>
> So I'm really confused as to how best to define this topology. Any help
> appreciated.
>
>

Mime
View raw message