storm-user mailing list archives

From Ankur Garg <ankurga...@gmail.com>
Subject Re: Does Storm work with Spring
Date Sun, 11 Oct 2015 05:38:35 GMT
Oh, the problem here is that I have many beans which need to be initialized
(some read configuration from YML files, others set up database connections,
thread pools, etc.).


Now, I have written a Spring Boot application which takes care of all of the
above, and I define my topology inside one of the beans. Here is my bean:

@Autowired
ApplicationContext appContext;

@Bean
public void submitTopology() throws AlreadyAliveException, InvalidTopologyException {

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("rabbitMqSpout", new RabbitListnerSpout(appContext), 10);

    builder.setBolt("mapBolt", new GroupingBolt(appContext), 10)
            .shuffleGrouping("rabbitMqSpout");

    builder.setBolt("reduceBolt", new PublishingBolt(appContext), 10)
            .shuffleGrouping("mapBolt");

    Config conf = new Config();

    conf.registerSerialization(EventBean.class); // to be registered with Kryo for Storm

    conf.registerSerialization(InputQueueManagerImpl.class);

    conf.setDebug(true);

    conf.setMessageTimeoutSecs(200);

    LocalCluster cluster = new LocalCluster();

    cluster.submitTopology("test", conf, builder.createTopology());
}


When this bean is initialized, appContext has already been initialized by my
Spring Boot application. So, the thing is, I am using Spring Boot to
initialize and load my context with all the beans.

Now this is the context which I want to leverage in my spouts and bolts.

So, if what I suggested earlier does not work on a distributed Storm
cluster, I need to find a way of initializing my AppContext somehow :(

I would be really thankful if anyone here can help me :(


Thanks

Ankur

On Sun, Oct 11, 2015 at 5:54 AM, Javier Gonzalez <jagonzal@gmail.com> wrote:

> The local cluster runs completely within a single JVM AFAIK. The local
> cluster is useful for development, testing your topology, etc. The real
> deployment has to go through nimbus, run on workers started by supervisors
> on one or more nodes, etc. Kind of difficult to simulate all that on a
> single box.
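>
> For what it's worth, a rough sketch of how submission differs between the two
> (assuming a Storm 0.x release where both classes live under backtype.storm;
> conf and builder here are the ones from your snippet):
>
>     // local development: the whole topology runs inside this JVM
>     LocalCluster cluster = new LocalCluster();
>     cluster.submitTopology("test", conf, builder.createTopology());
>
>     // real deployment: the topology is shipped to Nimbus, and the spouts/bolts
>     // are deserialized inside worker JVMs on other machines
>     StormSubmitter.submitTopology("test", conf, builder.createTopology());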
>
> On Sat, Oct 10, 2015 at 1:45 PM, Ankur Garg <ankurgarg9@gmail.com> wrote:
>
>> Oh... so I will have to test it in a cluster.
>>
>> Having said that, how is the local cluster we use so different from a
>> normal cluster? Ideally, it should simulate a normal cluster.
>> On Oct 10, 2015 7:51 PM, "Ravi Sharma" <ping2ravi@gmail.com> wrote:
>>
>>> Hi Ankur,
>>> It may work locally, but it won't work in an actual cluster.
>>>
>>> Think of the Spring context as a collection of many resources: database
>>> connections, maybe HTTP connections, thread pools, etc.
>>> These things won't get serialised, travel to other machines, and just
>>> start working.
>>>
>>> So basically, in the init methods (prepare()/open()) of your bolts and
>>> spouts, you need to call a singleton class like this:
>>>
>>> ApplicationContext ac = SingletonApplicationContext.getContext();
>>>
>>> SingletonApplicationContext will have a static ApplicationContext
>>> variable, and in getContext() you check whether the static variable has
>>> been initialised; if not, you initialise it, and then return it (a normal
>>> singleton class).
>>>
>>>
>>> Now when the topology moves to any other node, the bolts and spouts will
>>> start, the first init call will initialise the context, and the other
>>> bolts/spouts will just use it.
>>>
>>> As John mentioned, it's very important to mark all Spring beans and the
>>> context as transient.
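>>>
>>> Roughly, a sketch of that holder (MySpringConfig here is just a placeholder
>>> for whatever @Configuration class builds your beans):
>>>
>>>     public class SingletonApplicationContext {
>>>
>>>         private static volatile ApplicationContext context;
>>>
>>>         // lazily build the context inside whichever worker JVM asks first
>>>         public static synchronized ApplicationContext getContext() {
>>>             if (context == null) {
>>>                 context = new AnnotationConfigApplicationContext(MySpringConfig.class);
>>>             }
>>>             return context;
>>>         }
>>>     }
>>>
>>> And in the bolt/spout, keep the Spring references transient and re-acquire
>>> them in prepare()/open() instead of the constructor:
>>>
>>>     private transient InputQueueManagerImpl inputManager;
>>>
>>>     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
>>>         inputManager = SingletonApplicationContext.getContext()
>>>                 .getBean(InputQueueManagerImpl.class);
>>>     }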
>>>
>>> Hope it helps.
>>>
>>> Ravi.
>>>
>>>
>>>
>>>
>>>
>>> On Sat, Oct 10, 2015 at 6:25 AM, Ankur Garg <ankurgarg9@gmail.com>
>>> wrote:
>>>
>>>> Hi Javier,
>>>>
>>>> So, I am using a local cluster on my dev machine, where I am using
>>>> Eclipse. Here, I am passing Spring's ApplicationContext as a constructor
>>>> argument to the spouts and bolts.
>>>>
>>>> TopologyBuilder builder = new TopologyBuilder();
>>>>
>>>> builder.setSpout("rabbitMqSpout", new RabbitListnerSpout(appContext),
>>>> 10);
>>>>
>>>> builder.setBolt("mapBolt", new GroupingBolt(appContext),
>>>> 10).shuffleGrouping("rabbitMqSpout");
>>>>
>>>> builder.setBolt("reduceBolt", new PublishingBolt(appContext),
>>>> 10).shuffleGrouping("mapBolt");
>>>>
>>>> Config conf = new Config();
>>>>
>>>> conf.registerSerialization(EventBean.class);
>>>>
>>>> conf.registerSerialization(InputQueueManagerImpl.class);
>>>>
>>>> conf.setDebug(true);
>>>>
>>>>  LocalCluster cluster = new LocalCluster();
>>>>
>>>> cluster.submitTopology("test", conf, builder.createTopology());
>>>>
>>>>
>>>> And in my spouts and bolts,
>>>>
>>>> I make my ApplicationContext variable static. So when it is
>>>> launched by cluster.submitTopology, my context is still available.
>>>>
>>>>
>>>> private static ApplicationContext ctx;
>>>>
>>>> public RabbitListnerSpout(ApplicationContext appContext) {
>>>>
>>>> LOG.info("RabbitListner Constructor called");
>>>>
>>>> ctx = appContext;
>>>>
>>>> }
>>>>
>>>>
>>>> @SuppressWarnings("rawtypes")
>>>>
>>>> @Override
>>>>
>>>> public void open(Map conf, TopologyContext context, SpoutOutputCollector
>>>> collector) {
>>>>
>>>> LOG.info("Inside the open Method for RabbitListner Spout");
>>>>
>>>> inputManager = (InputQueueManagerImpl) ctx
>>>> .getBean(InputQueueManagerImpl.class);
>>>>
>>>> notificationManager = (NotificationQueueManagerImpl) ctx
>>>> .getBean(NotificationQueueManagerImpl.class);
>>>>
>>>> eventExchange = ctx.getEnvironment().getProperty(
>>>> "input.rabbitmq.events.exchange");
>>>>
>>>> routingKey = ctx.getEnvironment().getProperty(
>>>> "input.rabbitmq.events.routingKey");
>>>>
>>>> eventQueue = ctx.getEnvironment().getProperty(
>>>> "input.rabbitmq.events.queue");
>>>>
>>>> _collector = collector;
>>>>
>>>> LOG.info("Exiting the open Method for RabbitListner Spout");
>>>>
>>>> }
>>>>
>>>>
>>>> This works like a charm (my ApplicationContext is initialized
>>>> separately). As we all know, ApplicationContext is not serializable,
>>>> but this works well in LocalCluster.
>>>>
>>>> My assumption is that it will work on a separate cluster too. Is my
>>>> assumption correct?
>>>>
>>>> On Fri, Oct 9, 2015 at 9:04 PM, Javier Gonzalez <jagonzal@gmail.com>
>>>> wrote:
>>>>
>>>>> IIRC, only if everything you use in your spouts and bolts is
>>>>> serializable.
>>>>> On Oct 6, 2015 11:29 PM, "Ankur Garg" <ankurgarg9@gmail.com> wrote:
>>>>>
>>>>>> Hi Ravi,
>>>>>>
>>>>>> I was able to make the integration with Spring work, but the problem is
>>>>>> that I have to autowire every bolt and spout. That means that even if I
>>>>>> parallelize a spout or bolt, the wiring is repeated for each instance. Is
>>>>>> there some way that I only have to do this once for the bolts and spouts
>>>>>> (I mean, if I parallelize bolts or spouts individually, can they share
>>>>>> the conf from somewhere)? Is this possible?
>>>>>>
>>>>>> Thanks
>>>>>> Ankur
>>>>>>
>>>>>> On Tue, Sep 29, 2015 at 7:57 PM, Ravi Sharma <ping2ravi@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes, this works for annotations also.
>>>>>>>
>>>>>>> You can call this method in the prepare() method of every bolt and the
>>>>>>> open() method of every spout, and make sure you don't use any autowired
>>>>>>> bean before this call.
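>>>>>>>
>>>>>>> For example, a minimal sketch of that call inside a bolt's prepare()
>>>>>>> (SpringContext is the holder class from the earlier mail, quoted below):
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
>>>>>>>         SpringContext.getContext()
>>>>>>>                 .getAutowireCapableBeanFactory()
>>>>>>>                 .autowireBeanProperties(this,
>>>>>>>                         AutowireCapableBeanFactory.AUTOWIRE_AUTODETECT, false);
>>>>>>>         // from this point on, @Autowired fields on this bolt are populated
>>>>>>>     }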
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Ravi.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Sep 29, 2015 at 2:22 PM, Ankur Garg <ankurgarg9@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> > Hi Ravi,
>>>>>>> >
>>>>>>> > Thanks for your reply. I am using annotation-based configuration and
>>>>>>> > Spring Boot.
>>>>>>> >
>>>>>>> > Any idea how to do it using annotations?
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > On Tue, Sep 29, 2015 at 6:41 PM, Ravi Sharma <ping2ravi@gmail.com>
>>>>>>> > wrote:
>>>>>>> >
>>>>>>> > > Bolts and spouts are created by Storm and are not known to the
>>>>>>> > > Spring context. You need to manually add them to the Spring context;
>>>>>>> > > there are a few methods available, i.e.
>>>>>>> > >
>>>>>>> > > SpringContext.getContext().getAutowireCapableBeanFactory()
>>>>>>> > >     .autowireBeanProperties(this,
>>>>>>> > >         AutowireCapableBeanFactory.AUTOWIRE_AUTODETECT, false);
>>>>>>> > >
>>>>>>> > > SpringContext is my own class where I have injected the Spring
>>>>>>> > > context, so SpringContext.getContext() returns the actual Spring
>>>>>>> > > context.
>>>>>>> > >
>>>>>>> > >
>>>>>>> > > Ravi.
>>>>>>> > >
>>>>>>> > >
>>>>>>> > > On Tue, Sep 29, 2015 at 1:03 PM, Ankur Garg <ankurgarg9@gmail.com>
>>>>>>> > > wrote:
>>>>>>> > >
>>>>>>> > > > Hi,
>>>>>>> > > >
>>>>>>> > > > I am building a Storm topology with a set of spouts and bolts, and
>>>>>>> > > > I am also using Spring for dependency injection.
>>>>>>> > > >
>>>>>>> > > > Unfortunately, none of my fields are getting autowired, even
>>>>>>> > > > though I have declared all my spouts and bolts as @Components.
>>>>>>> > > >
>>>>>>> > > > However, in the place where I declare my topology, Spring works
>>>>>>> > > > fine.
>>>>>>> > > >
>>>>>>> > > > Is it because cluster.submitTopology("test", conf,
>>>>>>> > > > builder.createTopology()) submits the topology to a cluster
>>>>>>> > > > (locally it spawns different threads for the spouts and bolts)
>>>>>>> > > > that autowiring is not working?
>>>>>>> > > >
>>>>>>> > > > Please suggest.
>>>>>>> > > >
>>>>>>> > >
>>>>>>> >
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>
>
> --
> Javier González Nicolini
>
