storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "J.R. Pauley" <jrpau...@gmail.com>
Subject stream .each confusion
Date Thu, 03 Aug 2017 15:32:10 GMT
I'm scratching my head trying to produce a simple drpc stream that does
what I want. Logically I think I want this:

     tridentTopology
        .newDRPCStream("crmc", null)
        .each(new Fields("args"), new Fn1(), new Fields("B"))
        .each(new Fields("args","B"), new Fn2(), new Fields("args", "C"))
        .each(new Fields("args","C"), new Fn3(), new Fields("D"));

However I get a runtime exception for duplicated args, so I renamed the
args to give them all unique names, like so:

      tridentTopology
        .newDRPCStream("crmc", null)
        .each(new Fields("args"), new NormalizeFn(), new Fields("dpc"))
        .each(new Fields("args2","B"), new Fn2(), new Fields("args3", "C"))
        .each(new Fields("args3","C"), new Fn3(), new Fields("D"));

This works (sort of). The problem is I end up with output stream of lots of
duplicate args. I don't want the original "args" emitted in the output
stream at all. But if I emit D I still see args2,args3,D in the output.

All I am trying to do is
1)make args available to all 3 named functions, and
2)supply additional arg B as input to Fn2, and
3)supply additional arg C as input to Fn3 which outputs D and D only as
result

I've read that I don't need to define new "args" as they are passed to all
functions. However if I try to access tuple.getString(1) in Fn2 I get an
ArrayOutOfBounds unless I am explicityly passing in 2 named parameters.

So I'm really confused as to how best to define this topology. Any help
appreciated.

Mime
View raw message