storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ravi Sharma <ping2r...@gmail.com>
Subject Re: Does Storm work with Spring
Date Sun, 11 Oct 2015 05:50:45 GMT
Basically u will have two context defined at different time/phase

When u r about to submit the topology, u need to build topology, that
context only need information about spouts and bolts.  You don't need any
application bean like database accessories or ur services etc, as at this
level u r not running ur application but u r just creating a topology and
defining how bolts and spouts are connected to each other etc etc

Now once topology is submitted, topology will be moved to one of the
supervisor node and will start running, all spouts and bolts will be
initialized,  at this moment u will need ur application context, which
doesn't need ur earlier topology context

So I will suggest keep both context separate.

Topology is not complex to build, smaller topology can be built via code
only, I. E. Which bolt listening to which spout, but if u want to go with
good design, I say just write a small wrapper to read some json where u can
define ur bolts and spouts and use that to build topology (u can use spring
but it's not much needed)

In past I have done it using both json setting (without spring) and xml
setting (with spring) both works good

Ravi
On 11 Oct 2015 06:38, "Ankur Garg" <ankurgarg9@gmail.com> wrote:

> Oh The problem here is I have many beans and which need to be initialized
> (some are reading conf from yml files , database connection , thread pool
> initialization etc) .
>
>
> Now , I have written a spring boot application which takes care of all the
> above and I define my topology inside one of the beans , Here is my bean
>
> @Autowired
> ApplicationContext appContext;
>
> @Bean
> public void submitTopology() throws
> AlreadyAliveException,InvalidTopologyException {
>
>    TopologyBuilder builder = new TopologyBuilder();
>
>    builder.setSpout("rabbitMqSpout", new RabbitListnerSpout(appContext),
> 10);
>
>    builder.setBolt("mapBolt", new GroupingBolt(appContext),
> 10).shuffleGrouping("rabbitMqSpout");
>
> builder.setBolt("reduceBolt", new PublishingBolt(appContext),
> 10).shuffleGrouping("mapBolt");
>
> Config conf = new Config();
>
> conf.registerSerialization(EventBean.class); // To be registered with Kyro
> for Storm
>
> conf.registerSerialization(InputQueueManagerImpl.class);
>
> conf.setDebug(true);
>
>  conf.setMessageTimeoutSecs(200);
>
>    LocalCluster cluster = new LocalCluster();
>
>   cluster.submitTopology("test", conf, builder.createTopology());
>
> }
>
>
> When this bean is initialized , I already have appContext initialized by
> my Spring Boot Application . So , the thing is , I am using SpringBoot to
> initialize and load my context with all beans .
>
> Now this is the context which I want to leverage in my spouts and bolts .
>
> So , if what I suggested earlier does  not work on Storm Distributed
> Cluster , I need to find a way of initializing my AppContext somehow:(
>
> I would be really thankful if anyone here can help me :(
>
>
> Thanks
>
> Ankur
>
> On Sun, Oct 11, 2015 at 5:54 AM, Javier Gonzalez <jagonzal@gmail.com>
> wrote:
>
>> The local cluster runs completely within a single JVM AFAIK. The local
>> cluster is useful for development, testing your topology, etc. The real
>> deployment has to go through nimbus, run on workers started by supervisors
>> on one or more nodes, etc. Kind of difficult to simulate all that on a
>> single box.
>>
>> On Sat, Oct 10, 2015 at 1:45 PM, Ankur Garg <ankurgarg9@gmail.com> wrote:
>>
>>> Oh ...So I will have to test it in a cluster.
>>>
>>> Having said that, how is local cluster which we use is too different
>>> from normal cluster.. Ideally ,it shud simulate normal cluster..
>>> On Oct 10, 2015 7:51 PM, "Ravi Sharma" <ping2ravi@gmail.com> wrote:
>>>
>>>> Hi Ankur,
>>>> local it may be working but It wont work in Actual cluster.
>>>>
>>>> Think about SpringContext is collection of your so many resoucres, like
>>>> Database connections , may be HTTP connections , Thread pools etc.
>>>> These things wont get serialised and just go to other machines and
>>>> start working.
>>>>
>>>> SO basically in init methods of bolt and spout, you need to call some
>>>> singloton class like this
>>>>
>>>> ApplicationContext ac = SingletonApplicationContext.getContext();
>>>>
>>>> SingletonApplicationContext will have a static variable
>>>> ApplicationContext and in getContext you will check if static variable has
>>>> been initialised if not then u will initilize it, and then return it(normal
>>>> Singleton class)
>>>>
>>>>
>>>> Now when Topolgy will move to any other node, Bolt and spouts will
>>>> start and first init call will initialize it and other bolt/spouts will
>>>> just use that.
>>>>
>>>> As John mentioned, its very important to mark all Spring beans and
>>>> Context as transient.
>>>>
>>>> Hope it helps.
>>>>
>>>> Ravi.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, Oct 10, 2015 at 6:25 AM, Ankur Garg <ankurgarg9@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Javier ,
>>>>>
>>>>> So , I am using a Local cluster on my dev machine where I am using
>>>>> Eclipse . Here , I am passing Springs ApplicationContext as constructor
>>>>> argument to spouts and bolts .
>>>>>
>>>>> TopologyBuilder builder = new TopologyBuilder();
>>>>>
>>>>> builder.setSpout("rabbitMqSpout", new RabbitListnerSpout(appContext),
>>>>> 10);
>>>>>
>>>>> builder.setBolt("mapBolt", new GroupingBolt(appContext),
>>>>> 10).shuffleGrouping("rabbitMqSpout");
>>>>>
>>>>> builder.setBolt("reduceBolt", new PublishingBolt(appContext),
>>>>> 10).shuffleGrouping("mapBolt");
>>>>>
>>>>> Config conf = new Config();
>>>>>
>>>>> conf.registerSerialization(EventBean.class); /
>>>>>
>>>>> conf.registerSerialization(InputQueueManagerImpl.class);
>>>>>
>>>>> conf.setDebug(true);
>>>>>
>>>>>  LocalCluster cluster = new LocalCluster();
>>>>>
>>>>> cluster.submitTopology("test", conf, builder.createTopology());
>>>>>
>>>>>
>>>>> And in my spouts and Bolts ,
>>>>>
>>>>> I make my Application Context variable as static  . So when it is
>>>>> launched by c;uster.submitTopology , my context is still avalilable
>>>>>
>>>>>
>>>>> private static ApplicationContext ctx;
>>>>>
>>>>> public RabbitListnerSpout(ApplicationContext appContext) {
>>>>>
>>>>> LOG.info("RabbitListner Constructor called");
>>>>>
>>>>> ctx = appContext;
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> @SuppressWarnings("rawtypes")
>>>>>
>>>>> @Override
>>>>>
>>>>> public void open(Map conf, TopologyContext context,SpoutOutputCollector
>>>>> collector) {
>>>>>
>>>>> LOG.info("Inside the open Method for RabbitListner Spout");
>>>>>
>>>>> inputManager = (InputQueueManagerImpl) ctx
>>>>> .getBean(InputQueueManagerImpl.class);
>>>>>
>>>>> notificationManager = (NotificationQueueManagerImpl) ctx
>>>>> .getBean(NotificationQueueManagerImpl.class);
>>>>>
>>>>> eventExchange = ctx.getEnvironment().getProperty(
>>>>> "input.rabbitmq.events.exchange");
>>>>>
>>>>> routingKey = ctx.getEnvironment().getProperty(
>>>>> "input.rabbitmq.events.routingKey");
>>>>>
>>>>> eventQueue = ctx.getEnvironment().getProperty(
>>>>> "input.rabbitmq.events.queue");
>>>>>
>>>>> _collector = collector;
>>>>>
>>>>> LOG.info("Exiting the open Method for RabbitListner Spout");
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> This is working like a charm (my ApplicationContext is initialized
>>>>> seperately ) . As we all know , ApplicationContext is not serializable
.
>>>>> But this works well in LocalCluster.
>>>>>
>>>>> My assumption is that it will work in a seperate Cluster too . Is my
>>>>> assumption correct ??
>>>>>
>>>>> On Fri, Oct 9, 2015 at 9:04 PM, Javier Gonzalez <jagonzal@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> IIRC, only if everything you use in your spouts and bolts is
>>>>>> serializable.
>>>>>> On Oct 6, 2015 11:29 PM, "Ankur Garg" <ankurgarg9@gmail.com>
wrote:
>>>>>>
>>>>>>> Hi Ravi ,
>>>>>>>
>>>>>>> I was able to make an Integration with Spring but the problem
is
>>>>>>> that I have to autowire for every bolt and spout . That means
that even if
>>>>>>> i parallelize spout and bolt it will get started to each instance
 . Is
>>>>>>> there some way that I only have to do for bolts and spouts once
(I mean if
>>>>>>> I parallelize bolts or spouts individually it can share the conf
from
>>>>>>> somewhere) . IS this possible??
>>>>>>>
>>>>>>> Thanks
>>>>>>> Ankur
>>>>>>>
>>>>>>> On Tue, Sep 29, 2015 at 7:57 PM, Ravi Sharma <ping2ravi@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes this is for annotation also...
>>>>>>>>
>>>>>>>> you can call this method in prepare()  method of bolt and
onOpen()
>>>>>>>> method
>>>>>>>> in every Spout and make sure you don't use any autowire bean
before
>>>>>>>> this
>>>>>>>> call.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Ravi.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Sep 29, 2015 at 2:22 PM, Ankur Garg <ankurgarg9@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> > Hi Ravi ,
>>>>>>>> >
>>>>>>>> > Thanks for your reply . I am using annotation based
configuration
>>>>>>>> and using
>>>>>>>> > Spring Boot.
>>>>>>>> >
>>>>>>>> > Any idea how to do it using annotations ?
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > On Tue, Sep 29, 2015 at 6:41 PM, Ravi Sharma <ping2ravi@gmail.com>
>>>>>>>> wrote:
>>>>>>>> >
>>>>>>>> > > Bolts and Spouts are created by Storm and not known
to Spring
>>>>>>>> Context.
>>>>>>>> > You
>>>>>>>> > > need to manually add them to SpringContext, there
are few
>>>>>>>> methods
>>>>>>>> > available
>>>>>>>> > > i.e.
>>>>>>>> > >
>>>>>>>> > >
>>>>>>>> > >
>>>>>>>> >
>>>>>>>> SpringContext.getContext().getAutowireCapableBeanFactory().autowireBeanProperties(this,
>>>>>>>> > > AutowireCapableBeanFactory.AUTOWIRE_AUTODETECT,
false);
>>>>>>>> > >
>>>>>>>> > > SpringContext is my own class where i have injected
>>>>>>>> SpringContext so
>>>>>>>> > > SpringContext.getContext() returns the actuall
Spring Context
>>>>>>>> > >
>>>>>>>> > >
>>>>>>>> > >
>>>>>>>> > >
>>>>>>>> > > Ravi.
>>>>>>>> > >
>>>>>>>> > >
>>>>>>>> > > On Tue, Sep 29, 2015 at 1:03 PM, Ankur Garg <
>>>>>>>> ankurgarg9@gmail.com>
>>>>>>>> > wrote:
>>>>>>>> > >
>>>>>>>> > > > Hi ,
>>>>>>>> > > >
>>>>>>>> > > > I am building a Storm topology with set of
Spouts and Bolts
>>>>>>>> and also
>>>>>>>> > > using
>>>>>>>> > > > Spring for Dependency Injection .
>>>>>>>> > > >
>>>>>>>> > > > Unfortunately , none of my fields are getting
autowired even
>>>>>>>> though I
>>>>>>>> > > have
>>>>>>>> > > > declared all my spouts and Bolts as @Components
.
>>>>>>>> > > >
>>>>>>>> > > > However the place where I am declaring my
topology , Spring
>>>>>>>> is working
>>>>>>>> > > fine
>>>>>>>> > > > .
>>>>>>>> > > >
>>>>>>>> > > > Is it because cluster.submitTopology("test",
conf,
>>>>>>>> > > > builder.createTopology())
>>>>>>>> > > >  submits the topology to a cluster (locally
it spawns
>>>>>>>> different thread
>>>>>>>> > > for
>>>>>>>> > > > Spouts and Bolts) that Autowiring is not working?
>>>>>>>> > > >
>>>>>>>> > > > Please suggest .
>>>>>>>> > > >
>>>>>>>> > >
>>>>>>>> >
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>
>>
>>
>> --
>> Javier González Nicolini
>>
>
>

Mime
View raw message