Oh. The problem here is that I have many beans which need to be initialized (some read configuration from YAML files, others set up database connections, thread pools, etc.).


Now, I have written a Spring Boot application which takes care of all of the above, and I define my topology inside one of the beans. Here is my bean:

@Autowired
ApplicationContext appContext;

@Bean
public void submitTopology() throws AlreadyAliveException, InvalidTopologyException {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("rabbitMqSpout", new RabbitListnerSpout(appContext), 10);
    builder.setBolt("mapBolt", new GroupingBolt(appContext), 10).shuffleGrouping("rabbitMqSpout");
    builder.setBolt("reduceBolt", new PublishingBolt(appContext), 10).shuffleGrouping("mapBolt");

    Config conf = new Config();
    conf.registerSerialization(EventBean.class); // To be registered with Kryo for Storm
    conf.registerSerialization(InputQueueManagerImpl.class);
    conf.setDebug(true);
    conf.setMessageTimeoutSecs(200);

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", conf, builder.createTopology());
}


When this bean is initialized, appContext has already been initialized by my Spring Boot application. So I am using Spring Boot to initialize and load my context with all the beans.

This is the context I want to leverage in my spouts and bolts.

So, if what I suggested earlier does not work on a distributed Storm cluster, I need to find some way of initializing my AppContext there :(

I would be really thankful if anyone here can help me :(


Thanks

Ankur


On Sun, Oct 11, 2015 at 5:54 AM, Javier Gonzalez <jagonzal@gmail.com> wrote:
The local cluster runs completely within a single JVM AFAIK. The local cluster is useful for development, testing your topology, etc. The real deployment has to go through nimbus, run on workers started by supervisors on one or more nodes, etc. Kind of difficult to simulate all that on a single box.

On Sat, Oct 10, 2015 at 1:45 PM, Ankur Garg <ankurgarg9@gmail.com> wrote:

Oh... So I will have to test it in a cluster.

Having said that, how is the local cluster we use so different from a normal cluster? Ideally, it should simulate a normal cluster.

On Oct 10, 2015 7:51 PM, "Ravi Sharma" <ping2ravi@gmail.com> wrote:
Hi Ankur,
Locally it may be working, but it won't work in an actual cluster.

Think of the Spring context as a collection of many resources, like database connections, maybe HTTP connections, thread pools, etc.
These things won't get serialised, travel to other machines, and just start working.

So basically, in the init methods of your bolts and spouts (prepare() for a bolt, open() for a spout), you need to call some singleton class like this:

ApplicationContext ac = SingletonApplicationContext.getContext();

SingletonApplicationContext will have a static ApplicationContext variable, and getContext() will check whether that static variable has been initialised; if not, it will initialise it, and then return it (a normal singleton class).


Now, when the topology moves to any other node, the bolts and spouts will start there; the first init call will initialise the context, and the other bolts and spouts in that worker JVM will just reuse it.

As John mentioned, it's very important to mark all Spring beans and the context as transient.
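
To make the singleton idea concrete, here is a minimal sketch that runs without Spring or Storm on the classpath. FakeContext is a hypothetical stand-in for Spring's ApplicationContext, ExampleBolt only mimics the shape of a bolt (its prepare() plays the role of the real prepare()/open() callback), and INIT_COUNT exists purely to show that initialisation happens once per JVM. In a real topology, the marked line is where the Spring context would be built instead.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Spring's ApplicationContext.
class FakeContext {
    String getProperty(String key) {
        return "value-of-" + key;
    }
}

// Lazily initialised holder: one context per JVM, i.e. one per worker process.
final class SingletonApplicationContext {
    private static volatile FakeContext context;

    // Demo-only counter showing how many times the context was built.
    static final AtomicInteger INIT_COUNT = new AtomicInteger();

    private SingletonApplicationContext() {
    }

    static FakeContext getContext() {
        FakeContext local = context;
        if (local == null) {
            synchronized (SingletonApplicationContext.class) {
                local = context;
                if (local == null) {
                    // Assumption: a real implementation would build the
                    // Spring context here instead of a FakeContext.
                    INIT_COUNT.incrementAndGet();
                    local = new FakeContext();
                    context = local;
                }
            }
        }
        return local;
    }
}

// Mimics a bolt: serializable, with the context kept in a transient field.
class ExampleBolt implements Serializable {
    private static final long serialVersionUID = 1L;

    private transient FakeContext ctx;

    // Plays the role of prepare()/open(): runs on the worker after deserialization.
    void prepare() {
        ctx = SingletonApplicationContext.getContext();
    }

    String exchange() {
        return ctx.getProperty("input.rabbitmq.events.exchange");
    }
}

class SingletonDemo {
    public static void main(String[] args) throws InterruptedException {
        // Ten parallel "executors" all call prepare() at roughly the same time.
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> new ExampleBolt().prepare());
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("context built " + SingletonApplicationContext.INIT_COUNT.get() + " time(s)");
    }
}
```

The double-checked locking is there because several spout and bolt instances can run as threads inside the same worker, so more than one of them may hit getContext() at the same moment.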

Hope it helps.

Ravi.





On Sat, Oct 10, 2015 at 6:25 AM, Ankur Garg <ankurgarg9@gmail.com> wrote:
Hi Javier,

I am using a local cluster on my dev machine, where I work in Eclipse. Here, I am passing Spring's ApplicationContext as a constructor argument to the spouts and bolts.

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rabbitMqSpout", new RabbitListnerSpout(appContext), 10);
builder.setBolt("mapBolt", new GroupingBolt(appContext), 10).shuffleGrouping("rabbitMqSpout");
builder.setBolt("reduceBolt", new PublishingBolt(appContext), 10).shuffleGrouping("mapBolt");

Config conf = new Config();
conf.registerSerialization(EventBean.class);
conf.registerSerialization(InputQueueManagerImpl.class);
conf.setDebug(true);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());


And in my spouts and bolts, I make my ApplicationContext variable static, so when the topology is launched by cluster.submitTopology, my context is still available:


private static ApplicationContext ctx;

public RabbitListnerSpout(ApplicationContext appContext) {
    LOG.info("RabbitListner Constructor called");
    ctx = appContext;
}

@SuppressWarnings("rawtypes")
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    LOG.info("Inside the open Method for RabbitListner Spout");
    inputManager = (InputQueueManagerImpl) ctx.getBean(InputQueueManagerImpl.class);
    notificationManager = (NotificationQueueManagerImpl) ctx.getBean(NotificationQueueManagerImpl.class);
    eventExchange = ctx.getEnvironment().getProperty("input.rabbitmq.events.exchange");
    routingKey = ctx.getEnvironment().getProperty("input.rabbitmq.events.routingKey");
    eventQueue = ctx.getEnvironment().getProperty("input.rabbitmq.events.queue");
    _collector = collector;
    LOG.info("Exiting the open Method for RabbitListner Spout");
}


This is working like a charm (my ApplicationContext is initialized separately). As we all know, ApplicationContext is not serializable, but this works well in LocalCluster.

My assumption is that it will work on a separate cluster too. Is my assumption correct?


On Fri, Oct 9, 2015 at 9:04 PM, Javier Gonzalez <jagonzal@gmail.com> wrote:

IIRC, only if everything you use in your spouts and bolts is serializable.
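
A quick way to see what "serializable" means in practice is a plain Java serialization round trip, which is the mechanism Storm uses to ship spout and bolt instances to the workers. The sketch below is only an illustration with made-up classes: HeavyResource stands in for something like an ApplicationContext or a connection pool, and NaiveComponent, SafeComponent and SerializationCheck are hypothetical names, not Storm or Spring types.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical heavy resource; deliberately NOT Serializable.
class HeavyResource {
    final String name;

    HeavyResource(String name) {
        this.name = name;
    }
}

// Holds the resource in an ordinary field, so serialization fails.
class NaiveComponent implements Serializable {
    private static final long serialVersionUID = 1L;
    HeavyResource resource = new HeavyResource("db-pool");
}

// Marks the resource transient and rebuilds it in an init method.
class SafeComponent implements Serializable {
    private static final long serialVersionUID = 1L;
    transient HeavyResource resource;

    // Plays the role of open()/prepare() on the receiving side.
    void init() {
        resource = new HeavyResource("db-pool");
    }
}

class SerializationCheck {
    // Returns false if the object graph contains a non-serializable field value.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Serializes and deserializes a value, as happens when a topology is shipped.
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("naive serializable: " + isSerializable(new NaiveComponent()));

        SafeComponent original = new SafeComponent();
        original.init();
        SafeComponent copy = roundTrip(original);
        System.out.println("resource after round trip is null: " + (copy.resource == null));

        copy.init(); // rebuild the resource on the "worker" side
        System.out.println("resource after init: " + copy.resource.name);
    }
}
```

Note that static fields are not part of the serialized form either. In a LocalCluster everything lives in one JVM, so a static set in the constructor is still visible in open(); in a separate worker JVM the constructor never runs there, so that static would simply be unset.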

On Oct 6, 2015 11:29 PM, "Ankur Garg" <ankurgarg9@gmail.com> wrote:
Hi Ravi,

I was able to integrate with Spring, but the problem is that I have to autowire for every bolt and spout. That means that if I parallelize a spout or bolt, the autowiring is run for each instance. Is there some way to do this only once for the bolts and spouts (I mean, if I parallelize bolts or spouts individually, can they share the configuration from somewhere)? Is this possible?

Thanks
Ankur

On Tue, Sep 29, 2015 at 7:57 PM, Ravi Sharma <ping2ravi@gmail.com> wrote:
Yes, this works for annotations too.

You can call this method in the prepare() method of every bolt and the open()
method of every spout. Make sure you don't use any autowired bean before this
call.




Ravi.




On Tue, Sep 29, 2015 at 2:22 PM, Ankur Garg <ankurgarg9@gmail.com> wrote:

> Hi Ravi,
>
> Thanks for your reply. I am using annotation-based configuration and
> Spring Boot.
>
> Any idea how to do it using annotations?
>
>
>
> On Tue, Sep 29, 2015 at 6:41 PM, Ravi Sharma <ping2ravi@gmail.com> wrote:
>
> > Bolts and spouts are created by Storm and are not known to the Spring
> > context. You need to add them to the Spring context manually; there are a
> > few methods available, e.g.
> >
> > SpringContext.getContext().getAutowireCapableBeanFactory().autowireBeanProperties(this,
> >         AutowireCapableBeanFactory.AUTOWIRE_AUTODETECT, false);
> >
> > SpringContext is my own class, into which I have injected the Spring
> > context, so SpringContext.getContext() returns the actual Spring context.
> >
> >
> >
> >
> > Ravi.
> >
> >
> > On Tue, Sep 29, 2015 at 1:03 PM, Ankur Garg <ankurgarg9@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I am building a Storm topology with a set of spouts and bolts, and I am
> > > also using Spring for dependency injection.
> > >
> > > Unfortunately, none of my fields are getting autowired, even though I
> > > have declared all my spouts and bolts as @Component.
> > >
> > > However, in the place where I declare my topology, Spring is working
> > > fine.
> > >
> > > Is it because cluster.submitTopology("test", conf,
> > > builder.createTopology()) submits the topology to a cluster (locally it
> > > spawns different threads for spouts and bolts) that autowiring is not
> > > working?
> > >
> > > Please suggest.
> > >
> >
>






--
Javier González Nicolini