storm-user mailing list archives

From Navin Ipe <navin....@searchlighthealth.com>
Subject Dynamically linking Spouts to Bolts
Date Thu, 09 Jun 2016 06:48:56 GMT
Hi,

I've already changed the design of my code to solve the problem, but I just
wanted to ask whether it could have been solved another way.

*The problem:*
I wanted to create x spouts, each handling one batch of records from a
database: the first spout handles records 1 to 1000, the second handles
1001 to 2000, and so on.
The problem is that if the x spouts are created dynamically, how do we
subscribe the bolts to the output of all of them?

*Solution that didn't work:*
A colleague suggested

String s = "sp";
for (Integer ordinal = 1; ordinal <= numSpouts; ++ordinal) {
    String spoutName = s + ordinal.toString();
    builder.setSpout(spoutName, new mdSpout(), 1)
        .setNumTasks(1);
}

BoltDeclarer boltDec = builder.setBolt(cgBolt, new CBolt(), thr);
for (int i = 0; i < numSpouts; i++) {
    boltDec.fieldsGrouping(s + Integer.toString(i), new Fields(configRef.BID))
            .allGrouping(s + Integer.toString(i), configRef.ID);
}

*But this gave an invalid topology exception.*
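
One possible cause, assuming the snippet above is exactly what ran: the
spouts are registered as sp1..spN (the first loop starts at 1), while the
bolt loop subscribes to sp0..sp(N-1) (it starts at 0). As far as I know,
Storm rejects a topology in which a bolt subscribes to a component that was
never declared. The sketch below only reproduces the name arithmetic in
plain Java, without Storm; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class SpoutNameCheck {
    // Names registered by the setSpout loop: ordinal runs 1..numSpouts.
    static List<String> declaredNames(String prefix, int numSpouts) {
        List<String> names = new ArrayList<>();
        for (int ordinal = 1; ordinal <= numSpouts; ++ordinal) {
            names.add(prefix + ordinal);
        }
        return names;
    }

    // Names used by the grouping loop: i runs 0..numSpouts-1.
    static List<String> subscribedNames(String prefix, int numSpouts) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numSpouts; i++) {
            names.add(prefix + i);
        }
        return names;
    }

    // Subscriptions that point at a spout name that was never registered.
    static List<String> undeclared(String prefix, int numSpouts) {
        List<String> missing = new ArrayList<>(subscribedNames(prefix, numSpouts));
        missing.removeAll(declaredNames(prefix, numSpouts));
        return missing;
    }

    public static void main(String[] args) {
        System.out.println(undeclared("sp", 3)); // prints [sp0]
    }
}
```

If this is indeed the cause, the bolt never subscribes to the last spout
either, since spN does not appear in the second loop.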


*Solution I eventually had to use:*
for (Integer ordinal = 1; ordinal <= numSpouts; ++ordinal) {
    String spoutName = s + ordinal.toString();
    builder.setSpout(spoutName, new mdSpout(), 1)
            .setNumTasks(1);
}

switch (numSpouts) {
    case 1:
        builder.setBolt(cgBolt, new CBolt(), thr)
                //.setNumTasks(3)
                .fieldsGrouping(s+"1", new Fields(configRef.BID))
                .allGrouping(s+"1", configRef.ID);
        break;
    case 2:
        builder.setBolt(cgBolt, new CBolt(), thr)
                .fieldsGrouping(s+"1", new Fields(configRef.BID))
                .allGrouping(s+"1", configRef.ID)
                .fieldsGrouping(s+"2", new Fields(configRef.BID))
                .allGrouping(s+"2", configRef.ID);
        break;
    case 3:
        builder.setBolt(cgBolt, new CBolt(), thr)
                .fieldsGrouping(s+"1", new Fields(configRef.BID))
                .allGrouping(s+"1", configRef.ID)
                .fieldsGrouping(s+"2", new Fields(configRef.BID))
                .allGrouping(s+"2", configRef.ID)
                .fieldsGrouping(s+"3", new Fields(configRef.BID))
                .allGrouping(s+"3", configRef.ID);
        break;
}

This worked, but obviously there should be a better way to do this, right?
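
A loop that uses the same 1-based names as the setSpout loop might replace
the switch. This is a minimal sketch, not a tested Storm topology: Declarer
and RecordingDeclarer are hypothetical stand-ins so the example runs without
Storm on the classpath, and they mirror only the two grouping calls used
above. It relies on the assumption that each grouping call returns the
declarer and registers the subscription on it, which is how the chained
calls in the switch already behave.

```java
import java.util.ArrayList;
import java.util.List;

public class LoopWiring {
    // Hypothetical stand-in for the two BoltDeclarer calls used in this message.
    interface Declarer {
        Declarer fieldsGrouping(String componentId, String field);
        Declarer allGrouping(String componentId, String streamId);
    }

    // Records every subscription as text so the wiring can be inspected.
    static class RecordingDeclarer implements Declarer {
        final List<String> calls = new ArrayList<>();

        public Declarer fieldsGrouping(String componentId, String field) {
            calls.add("fields:" + componentId + ":" + field);
            return this;
        }

        public Declarer allGrouping(String componentId, String streamId) {
            calls.add("all:" + componentId + ":" + streamId);
            return this;
        }
    }

    // Subscribes one bolt to prefix+1 .. prefix+numSpouts, for any numSpouts.
    static void wireAll(Declarer bolt, String prefix, int numSpouts,
                        String field, String streamId) {
        for (int ordinal = 1; ordinal <= numSpouts; ++ordinal) {
            String spoutName = prefix + ordinal;
            bolt.fieldsGrouping(spoutName, field)
                .allGrouping(spoutName, streamId);
        }
    }

    public static void main(String[] args) {
        RecordingDeclarer bolt = new RecordingDeclarer();
        wireAll(bolt, "sp", 3, "BID", "ID");
        bolt.calls.forEach(System.out::println);
    }
}
```

In the real topology the first argument would be the object returned by
builder.setBolt(cgBolt, new CBolt(), thr), with new Fields(configRef.BID)
and configRef.ID in place of the plain strings.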

PS: Eventually I used a different method of sending the batches to the
spouts, so the spouts didn't have to be created in a for loop, but I am
still curious about how to solve the above problem.

-- 
Regards,
Navin
