storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Navin Ipe <navin....@searchlighthealth.com>
Subject Re: How does one distribute database iteration across workers?
Date Wed, 20 Apr 2016 06:28:30 GMT
@Jungtaek: This person (
http://stackoverflow.com/questions/21480773/can-twitter-storm-worker-get-only-parts-of-the-topology)
claims that Storm would automatically manage the flow of data between
spouts and blots on different workers. Can anyone confirm this? If this is
the case, I won't have to bother using Trident.

On Wed, Apr 20, 2016 at 8:59 AM, Navin Ipe <navin.ipe@searchlighthealth.com>
wrote:

> @Jason: Thanks. Tried searching for Storm code which starts Ephemeral
> nodes, but couldn't find it. (am new to Hadoop and Storm, so perhaps I was
> searching for the wrong thing)
>
> @Jungtaek: Will explore component tasks. Meanwhile, I had considered
> Trident, but didn't go ahead because it was not clear how I could implement
> multiple spouts in Trident, where each spout would iterate a certain number
> of rows of a database. Any idea how that could happen.
>
> On Tue, Apr 19, 2016 at 6:39 PM, Jungtaek Lim <kabhwan@gmail.com> wrote:
>
>> There's other idea without relying on Zookeeper : use ordinal of task id
>> between same components (spout)
>>
>> Task id is issued across all tasks including system tasks so you can't
>> assume spout tasks are having task id sequentially, but whatever you can do
>> the trick - check "ordinal" of this spout task's id around same spouts.
>> Please refer GeneralTopologyContext.getComponentTasks(String componentId).
>>
>> Btw, Spout1 -> Bolt2 can be done with various ways but it would not be
>> easy to aggregate the results of Bolt2 from Bolt3.
>> You should consider windowing by processed time or Trident or maintain
>> your own buffers.
>>
>> Hope this helps.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> 2016년 4월 19일 (화) 오후 10:02, Jason Kusar <jason@kusar.net>님이
작성:
>>
>>> Hi,
>>>
>>> I've done a similar thing before with the exception that I was reading
>>> from Cassandra.  The concept is the same though.  Assuming you know that
>>> you have 10,000 records and you want each spout to read 1,000 of them, then
>>> you would launch 10 instances of the spouts.  The first thing they do
>>> during init is to connect to zookeeper and create an ephemeral node (
>>> http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes)
>>> starting with one called '0'.  If 0 already exists, you'll get an exception
>>> which means you try to create '1' and so on until you successfully create a
>>> node.  That tells you which batch of records that instance of the spout is
>>> responsible for.  I.e., if you successfully created '3', then this spout
>>> needs to set its offset to 3,000.
>>>
>>> The reason for using ephemeral nodes is that they are automatically
>>> deleted if the zookeeper client disconnects.  That way if a spout crashes,
>>> once Storm relaunches the spout, it will be able to re-claim that token and
>>> resume work on that batch.  You'll obviously need to have some way to keep
>>> track of which records you've already processed, but that's going to be
>>> specific to your implementation.
>>>
>>> Hope that helps!
>>> Jason
>>>
>>> On Tue, Apr 19, 2016 at 4:43 AM Navin Ipe <
>>> navin.ipe@searchlighthealth.com> wrote:
>>>
>>>> Thanks guys.
>>>> I didn't understand "*...spout instances by utilizing Zookeper.*". How
>>>> does one utilize Zookeper? Is it the same as ".setNumTasks(10)" for a Spout?
>>>>
>>>> As of now I've set
>>>> config.setNumWorkers(2);
>>>> and
>>>> builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>>>>
>>>> I'm able to get spoutID in open() using this.spoutId =
>>>> context.getThisTaskId();
>>>> Strangely, my spoutID always begins with 3 instead of 0.
>>>>
>>>> By partitionID I understand that's the fieldGrouping's id.
>>>>
>>>> Even if I do all this, will the spout's tasks actually be distributed
>>>> across multiple workers? Won't I have to create separate spouts?
>>>> builder.setSpout("mongoSpout1", new MongoSpout());
>>>> builder.setSpout("mongoSpout2", new MongoSpout());
>>>> builder.setSpout("mongoSpout3", new MongoSpout());
>>>> and so on?
>>>>
>>>>
>>>>
>>>> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <mittspamkonto@gmail.com>
>>>> wrote:
>>>>
>>>>> Coreection - group on partition id
>>>>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <navin.ipe@searchlighthealth.com>
>>>>> wrote:
>>>>>
>>>>>> I've seen this:
>>>>>> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
>>>>>> but it doesn't explain how workers coordinate with each other, so
>>>>>> requesting a bit of clarity.
>>>>>>
>>>>>> I'm considering a situation where I have 2 million rows in MySQL
or
>>>>>> MongoDB.
>>>>>>
>>>>>> 1. I want to use a Spout to read the first 1000 rows and send the
>>>>>> processed output to a Bolt. This happens in Worker1.
>>>>>> 2. I want a different instance of the same Spout class to read the
>>>>>> next 1000 rows in parallel with the working of the Spout of 1, then
send
>>>>>> the processed output to an instance of the same Bolt used in 1. This
>>>>>> happens in Worker2.
>>>>>> 3. Same as 1 and 2, but it happens in Worker 3.
>>>>>> 4. I might setup 10 workers like this.
>>>>>> 5. When all the Bolts in the workers are finished, they send their
>>>>>> outputs to a single Bolt in Worker 11.
>>>>>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL
>>>>>> table.
>>>>>>
>>>>>> *My confusion here is in how to make the database iterations happen
>>>>>> batch by batch, parallelly*. Obviously the database connection would
>>>>>> have to be made in some static class outside the workers, but if
workers
>>>>>> are started with just "conf.setNumWorkers(2);", then how do I tell
>>>>>> the workers to iterate different rows of the database? Assuming that
the
>>>>>> workers are running in different machines.
>>>>>>
>>>>>> --
>>>>>> Regards,
>>>>>> Navin
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Navin
>>>>
>>>
>
>
> --
> Regards,
> Navin
>



-- 
Regards,
Navin

Mime
View raw message