storm-user mailing list archives

From Alexander T
Subject Re: How does one distribute database iteration across workers?
Date Tue, 19 Apr 2016 06:18:51 GMT
Hi Navin,

I'm not sure this scenario is a perfect fit for Storm, since you want
precise control of colocation. But if I understand your problem correctly,
the following could be a viable approach:

1. Establish a total order of spout instances by utilizing Zookeeper. Your
spout instances will now have ids 0,1,2,3,4 etc.

2. Partition the keyspace of your db table using the spout instance id. So
if you have 5 instances, instance 0 gets 0-1000, followed by 5000-6000, etc.

3. Emit from the spout with a field for the sequence number (0-1000 etc)
and one for the partition id.

4. Do a fields grouping on the sequence number from the spout to your bolt.
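
As a rough sketch of the arithmetic in step 2 (plain Java, no Storm or
Zookeeper calls): the class and method names below are made up for
illustration, and it assumes numeric, roughly contiguous keys and that the
instance id from step 1 is already known.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Storm: works out which key ranges
// a given spout instance should read from the table.
final class KeyRangePartitioner {

    /** Returns the half-open ranges [start, end) owned by instanceId. */
    static List<long[]> rangesFor(int instanceId, int numInstances,
                                  long batchSize, long totalRows) {
        if (instanceId < 0 || instanceId >= numInstances) {
            throw new IllegalArgumentException("instanceId out of range");
        }
        List<long[]> ranges = new ArrayList<>();
        // Each instance skips over the batches that belong to the others.
        long stride = batchSize * numInstances;
        for (long start = instanceId * batchSize; start < totalRows; start += stride) {
            ranges.add(new long[] {start, Math.min(start + batchSize, totalRows)});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Instance 0 of 5, batches of 1000 rows, 12000 rows in total.
        for (long[] r : rangesFor(0, 5, 1000, 12000)) {
            System.out.println(r[0] + "-" + r[1]);
        }
    }
}
```

With 5 instances and batches of 1000, instance 0 gets 0-1000, 5000-6000 and
10000-11000, instance 1 gets 1000-2000, 6000-7000 and so on.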

You have now pinned each partition to a bolt instance and you can buffer
until the batch is complete. If you want one partition per bolt instance,
set bolt parallelism=spout parallelism.
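
The "buffer until the batch is complete" part could look roughly like this
inside the bolt. This is only a sketch in plain Java: BatchBuffer is a
hypothetical class, it keys the buffer on the partition id field from step 3,
and it assumes every batch has a fixed, known size (a short final batch would
need some end-of-batch marker instead).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Storm: collects rows per partition
// and hands back a batch once it has the expected number of rows.
final class BatchBuffer<T> {
    private final int batchSize;
    private final Map<Integer, List<T>> pending = new HashMap<>();

    BatchBuffer(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Adds a row; returns the completed batch, or null if it is still incomplete. */
    List<T> add(int partitionId, T row) {
        List<T> rows = pending.computeIfAbsent(partitionId, k -> new ArrayList<>());
        rows.add(row);
        if (rows.size() < batchSize) {
            return null;
        }
        // Batch is full: stop tracking it and give it to the caller.
        pending.remove(partitionId);
        return rows;
    }
}
```

The bolt would call add() for every incoming tuple and only process or emit
when it gets a non-null list back.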

I've seen this:
but it doesn't explain how workers coordinate with each other, so
requesting a bit of clarity.

I'm considering a situation where I have 2 million rows in MySQL or MongoDB.

1. I want to use a Spout to read the first 1000 rows and send the processed
output to a Bolt. This happens in Worker 1.
2. I want a different instance of the same Spout class to read the next
1000 rows in parallel with the Spout in 1, then send the processed output
to an instance of the same Bolt used in 1. This happens in Worker 2.
3. Same as 1 and 2, but it happens in Worker 3.
4. I might set up 10 workers like this.
5. When all the Bolts in the workers are finished, they send their outputs
to a single Bolt in Worker 11.
6. The Bolt in Worker 11 writes the processed value to a new MySQL table.

*My confusion here is in how to make the database iterations happen batch
by batch, in parallel*. Obviously the database connection would have to be
made in some static class outside the workers, but if workers are started
with just "conf.setNumWorkers(2);", then how do I tell the workers to
iterate different rows of the database? Assuming that the workers are
running in different machines.

