storm-user mailing list archives

From: Navin Ipe <navin....@searchlighthealth.com>
Subject: Re: How does one distribute database iteration across workers?
Date: Wed, 20 Apr 2016 07:33:17 GMT
In this case the values coming from the bolts can all be put() or updated
into a single large HashMap in the Bolt in Worker11, so there is no need
for aggregation.
If there is no standard way in Storm for spouts and bolts to notify each
other that there is nothing more for them to process, then I guess I'll
just send tuples with "null" in them so that the Bolt in Worker11 will know
the processing is over.
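
Roughly what I have in mind for that Bolt (just a sketch; CollectorBolt,
"processorBolt", the "key"/"value" fields and writeResultsToMySql() are
made-up names, and the package names assume Storm 0.10.x):

  import java.util.HashMap;
  import java.util.Map;

  import backtype.storm.task.OutputCollector;
  import backtype.storm.task.TopologyContext;
  import backtype.storm.topology.OutputFieldsDeclarer;
  import backtype.storm.topology.base.BaseRichBolt;
  import backtype.storm.tuple.Tuple;

  // Collects the values from the upstream bolts into one HashMap and
  // flushes to MySQL once every upstream task has sent its null marker.
  public class CollectorBolt extends BaseRichBolt {
      private Map<String, Object> results;
      private OutputCollector collector;
      private int upstreamTasks;
      private int finishedTasks;

      @Override
      public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
          this.collector = collector;
          this.results = new HashMap<String, Object>();
          // number of upstream bolt tasks we expect a "done" marker from
          this.upstreamTasks = context.getComponentTasks("processorBolt").size();
          this.finishedTasks = 0;
      }

      @Override
      public void execute(Tuple tuple) {
          Object value = tuple.getValueByField("value");
          if (value == null) {                  // the "nothing more to process" marker
              finishedTasks++;
              if (finishedTasks == upstreamTasks) {
                  writeResultsToMySql(results);
              }
          } else {
              results.put(tuple.getStringByField("key"), value);
          }
          collector.ack(tuple);
      }

      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
          // terminal bolt: nothing to declare
      }

      private void writeResultsToMySql(Map<String, Object> results) {
          // JDBC insert into the new MySQL table goes here
      }
  }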

I just hope what y'all said about tuples being automatically passed between
workers actually works without any problems :-)
Thanks a lot for all your help!

On Wed, Apr 20, 2016 at 12:06 PM, Jungtaek Lim <kabhwan@gmail.com> wrote:

> Navin,
>
> I think these two lines are not clear to me, so I may have misunderstood.
>
> 5. When all the Bolts in the workers are finished, they send their outputs
> to a single Bolt in Worker 11.
> 6. The Bolt in Worker 11 writes the processed value to a new MySQL table.
>
> If you don't need to aggregate (I mean join) the results from the Bolts in
> Worker1~10 at the Bolt in Worker11, there is no need to use Trident or
> windowing.
>
> On Wed, Apr 20, 2016 at 3:28 PM, Navin Ipe <navin.ipe@searchlighthealth.com>
> wrote:
>
>> @Jungtaek: This person (
>> http://stackoverflow.com/questions/21480773/can-twitter-storm-worker-get-only-parts-of-the-topology)
>> claims that Storm would automatically manage the flow of data between
>> spouts and bolts on different workers. Can anyone confirm this? If this is
>> the case, I won't have to bother using Trident.
>>
>> On Wed, Apr 20, 2016 at 8:59 AM, Navin Ipe <
>> navin.ipe@searchlighthealth.com> wrote:
>>
>>> @Jason: Thanks. I tried searching for the Storm code which creates ephemeral
>>> nodes, but couldn't find it. (I'm new to Hadoop and Storm, so perhaps I was
>>> searching for the wrong thing.)
>>>
>>> @Jungtaek: Will explore component tasks. Meanwhile, I had considered
>>> Trident, but didn't go ahead because it was not clear how I could implement
>>> multiple spouts in Trident, where each spout would iterate over a certain
>>> number of rows of a database. Any idea how that could be done?
>>>
>>> On Tue, Apr 19, 2016 at 6:39 PM, Jungtaek Lim <kabhwan@gmail.com> wrote:
>>>
>>>> There's another idea that doesn't rely on Zookeeper: use the ordinal of
>>>> the task id among the tasks of the same component (spout).
>>>>
>>>> Task ids are issued across all tasks, including system tasks, so you can't
>>>> assume the spout tasks have sequential task ids, but you can still do the
>>>> trick: check the "ordinal" of this spout task's id among the tasks of the
>>>> same spout. Please refer to
>>>> GeneralTopologyContext.getComponentTasks(String componentId).
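>>>>
>>>> In the spout's open() that could look roughly like this (just a sketch;
>>>> spoutOrdinal and startRow are made-up names and the 1000-row chunk size is
>>>> only an example):
>>>>
>>>>   // Returns this task's position among the tasks of its own spout component.
>>>>   private int spoutOrdinal(backtype.storm.task.TopologyContext context) {
>>>>       java.util.List<Integer> tasks = new java.util.ArrayList<Integer>(
>>>>           context.getComponentTasks(context.getThisComponentId()));
>>>>       java.util.Collections.sort(tasks);             // ids are ascending anyway; just being safe
>>>>       return tasks.indexOf(context.getThisTaskId()); // 0 .. numTasks-1
>>>>   }
>>>>
>>>>   // in open(): this.startRow = (long) spoutOrdinal(context) * 1000L;
>>>>   // (TopologyContext.getThisTaskIndex() returns the same ordinal directly)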
>>>>
>>>> Btw, Spout1 -> Bolt2 can be done in various ways, but it would not be easy
>>>> to aggregate the results of Bolt2 from Bolt3.
>>>> You should consider windowing by processing time, or Trident, or
>>>> maintaining your own buffers.
>>>>
>>>> Hope this helps.
>>>>
>>>> Thanks,
>>>> Jungtaek Lim (HeartSaVioR)
>>>>
>>>> On Tue, Apr 19, 2016 at 10:02 PM, Jason Kusar <jason@kusar.net> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I've done a similar thing before with the exception that I was reading
>>>>> from Cassandra.  The concept is the same though.  Assuming you know that
>>>>> you have 10,000 records and you want each spout to read 1,000 of them,
>>>>> then you would launch 10 instances of the spouts.  The first thing they do
>>>>> during init is to connect to zookeeper and create an ephemeral node (
>>>>> http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes)
>>>>> starting with one called '0'.  If 0 already exists, you'll get an
>>>>> exception which means you try to create '1' and so on until you
>>>>> successfully create a node.  That tells you which batch of records that
>>>>> instance of the spout is responsible for.  I.e., if you successfully
>>>>> created '3', then this spout needs to set its offset to 3,000.
>>>>>
>>>>> The reason for using ephemeral nodes is that they are automatically
>>>>> deleted if the zookeeper client disconnects.  That way if a spout crashes,
>>>>> once Storm relaunches the spout, it will be able to re-claim that token
>>>>> and resume work on that batch.  You'll obviously need to have some way to
>>>>> keep track of which records you've already processed, but that's going to
>>>>> be specific to your implementation.
>>>>>
>>>>> Hope that helps!
>>>>> Jason
>>>>>
>>>>> On Tue, Apr 19, 2016 at 4:43 AM Navin Ipe <
>>>>> navin.ipe@searchlighthealth.com> wrote:
>>>>>
>>>>>> Thanks guys.
>>>>>> I didn't understand "*...spout instances by utilizing Zookeeper.*".
>>>>>> How does one utilize Zookeeper? Is it the same as ".setNumTasks(10)"
>>>>>> for a Spout?
>>>>>>
>>>>>> As of now I've set
>>>>>> config.setNumWorkers(2);
>>>>>> and
>>>>>> builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>>>>>>
>>>>>> I'm able to get spoutID in open() using this.spoutId =
>>>>>> context.getThisTaskId();
>>>>>> Strangely, my spoutID always begins with 3 instead of 0.
>>>>>>
>>>>>> By partitionID I understand that's the fieldGrouping's id.
>>>>>>
>>>>>> Even if I do all this, will the spout's tasks actually be distributed
>>>>>> across multiple workers? Won't I have to create separate spouts?
>>>>>> builder.setSpout("mongoSpout1", new MongoSpout());
>>>>>> builder.setSpout("mongoSpout2", new MongoSpout());
>>>>>> builder.setSpout("mongoSpout3", new MongoSpout());
>>>>>> and so on?
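>>>>>> Or is it enough to declare the spout once with a parallelism hint, which
>>>>>> I guess would look something like this:
>>>>>> builder.setSpout("mongoSpout", new MongoSpout(), 10).setNumTasks(10);
>>>>>> // 10 executors which the scheduler spreads across the configured workers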
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <
>>>>>> mittspamkonto@gmail.com> wrote:
>>>>>>
>>>>>>> Correction - group on partition id
>>>>>>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <
>>>>>>> navin.ipe@searchlighthealth.com> wrote:
>>>>>>>
>>>>>>>> I've seen this:
>>>>>>>> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
>>>>>>>> but it doesn't explain how workers coordinate with each other, so
>>>>>>>> requesting a bit of clarity.
>>>>>>>>
>>>>>>>> I'm considering a situation where I have 2 million rows in MySQL or
>>>>>>>> MongoDB.
>>>>>>>>
>>>>>>>> 1. I want to use a Spout to read the first 1000 rows and send the
>>>>>>>> processed output to a Bolt. This happens in Worker1.
>>>>>>>> 2. I want a different instance of the same Spout class to read the
>>>>>>>> next 1000 rows in parallel with the working of the Spout in 1, then
>>>>>>>> send the processed output to an instance of the same Bolt used in 1.
>>>>>>>> This happens in Worker2.
>>>>>>>> 3. Same as 1 and 2, but it happens in Worker 3.
>>>>>>>> 4. I might set up 10 workers like this.
>>>>>>>> 5. When all the Bolts in the workers are finished, they send their
>>>>>>>> outputs to a single Bolt in Worker 11.
>>>>>>>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL
>>>>>>>> table.
>>>>>>>>
>>>>>>>> *My confusion here is in how to make the database iterations happen
>>>>>>>> batch by batch, in parallel*. Obviously the database connection would
>>>>>>>> have to be made in some static class outside the workers, but if the
>>>>>>>> workers are started with just "conf.setNumWorkers(2);", then how do I
>>>>>>>> tell the workers to iterate over different rows of the database,
>>>>>>>> assuming that the workers are running on different machines?
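>>>>>>>>
>>>>>>>> The wiring I have in mind is roughly this (just a sketch; DbSpout,
>>>>>>>> ProcessorBolt and CollectorBolt are placeholder class names, and Storm's
>>>>>>>> scheduler decides which executors land on which worker):
>>>>>>>>
>>>>>>>>   TopologyBuilder builder = new TopologyBuilder();
>>>>>>>>   builder.setSpout("dbSpout", new DbSpout(), 10);          // 10 spout executors, one batch each
>>>>>>>>   builder.setBolt("processorBolt", new ProcessorBolt(), 10)
>>>>>>>>          .shuffleGrouping("dbSpout");
>>>>>>>>   builder.setBolt("collectorBolt", new CollectorBolt(), 1) // single aggregator
>>>>>>>>          .globalGrouping("processorBolt");                 // every tuple goes to its one task
>>>>>>>>
>>>>>>>>   Config conf = new Config();
>>>>>>>>   conf.setNumWorkers(11);
>>>>>>>>   StormSubmitter.submitTopology("db-iteration", conf, builder.createTopology());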
>>>>>>>>
>>>>>>>> --
>>>>>>>> Regards,
>>>>>>>> Navin
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Regards,
>>>>>> Navin
>>>>>>
>>>>>
>>>
>>>
>>> --
>>> Regards,
>>> Navin
>>>
>>
>>
>>
>> --
>> Regards,
>> Navin
>>
>


-- 
Regards,
Navin
