storm-user mailing list archives

From Ravi Sharma <ping2r...@gmail.com>
Subject Re: Does Storm work with Spring
Date Sat, 10 Oct 2015 14:21:18 GMT
Hi Ankur,
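It may be working locally, but it won't work on an actual cluster. A
LocalCluster runs everything inside one JVM, so the static field you set in
the constructor is still visible to the spouts and bolts. On a real cluster
each worker is a separate JVM, and static fields are not serialised with the
object, so the context will be null there.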

Think of the Spring context as a collection of many resources, such as
database connections, maybe HTTP connections, thread pools, etc.
These things can't simply be serialised, sent to other machines, and start
working there.

So basically, in the init methods of the bolt and spout (prepare() and
open()), you need to call some singleton class like this:

ApplicationContext ac = SingletonApplicationContext.getContext();

SingletonApplicationContext will have a static ApplicationContext variable.
In getContext you check whether the static variable has been initialised; if
not, you initialise it, and then return it (a normal singleton class).
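
A minimal sketch of such a holder, so the idea above is concrete. To keep it
runnable without Spring on the classpath, FakeContext is a made-up stand-in
for Spring's ApplicationContext, and initCount() and SingletonDemo exist only
for illustration. In a real topology getContext would build your actual
Spring context instead.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Spring's ApplicationContext (assumption: the real
// code would create e.g. an annotation- or XML-based Spring context here).
final class FakeContext {
    private final Map<String, Object> beans = new ConcurrentHashMap<>();

    FakeContext() {
        beans.put("greeting", "hello");
    }

    Object getBean(String name) {
        return beans.get(name);
    }
}

// Lazily creates one context per JVM, i.e. one per Storm worker process.
final class SingletonApplicationContext {
    private static FakeContext context;
    private static int initCount = 0; // illustration only: counts initialisations

    private SingletonApplicationContext() {
    }

    // synchronized because several spout/bolt executors in the same worker
    // may call this at the same time from prepare()/open().
    static synchronized FakeContext getContext() {
        if (context == null) {
            context = new FakeContext();
            initCount++;
        }
        return context;
    }

    static synchronized int initCount() {
        return initCount;
    }
}

final class SingletonDemo {
    public static void main(String[] args) {
        FakeContext first = SingletonApplicationContext.getContext();
        FakeContext second = SingletonApplicationContext.getContext();
        System.out.println("same instance: " + (first == second));
        System.out.println("bean: " + first.getBean("greeting"));
    }
}
```

The synchronized keyword is the simplest way to make the lazy initialisation
safe when several executors share one worker; other singleton idioms would
work as well.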


Now when the topology moves to any other node, the bolts and spouts will
start there; the first init call will initialise the context, and the other
bolts/spouts in that worker will just use it.

As John mentioned, it's very important to mark all Spring beans and the
context as transient.
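
Here is a small sketch of why transient matters. As far as I know Storm ships
spout and bolt instances to the workers using plain Java serialisation, so
the example does the same round trip by hand. SketchBolt and HeavyResource
are made-up names standing in for a bolt and a non-serialisable Spring bean;
no Storm or Spring classes are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical non-serialisable resource (think: a Spring bean or the context).
final class HeavyResource {
    String describe() {
        return "ready";
    }
}

// Hypothetical bolt-like component; only plain configuration is serialised.
final class SketchBolt implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String name;                // serialised with the object
    private transient HeavyResource resource; // skipped by Java serialisation

    SketchBolt(String name) {
        this.name = name;
    }

    // Plays the role of prepare()/open(): (re)create resources on the worker.
    void prepare() {
        resource = new HeavyResource();
    }

    boolean hasResource() {
        return resource != null;
    }

    String name() {
        return name;
    }

    // Serialises and deserialises the bolt, imitating the trip to another JVM.
    static SketchBolt roundTrip(SketchBolt bolt) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(bolt);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SketchBolt) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialisation round trip failed", e);
        }
    }
}
```

Without the transient keyword, writeObject would fail with a
NotSerializableException for HeavyResource. With it, the copy arrives with a
null resource field and has to build it again in prepare(), which is where
the singleton lookup goes.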

Hope it helps.

Ravi.





On Sat, Oct 10, 2015 at 6:25 AM, Ankur Garg <ankurgarg9@gmail.com> wrote:

> Hi Javier ,
>
> So , I am using a LocalCluster on my dev machine , where I am using Eclipse
> . Here , I am passing Spring's ApplicationContext as a constructor argument
> to spouts and bolts .
>
> TopologyBuilder builder = new TopologyBuilder();
>
> builder.setSpout("rabbitMqSpout", new RabbitListnerSpout(appContext), 10);
>
> builder.setBolt("mapBolt", new GroupingBolt(appContext),
> 10).shuffleGrouping("rabbitMqSpout");
>
> builder.setBolt("reduceBolt", new PublishingBolt(appContext),
> 10).shuffleGrouping("mapBolt");
>
> Config conf = new Config();
>
> conf.registerSerialization(EventBean.class);
>
> conf.registerSerialization(InputQueueManagerImpl.class);
>
> conf.setDebug(true);
>
>  LocalCluster cluster = new LocalCluster();
>
> cluster.submitTopology("test", conf, builder.createTopology());
>
>
> And in my spouts and Bolts ,
>
> I make my ApplicationContext variable static . So when it is launched
> by cluster.submitTopology , my context is still available
>
>
> private static ApplicationContext ctx;
>
> public RabbitListnerSpout(ApplicationContext appContext) {
>
> LOG.info("RabbitListner Constructor called");
>
> ctx = appContext;
>
> }
>
>
> @SuppressWarnings("rawtypes")
>
> @Override
>
> public void open(Map conf, TopologyContext context,SpoutOutputCollector
> collector) {
>
> LOG.info("Inside the open Method for RabbitListner Spout");
>
> inputManager = (InputQueueManagerImpl) ctx.getBean(InputQueueManagerImpl.
> class);
>
> notificationManager = (NotificationQueueManagerImpl) ctx
> .getBean(NotificationQueueManagerImpl.class);
>
> eventExchange = ctx.getEnvironment().getProperty(
> "input.rabbitmq.events.exchange");
>
> routingKey = ctx.getEnvironment().getProperty(
> "input.rabbitmq.events.routingKey");
>
> eventQueue = ctx.getEnvironment().getProperty(
> "input.rabbitmq.events.queue");
>
> _collector = collector;
>
> LOG.info("Exiting the open Method for RabbitListner Spout");
>
> }
>
>
> This is working like a charm (my ApplicationContext is initialized
> separately ) . As we all know , ApplicationContext is not serializable .
> But this works well in LocalCluster.
>
> My assumption is that it will work in a separate Cluster too . Is my
> assumption correct ??
>
> On Fri, Oct 9, 2015 at 9:04 PM, Javier Gonzalez <jagonzal@gmail.com>
> wrote:
>
>> IIRC, only if everything you use in your spouts and bolts is
>> serializable.
>> On Oct 6, 2015 11:29 PM, "Ankur Garg" <ankurgarg9@gmail.com> wrote:
>>
>>> Hi Ravi ,
>>>
>>> I was able to make an Integration with Spring but the problem is that I
>>> have to autowire for every bolt and spout . That means that even if i
>>> parallelize spout and bolt it will get started to each instance  . Is there
>>> some way that I only have to do for bolts and spouts once (I mean if I
>>> parallelize bolts or spouts individually it can share the conf from
>>> somewhere) . IS this possible??
>>>
>>> Thanks
>>> Ankur
>>>
>>> On Tue, Sep 29, 2015 at 7:57 PM, Ravi Sharma <ping2ravi@gmail.com>
>>> wrote:
>>>
>>>> Yes this is for annotation also...
>>>>
>>>> you can call this method in the prepare() method of every bolt and the
>>>> open() method of every Spout, and make sure you don't use any autowired
>>>> bean before this call.
>>>>
>>>>
>>>>
>>>>
>>>> Ravi.
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Sep 29, 2015 at 2:22 PM, Ankur Garg <ankurgarg9@gmail.com>
>>>> wrote:
>>>>
>>>> > Hi Ravi ,
>>>> >
>>>> > Thanks for your reply . I am using annotation based configuration and
>>>> > using Spring Boot.
>>>> >
>>>> > Any idea how to do it using annotations ?
>>>> >
>>>> >
>>>> >
>>>> > On Tue, Sep 29, 2015 at 6:41 PM, Ravi Sharma <ping2ravi@gmail.com>
>>>> wrote:
>>>> >
>>>> > > Bolts and Spouts are created by Storm and not known to Spring Context.
>>>> > > You need to manually add them to SpringContext, there are few methods
>>>> > > available, e.g.
>>>> > >
>>>> > > SpringContext.getContext().getAutowireCapableBeanFactory().autowireBeanProperties(this,
>>>> > > AutowireCapableBeanFactory.AUTOWIRE_AUTODETECT, false);
>>>> > >
>>>> > > SpringContext is my own class where I have injected SpringContext, so
>>>> > > SpringContext.getContext() returns the actual Spring Context
>>>> > >
>>>> > >
>>>> > >
>>>> > >
>>>> > > Ravi.
>>>> > >
>>>> > >
>>>> > > On Tue, Sep 29, 2015 at 1:03 PM, Ankur Garg <ankurgarg9@gmail.com>
>>>> > wrote:
>>>> > >
>>>> > > > Hi ,
>>>> > > >
>>>> > > > I am building a Storm topology with set of Spouts and Bolts and also
>>>> > > > using Spring for Dependency Injection .
>>>> > > >
>>>> > > > Unfortunately , none of my fields are getting autowired even though I
>>>> > > > have declared all my spouts and Bolts as @Components .
>>>> > > >
>>>> > > > However the place where I am declaring my topology , Spring is working
>>>> > > > fine .
>>>> > > >
>>>> > > > Is it because cluster.submitTopology("test", conf,
>>>> > > > builder.createTopology())
>>>> > > > submits the topology to a cluster (locally it spawns different thread
>>>> > > > for Spouts and Bolts) that Autowiring is not working?
>>>> > > >
>>>> > > > Please suggest .
>>>> > > >
>>>> > >
>>>> >
>>>>
>>>
>>>
>
