storm-user mailing list archives

From Navin Ipe <navin....@searchlighthealth.com>
Subject Re: How are multiple spouts and fields grouping planned out?
Date Mon, 25 Apr 2016 11:35:16 GMT
Aha! That's exactly what I needed to know. I feel the official
documentation should also explain the concept the way you explained it.
Without the explanation you gave, it is very difficult to understand
anything.
Thank you very very much Matthias! :-)

On Mon, Apr 25, 2016 at 4:46 PM, Matthias J. Sax <mjsax@apache.org> wrote:

> That is correct. The constructor is only called once to create a single
> instance that is handed to the TopologyBuilder. Internally, this
> instance is serialized and deserialized multiple times to create the
> executors. (Instantiating an object via deserialization does not call
> the constructor.)
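
The point about deserialization can be checked with plain Java, without Storm. The sketch below is only a model: FakeSpout and ConstructorCallDemo are hypothetical names, not Storm classes, and it assumes the component is copied with standard Java serialization. One object is built with `new`, then five copies are made by deserializing it, and a counter records how often the constructor really ran.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ConstructorCallDemo {
    // Counts how often the FakeSpout constructor actually runs.
    public static int constructorCalls = 0;

    // Hypothetical stand-in for a spout; not a real Storm class.
    public static class FakeSpout implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String name;

        public FakeSpout(String name) {
            this.name = name;
            constructorCalls++;
        }
    }

    // Serialize the object once, then deserialize it `copies` times.
    public static FakeSpout[] cloneViaSerialization(FakeSpout original, int copies) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            byte[] serialized = bytes.toByteArray();
            FakeSpout[] result = new FakeSpout[copies];
            for (int i = 0; i < copies; i++) {
                try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(serialized))) {
                    // readObject() rebuilds the object without calling FakeSpout(String).
                    result[i] = (FakeSpout) in.readObject();
                }
            }
            return result;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FakeSpout original = new FakeSpout("mongoSpout");
        FakeSpout[] copies = cloneViaSerialization(original, 5);
        System.out.println("instances: " + copies.length
                + ", constructor calls: " + constructorCalls);
    }
}
```

Five distinct objects exist afterwards, yet the counter stays at 1, which matches a println in the constructor showing up only once.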
>
> So Storm creates all instances during deployment.
>
> If you look at the log output, you should see multiple statements like
>
> > 13:05:50.084 [Thread-24-Toll-Notification-Bolt] INFO backtype.storm.daemon.executor - Preparing bolt Toll-Notification-Bolt:(10)
> > 13:05:50.112 [Thread-26-Toll-Notification-Bolt] INFO backtype.storm.daemon.executor - Preparing bolt Toll-Notification-Bolt:(11)
>
> You see that there are different thread IDs as well as executor IDs (the
> last number in parentheses).
>
> The number of log statements should be equal to your parallelism.
>
> Furthermore, if you have some code that must initialize a Spout/Bolt per
> instance, you can override Spout.open() and Bolt.prepare(),
> respectively. Those methods get called for each executor.
>
>
> -Matthias
>
>
>
> On 04/25/2016 12:55 PM, Navin Ipe wrote:
> > Hi Matthias,
> >
> > Thanks for the image and explanation. I understood the parallelism and
> > tasks, but even now when I run
> > builder.setSpout(partialSpoutName, new MongoSpout(),5).setNumTasks(100);
> > There's a line in MongoSpout's constructor
> > (System.out.println("MongoSpout created");) which is getting printed
> > only once. This is the reason for all the confusion on my part. I'd have
> > expected it to get printed at least 5 times.
> >
> > Am running on LocalCluster localCluster = new LocalCluster(); for now,
> > but even then I felt it should've created the necessary worker threads.
> >
> > When does Storm decide to create a new instance of MongoSpout?
> >
> > When nextTuple of MongoSpout looks like this,
> >     @Override
> >     public void nextTuple() {
> >         while(batch.hasNext()) {
> >             History_DBO history = batchOfHistories.next();
> >             this.collector.emit(new Values(history._id), history._id);
> >         }//while
> >     }
> >
> > Does Storm decide to create a new MongoSpout if the first MongoSpout
> > which emitted a tuple hasn't yet received an ack()? Is that how it works?
> >
> >
> >
> > On Mon, Apr 25, 2016 at 2:33 PM, Matthias J. Sax <mjsax@apache.org> wrote:
> >
> >     Hi Navin,
> >
> >     If you really want a "forward connection pattern", i.e., all data of a
> >     single spout goes to a single bolt, your idea with a loop should work.
> >     Of course, as you actually deploy distinct graphs, i.e., the single
> >     parts of the topology do not exchange data, you could also just deploy
> >     many topologies with a parallelism of one.
> >
> >     The "standard" way in Storm (without the loop) would be like this:
> >
> >     > builder.setSpout(spoutName, new MongoSpout(), 5);
> >     > builder.setBolt(boltName, new GenBolt(), 5).shuffleGrouping(spoutName);
> >     > builder.setBolt(boltName2, new SomeBolt(), 5).shuffleGrouping(boltName);
> >
> >     with 5 being the number of instances (if you don't specify any number,
> >     the default is 1). The third parameter (parallelism_hint) defines the
> >     number of executor threads (each thread will run a bolt instance).
> >
> >     However, using shuffleGrouping (or localOrShuffleGrouping), data is not
> >     forwarded to a single instance, but randomly redistributed over all
> >     bolt instances. (See the attached png showing a topology with 2 spouts
> >     and 4 bolts, each represented by a square. The dots within each square
> >     illustrate the parallelism of each; the connection pattern would be
> >     shuffleGrouping.)
> >
> >     As an alternative, you can also use custom grouping (or direct streams)
> >     to achieve a "forwarding" pattern. (With regard to dynamic scaling --
> >     see below -- this might be the correct way to go for your case.) Using
> >     the loop, you cannot add more instances during runtime.
> >
> >     However, I am not sure what you mean by
> >
> >     "Storm should automatically create the required number of instances of
> >     spouts and bolts to be able to scale horizontally when I add more
> >     machines to process the data"
> >
> >     If you deploy a topology, the number of parallel instances is stable.
> >     Storm does not automatically change it. If you want to change the
> >     parallelism during runtime, you need to do this manually via
> >
> >     bin/storm rebalance ...
> >
> >     The rebalance operation requires that the topology has enough TASKS
> >     (the parallelism cannot be larger than the number of tasks).
> >
> >     Thus, you need to "prepare" your topology during setup for dynamic
> >     scaling via .setNumTasks();
> >
> >     > builder.setSpout(spoutName, new MongoSpout(), 5).setNumTasks(100);
> >     > builder.setBolt(boltName, new GenBolt(), 5).shuffleGrouping(spoutName).setNumTasks(100);
> >     > builder.setBolt(boltName2, new SomeBolt(), 5).shuffleGrouping(boltName).setNumTasks(100);
> >
> >     Here, 100 defines the maximum number of parallel instances you can run,
> >     and the initial deployment will start 5. Using rebalance you can now
> >     change the parallelism to up to 100.
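
The relation between tasks and executors can be pictured with a small stand-alone model. This is a sketch under assumptions, not Storm's scheduler: TaskDistributionDemo and assign are hypothetical names, and the even split is only an illustration of how a fixed number of tasks can be spread over a changing number of executor threads, with the task count acting as the upper bound.

```java
import java.util.ArrayList;
import java.util.List;

public class TaskDistributionDemo {
    // Spread task ids 0..numTasks-1 as evenly as possible over the executors.
    // Rejects more executors than tasks, mirroring the rule that the
    // parallelism cannot exceed the number of tasks.
    public static List<List<Integer>> assign(int numTasks, int numExecutors) {
        if (numExecutors < 1 || numExecutors > numTasks) {
            throw new IllegalArgumentException(
                "executors must be between 1 and the number of tasks");
        }
        int base = numTasks / numExecutors;   // tasks every executor gets
        int extra = numTasks % numExecutors;  // first `extra` executors get one more
        List<List<Integer>> executors = new ArrayList<>();
        int nextTask = 0;
        for (int e = 0; e < numExecutors; e++) {
            int size = base + (e < extra ? 1 : 0);
            List<Integer> tasks = new ArrayList<>();
            for (int t = 0; t < size; t++) {
                tasks.add(nextTask++);
            }
            executors.add(tasks);
        }
        return executors;
    }

    public static void main(String[] args) {
        // Initial deployment: 100 tasks on 5 executors.
        System.out.println("tasks per executor at parallelism 5: "
                + assign(100, 5).get(0).size());
        // After a rebalance to 10 executors the same 100 tasks are re-spread.
        System.out.println("tasks per executor at parallelism 10: "
                + assign(100, 10).get(0).size());
    }
}
```

In this model the number of tasks never changes; only the number of threads sharing them does, which is why setNumTasks has to be chosen up front.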
> >
> >     Hope this makes sense.
> >
> >
> >     -Matthias
> >
> >
> >
> >
> >     On 04/25/2016 09:11 AM, Navin Ipe wrote:
> >     > Thank you Matthias for your time and patient explanation. I'm now clear
> >     > about the Fields grouping (an answer on Stackoverflow had confused me
> >     > <http://stackoverflow.com/questions/33512554/multiple-fields-grouping-in-storm>).
> >     > The first question still stands, where I'm unable to understand when
> >     > multiple instances of spouts and bolts get created.
> >     >
> >     > To get a topology like this:
> >     > Spout ---->Bolt--->Bolt
> >     > Spout ---->Bolt--->Bolt
> >     > Spout ---->Bolt--->Bolt
> >     >
> >     > is what I'm trying to achieve, but if I simply say:
> >     >
> >     > builder.setSpout(spoutName, new MongoSpout());
> >     > builder.setBolt(boltName, new GenBolt()).shuffleGrouping(spoutName);
> >     > builder.setBolt(boltName2, new SomeBolt()).shuffleGrouping(boltName);
> >     >
> >     > Then I don't see how multiple instances of MongoSpout() will get
> >     > created. I've already been through a lot of tutorials and documentation
> >     > pages, but they don't explain that (in a way that I understand). I had
> >     > also run some code where I set .setNumTasks(12); but the constructor of
> >     > MongoSpout() got called only once.
> >     > So under what situation does Storm create multiple instances of MongoSpout?
> >     >
> >     > The objective is that when there are a lot of things to process, Storm
> >     > should automatically create the required number of instances of spouts
> >     > and bolts to be able to scale horizontally when I add more machines to
> >     > process the data.
> >     >
> >     > But if I simply create this:
> >     > builder.setSpout(spoutName, new MongoSpout());
> >     > builder.setBolt(boltName, new GenBolt()).shuffleGrouping(spoutName);
> >     > builder.setBolt(boltName2, new SomeBolt()).shuffleGrouping(boltName);
> >     >
> >     > how will Storm scale?
> >     >
> >     > I need one spout to iterate the first 1000 rows of a database and
> >     > extract data while, in parallel, another spout iterates through the next
> >     > 1000 rows of the database and so on. So if the database has 1 million
> >     > rows, Storm should automatically create that many spouts. Is that
> >     > possible? (There would be a limit on the number of spouts of course.)
> >     >
> >     > Then, based on the number of spouts created, the same number of bolts
> >     > are created to process the data the spouts emit.
> >     >
> >     > So my biggest confusion has always been just this: how are the
> >     > multiple spout and bolt instances created so that the processing can scale?
> >     >
> >     > ps: Yes I knew the tuple does not extend IRichBolt. It was a silly
> >     > mistake while I was typing :-)
> >     >
> >     >
> >     > On Mon, Apr 25, 2016 at 3:26 AM, Matthias J. Sax <mjsax@apache.org> wrote:
> >     >
> >     >     Hi Navin,
> >     >
> >     >     I could not follow your email completely. Let me clarify a couple of
> >     >     things to get started. If you still have questions, just ask again.
> >     >
> >     >
> >     >
> >     >     A) The IRichBolt interface defines a bolt, and not a tuple. Thus,
> >     >
> >     >     > class SomeTuple extends IRichBolt {
> >     >     >   private Integer someID;
> >     >     >   public Integer getSomeID() {return someID;}
> >     >     > }
> >     >
> >     >     is not correct.
> >     >
> >     >     A Tuple in Storm has multiple Fields (also called a Schema). You define
> >     >     the fields of a tuple in declareOutputFields(...)
> >     >
> >     >
> >     >
> >     >     B) So
> >     >
> >     >     >     public void declareOutputFields(OutputFieldsDeclarer declarer) {
> >     >     >         declarer.declare(new Fields("A","B","C","D"));
> >     >     >     }
> >     >
> >     >     declares that the tuples that are emitted by this Spout or Bolt
> >     >     have 4 fields. "A", "B", "C", "D" are the names of those fields (i.e.,
> >     >     metadata).
> >     >
> >     >     If you want to emit a corresponding tuple you call
> >     >
> >     >         output.emit(new Values(1, "x", 3.4, true));
> >     >
> >     >     This would emit a Tuple with an Integer, String, Double, Boolean as concrete
> >     >     data types. The values can of course be anything.
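
How declared field names line up with emitted values can be shown with a tiny stand-alone model. This is a sketch only: TupleSchemaDemo and getValueByField are hypothetical names, not Storm's Tuple API, and the model assumes nothing more than that names and values are paired by position.

```java
import java.util.Arrays;
import java.util.List;

public class TupleSchemaDemo {
    // Look up a value by field name; names and values are matched by position.
    public static Object getValueByField(List<String> fields, List<Object> values, String name) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("tuple does not match the declared schema");
        }
        int index = fields.indexOf(name);
        if (index < 0) {
            throw new IllegalArgumentException("unknown field: " + name);
        }
        return values.get(index);
    }

    public static void main(String[] args) {
        // One schema with four field names, declared once per component.
        List<String> fields = Arrays.asList("A", "B", "C", "D");
        // One emitted tuple with four concrete values.
        List<Object> values = Arrays.<Object>asList(1, "x", 3.4, true);
        System.out.println("B = " + getValueByField(fields, values, "B"));
    }
}
```

So the four names describe one tuple with four positions, not four kinds of tuples.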
> >     >
> >     >
> >     >
> >     >     C) If you use fieldsGrouping, you always need to specify the fields you
> >     >     want to group on (it is not possible to use zero fields for this).
> >     >
> >     >     If you have, for example, an ID field in your tuple and you want
> >     >     all tuples with the same ID to be processed by the same bolt
> >     >     instance, you can do it like this:
> >     >
> >     >         builder.setBolt("PRODUCER", new MyPBolt());
> >     >         builder.setBolt("CONSUMER", new MyCBolt(), parallelism).fieldsGrouping("PRODUCER", new Fields("ID"));
> >     >
> >     >     Of course, MyPBolt must declare a field with name "ID" in its
> >     >     implementation of declareOutputFields() (otherwise Storm will complain).
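
Why equal IDs end up at the same bolt instance can be illustrated without Storm. The sketch below is a simplified model, assuming the target is picked from a hash of the grouping values modulo the number of consumer tasks; FieldsGroupingDemo and chooseTask are hypothetical names and Storm's real implementation may differ in detail.

```java
import java.util.Arrays;
import java.util.List;

public class FieldsGroupingDemo {
    // Pick a consumer task index from the values of the grouping fields.
    // Equal values always give the same hash, hence the same task.
    public static int chooseTask(List<?> groupingValues, int numTasks) {
        if (numTasks < 1) {
            throw new IllegalArgumentException("need at least one task");
        }
        // floorMod keeps the result in [0, numTasks) even for negative hashes.
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int consumers = 3;
        // Several tuples, some sharing the same ID.
        int[] ids = {9, 4, 9, 7, 9};
        for (int id : ids) {
            System.out.println("ID " + id + " -> consumer task "
                    + chooseTask(Arrays.asList(id), consumers));
        }
    }
}
```

Every tuple with ID 9 maps to the same index in this model, while different IDs may or may not share an instance.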
> >     >
> >     >
> >     >     So, the answer to the first question (the other two should already be covered):
> >     >
> >     >     > *1. *The way I'm using the for loop above is wrong isn't it? If I use a
> >     >     > single builder.setSpout and set the numTasks value, then Storm would
> >     >     > create those many Spout instances automatically?
> >     >
> >     >     Not sure what you want to get -- so not sure if the loop is right or
> >     >     wrong. And yes, if you use
> >     >
> >     >     builder.setSpout("name", new MySpout(), parallelism)
> >     >
> >     >     Storm will automatically start <parallelism> instances of MySpout. Do
> >     >     not confuse this with tasks (i.e., setNumTasks()). Tasks are logical units
> >     >     of parallelism, while <parallelism> defines the number of threads (i.e.,
> >     >     physical parallelism). See here for more details:
> >     >
> >     >     https://storm.apache.org/releases/1.0.0/Understanding-the-parallelism-of-a-Storm-topology.html
> >     >
> >     >
> >     >     Hope this clears things up a little bit. If you are still confused, look
> >     >     at the example in storm-starter.
> >     >
> >     >
> >     >     -Matthias
> >     >
> >     >
> >     >     On 04/24/2016 07:44 PM, Navin Ipe wrote:
> >     >     > To parallelize some code, I considered having this topology. The single
> >     >     > [Spout] or [Bolt] represent multiple Spouts or Bolts.
> >     >     >
> >     >     > *[Spout]--emit--->[Bolt A]--emit--->[Bolt B]*
> >     >     >
> >     >     > If any of the bolts in Bolt A emit a Tuple of value 1, and it gets
> >     >     > processed by a certain bolt in Bolt B, then it is imperative that if any
> >     >     > of the bolts in Bolt A again emits the value 1, it should compulsorily
> >     >     > be processed by the same bolt in Bolt B. I assume fields grouping can
> >     >     > handle this.
> >     >     >
> >     >     > To have many spouts work in parallel, my initial thoughts were to have:
> >     >     >         Integer numberOfSpouts = 10;
> >     >     >
> >     >     >         String partialSpoutName = "mongoSpout";
> >     >     >         String partialBoltName = "mongoBolt";
> >     >     >
> >     >     >         for(Integer i = 0; i < numberOfSpouts; ++i) {
> >     >     >             String spoutName = partialSpoutName + i.toString();
> >     >     >             String boltName = partialBoltName + i.toString();
> >     >     >
> >     >     >             builder.setSpout(spoutName, new MongoSpout());
> >     >     >             builder.setBolt(boltName, new GenBolt()).shuffleGrouping(spoutName);
> >     >     >         }
> >     >     >
> >     >     > But I realized it was probably not the right way, because in case I
> >     >     > wanted all of Bolt A's tuples to go to a single Bolt B, then I'd have to
> >     >     > include cases like this:
> >     >     >
> >     >     >         switch (numberOfSpouts) {
> >     >     >             case 3:
> >     >     >                 builder.setBolt("sqlWriterBolt", new SqlWriterBolt(appConfig), 3)
> >     >     >                         .shuffleGrouping(partialBoltName+"2")
> >     >     >                         .shuffleGrouping(partialBoltName+"1")
> >     >     >                         .shuffleGrouping(partialBoltName+"0");
> >     >     >                 break;
> >     >     >             case 2:
> >     >     >                 builder.setBolt("sqlWriterBolt", new SqlWriterBolt(appConfig), 2)
> >     >     >                         .shuffleGrouping(partialBoltName+"1")
> >     >     >                         .shuffleGrouping(partialBoltName+"0");
> >     >     >                 break;
> >     >     >             case 1:
> >     >     >                 builder.setBolt("sqlWriterBolt", new SqlWriterBolt(appConfig), 1)
> >     >     >                         .shuffleGrouping(partialBoltName+"0");
> >     >     >                 break;
> >     >     >             default:
> >     >     >                 System.out.println("No case here");
> >     >     >         }//switch
> >     >     >
> >     >     > *So three questions:*
> >     >     > *1. *The way I'm using the for loop above is wrong isn't it? If I use a
> >     >     > single builder.setSpout and set the numTasks value, then Storm would
> >     >     > create those many Spout instances automatically?
> >     >     >
> >     >     > *2.* When you specify something like this for fields grouping:
> >     >     >     public void declareOutputFields(OutputFieldsDeclarer declarer) {
> >     >     >         declarer.declare(new Fields("A","B","C","D"));
> >     >     >     }
> >     >     > What does it mean? Does it mean that 4 types of tuples might be emitted?
> >     >     > When they are received as builder.setBolt("sqlWriterBolt", new
> >     >     > Bolt_AB(), 3).fieldsGrouping("mongoBolt", new Fields("A", "B"));
> >     >     >
> >     >     > Does it mean that the first 2 tuples will go to the Bolt_AB and the
> >     >     > next 2 tuples may go to any other bolt that receives tuples from
> >     >     > mongoBolt, and then the next two tuples will go to Bolt_AB again? Is
> >     >     > that how it works?
> >     >     >
> >     >     > *3. *If I don't specify any new Fields("A", "B"), how does Storm know
> >     >     > the grouping? How does it decide? If I have a tuple like this:
> >     >     > class SomeTuple extends IRichBolt {
> >     >     >   private Integer someID;
> >     >     >   public Integer getSomeID() {return someID;}
> >     >     > }
> >     >     > and if Bolt A sends SomeTuple to Bolt B, with SomeID value (assume a
> >     >     > SomeID value is 9) and the next time Bolt A generates a Tuple with
> >     >     > someID = 9 (and it may generate many tuples with someID=9), how do I
> >     >     > ensure that Storm sees the SomeID value and decides to send it to the
> >     >     > Bolt B instance that processes all someID's which have a value of 9?
> >     >     >
> >     >     >
> >     >     > --
> >     >     > Regards,
> >     >     > Navin
> >     >
> >     >
> >     >
> >     >
> >     > --
> >     > Regards,
> >     > Navin
> >
> >
> >
> >
> > --
> > Regards,
> > Navin
>
>


-- 
Regards,
Navin
