storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Navin Ipe <>
Subject Re: How does one distribute database iteration across workers?
Date Wed, 20 Apr 2016 03:29:24 GMT
@Jason: Thanks. Tried searching for Storm code which starts Ephemeral
nodes, but couldn't find it. (am new to Hadoop and Storm, so perhaps I was
searching for the wrong thing)

@Jungtaek: Will explore component tasks. Meanwhile, I had considered
Trident, but didn't go ahead because it was not clear how I could implement
multiple spouts in Trident, where each spout would iterate a certain number
of rows of a database. Any idea how that could happen.

On Tue, Apr 19, 2016 at 6:39 PM, Jungtaek Lim <> wrote:

> There's other idea without relying on Zookeeper : use ordinal of task id
> between same components (spout)
> Task id is issued across all tasks including system tasks so you can't
> assume spout tasks are having task id sequentially, but whatever you can do
> the trick - check "ordinal" of this spout task's id around same spouts.
> Please refer GeneralTopologyContext.getComponentTasks(String componentId).
> Btw, Spout1 -> Bolt2 can be done with various ways but it would not be
> easy to aggregate the results of Bolt2 from Bolt3.
> You should consider windowing by processed time or Trident or maintain
> your own buffers.
> Hope this helps.
> Thanks,
> Jungtaek Lim (HeartSaVioR)
> 2016년 4월 19일 (화) 오후 10:02, Jason Kusar <>님이 작성:
>> Hi,
>> I've done a similar thing before with the exception that I was reading
>> from Cassandra.  The concept is the same though.  Assuming you know that
>> you have 10,000 records and you want each spout to read 1,000 of them, then
>> you would launch 10 instances of the spouts.  The first thing they do
>> during init is to connect to zookeeper and create an ephemeral node (
>> starting with one called '0'.  If 0 already exists, you'll get an exception
>> which means you try to create '1' and so on until you successfully create a
>> node.  That tells you which batch of records that instance of the spout is
>> responsible for.  I.e., if you successfully created '3', then this spout
>> needs to set its offset to 3,000.
>> The reason for using ephemeral nodes is that they are automatically
>> deleted if the zookeeper client disconnects.  That way if a spout crashes,
>> once Storm relaunches the spout, it will be able to re-claim that token and
>> resume work on that batch.  You'll obviously need to have some way to keep
>> track of which records you've already processed, but that's going to be
>> specific to your implementation.
>> Hope that helps!
>> Jason
>> On Tue, Apr 19, 2016 at 4:43 AM Navin Ipe <
>>> wrote:
>>> Thanks guys.
>>> I didn't understand "*...spout instances by utilizing Zookeper.*". How
>>> does one utilize Zookeper? Is it the same as ".setNumTasks(10)" for a Spout?
>>> As of now I've set
>>> config.setNumWorkers(2);
>>> and
>>> builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>>> I'm able to get spoutID in open() using this.spoutId =
>>> context.getThisTaskId();
>>> Strangely, my spoutID always begins with 3 instead of 0.
>>> By partitionID I understand that's the fieldGrouping's id.
>>> Even if I do all this, will the spout's tasks actually be distributed
>>> across multiple workers? Won't I have to create separate spouts?
>>> builder.setSpout("mongoSpout1", new MongoSpout());
>>> builder.setSpout("mongoSpout2", new MongoSpout());
>>> builder.setSpout("mongoSpout3", new MongoSpout());
>>> and so on?
>>> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <>
>>> wrote:
>>>> Coreection - group on partition id
>>>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <>
>>>> wrote:
>>>>> I've seen this:
>>>>> but it doesn't explain how workers coordinate with each other, so
>>>>> requesting a bit of clarity.
>>>>> I'm considering a situation where I have 2 million rows in MySQL or
>>>>> MongoDB.
>>>>> 1. I want to use a Spout to read the first 1000 rows and send the
>>>>> processed output to a Bolt. This happens in Worker1.
>>>>> 2. I want a different instance of the same Spout class to read the
>>>>> next 1000 rows in parallel with the working of the Spout of 1, then send
>>>>> the processed output to an instance of the same Bolt used in 1. This
>>>>> happens in Worker2.
>>>>> 3. Same as 1 and 2, but it happens in Worker 3.
>>>>> 4. I might setup 10 workers like this.
>>>>> 5. When all the Bolts in the workers are finished, they send their
>>>>> outputs to a single Bolt in Worker 11.
>>>>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL
>>>>> table.
>>>>> *My confusion here is in how to make the database iterations happen
>>>>> batch by batch, parallelly*. Obviously the database connection would
>>>>> have to be made in some static class outside the workers, but if workers
>>>>> are started with just "conf.setNumWorkers(2);", then how do I tell
>>>>> the workers to iterate different rows of the database? Assuming that
>>>>> workers are running in different machines.
>>>>> --
>>>>> Regards,
>>>>> Navin
>>> --
>>> Regards,
>>> Navin


View raw message