storm-user mailing list archives

From Ankur Garg <ankurga...@gmail.com>
Subject Re: Does Storm work with Spring
Date Sun, 11 Oct 2015 07:31:57 GMT
I don't think I need to autowire beans inside my spouts and bolts.

All I want is for my context to be available. Since I use Spring Boot, I
delegate to it the job of initialising all the beans and setting each one up
(reading the yml file, creating DB connections, connections to message
brokers, etc.).

On my local cluster I am passing it as a constructor argument to spouts and
bolts. Since they are all running in the same JVM, it is available to all
spouts and bolts.

But in a distributed cluster this will blow up, as the context is not
serializable and cannot be passed around like that.

So the problem is only to make this context available once per JVM. Hence I
thought I would wrap it in a singleton and make it available to all spouts
and bolts per JVM.

Once I have this context initialized and loaded, all I need to do is get the
beans, which I will do the same way I am doing inside the local-cluster
spouts and bolts.
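To make the idea concrete, this is roughly what I have in mind. It is only a
sketch, nothing tested; SpringContextHolder and MyBootApplication are just
placeholder names for the wrapper and for my Boot application class:

import org.springframework.boot.SpringApplication;
import org.springframework.context.ApplicationContext;

public final class SpringContextHolder {

    // One context per JVM (i.e. per Storm worker).
    private static volatile ApplicationContext context;

    private SpringContextHolder() {
    }

    // Lazily start the Spring Boot application the first time any spout or
    // bolt in this worker asks for the context.
    public static ApplicationContext getContext() {
        if (context == null) {
            synchronized (SpringContextHolder.class) {
                if (context == null) {
                    context = SpringApplication.run(MyBootApplication.class);
                }
            }
        }
        return context;
    }
}

Each spout and bolt would then call SpringContextHolder.getContext().getBean(...)
from open()/prepare(), the same way I am calling ctx.getBean today.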





On Sun, Oct 11, 2015 at 12:46 PM, Ravi Sharma <ping2ravi@gmail.com> wrote:

> Yes, your assumption is right.
> JVM 1 will create an application context, say ac1,
>
> and JVM 2 will create another application context, ac2.
>
> And all of it can be done via singleton classes.
>
> All bolts and spouts in the same JVM instance need to access the same
> application context.
>
> I have done the same in a cluster and it works.
>
> Remember all Spring beans need to be transient, and you also need to set
> required=false in case you are going to create spouts and bolts using Spring:
>
> public class MyBolt {
>
>     @Autowired(required = false)
>     private transient MyServiceBean myServiceBean;
>
>     ...
> }
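>
> Something like this in prepare(), for example. This is only a rough sketch:
> SpringContextHolder stands for whatever singleton wrapper you build around
> your Boot application, and MyServiceBean is a placeholder bean; here the
> dependency is looked up with getBean(), but you could equally ask the
> context's AutowireCapableBeanFactory to autowire the annotated fields.
>
> import java.util.Map;
>
> import org.springframework.context.ApplicationContext;
>
> import backtype.storm.task.OutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichBolt;
> import backtype.storm.tuple.Tuple;
>
> public class MyBolt extends BaseRichBolt {
>
>     // transient: never serialized with the bolt, always re-created on the worker
>     private transient MyServiceBean myServiceBean;
>     private transient OutputCollector collector;
>
>     @SuppressWarnings("rawtypes")
>     @Override
>     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
>         this.collector = collector;
>         // Resolve the bean from the per-JVM context instead of serializing it.
>         ApplicationContext ctx = SpringContextHolder.getContext();
>         this.myServiceBean = ctx.getBean(MyServiceBean.class);
>     }
>
>     @Override
>     public void execute(Tuple tuple) {
>         // use myServiceBean here
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>     }
> }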
>
> Ravi
> On 11 Oct 2015 07:59, "Ankur Garg" <ankurgarg9@gmail.com> wrote:
>
>> Also, I think there can be some instances of spouts/bolts running on JVM 1
>> and some on JVM 2, and so on...
>>
>> Is it possible for spouts and bolts running on the same JVM to access the
>> same ApplicationContext?
>>
>> I am thinking that I can put the place where I launch my Spring Boot
>> application inside a singleton class, so that all the spouts and bolts
>> running on, say, JVM 1 will have access to the same context (instead of
>> launching it in every spout and bolt). And those on JVM 2 will still
>> initialise it once, and all the rest will get the same application context.
>>
>> But all of the above is a theoretical assumption. I still need to try it
>> out (unfortunately I don't have a cluster setup at my end), but if possible
>> please let me know whether this can work.
>>
>> Thanks
>> Ankur
>>
>> On Sun, Oct 11, 2015 at 11:48 AM, Ankur Garg <ankurgarg9@gmail.com>
>> wrote:
>>
>>> Thanks for replying, Ravi.
>>>
>>> I think your suggestion to make a wrapper to read JSON or XML is a very
>>> nice idea indeed.
>>>
>>> But the problem for me here is to have the context (with all beans loaded
>>> and initialized) available inside the spouts and bolts, and that means
>>> inside every running instance of the spouts and bolts, which may be
>>> running on different machines and different JVMs.
>>>
>>> Agreed that when defining the topology I don't need the Spring context,
>>> as I just have to define spouts and bolts there. I used the context here
>>> to send it to the spouts and bolts through the constructor, but it appears
>>> from the comments above that this won't work on a distributed cluster.
>>>
>>> So, is there some way that, once the topology gets submitted to run in a
>>> distributed cluster, I can initialize my context there and somehow make it
>>> available to all spouts and bolts? Basically, some shared location where
>>> my application context can be initialized (once and only once) and then
>>> accessed by all instances of spouts and bolts?
>>>
>>> Thanks
>>>
>>> On Sun, Oct 11, 2015 at 11:20 AM, Ravi Sharma <ping2ravi@gmail.com>
>>> wrote:
>>>
>>>> Basically you will have two contexts defined at different times/phases.
>>>>
>>>> When you are about to submit the topology, you need to build the
>>>> topology; that context only needs information about spouts and bolts.
>>>> You don't need any application beans like database access beans or your
>>>> services etc., as at this level you are not running your application;
>>>> you are just creating a topology and defining how bolts and spouts are
>>>> connected to each other, etc.
>>>>
>>>> Now once the topology is submitted, it will be moved to one of the
>>>> supervisor nodes and will start running, and all spouts and bolts will
>>>> be initialized. At this moment you will need your application context,
>>>> which doesn't need your earlier topology context.
>>>>
>>>> So I would suggest keeping both contexts separate.
>>>>
>>>> A topology is not complex to build; a smaller topology can be built via
>>>> code only, i.e. which bolt listens to which spout. But if you want to go
>>>> with a good design, I would say just write a small wrapper that reads
>>>> some JSON where you can define your bolts and spouts, and use that to
>>>> build the topology (you can use Spring, but it is not really needed);
>>>> for example:
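>>>>
>>>> Something along these lines, just to illustrate the idea. TopologyDef,
>>>> SpoutDef and BoltDef would be your own small POJOs mapped from the JSON
>>>> (with Jackson or whatever you like); none of this is an existing library.
>>>>
>>>> import backtype.storm.generated.StormTopology;
>>>> import backtype.storm.topology.IRichBolt;
>>>> import backtype.storm.topology.IRichSpout;
>>>> import backtype.storm.topology.TopologyBuilder;
>>>>
>>>> public class JsonTopologyFactory {
>>>>
>>>>     // topologyDef holds what was read from the JSON, e.g.
>>>>     // {"spouts":[{"id":"rabbitMqSpout","className":"...","parallelism":10}],
>>>>     //  "bolts":[{"id":"mapBolt","className":"...","parallelism":10,"shuffleFrom":"rabbitMqSpout"}]}
>>>>     public StormTopology build(TopologyDef topologyDef) throws Exception {
>>>>         TopologyBuilder builder = new TopologyBuilder();
>>>>         for (SpoutDef s : topologyDef.getSpouts()) {
>>>>             IRichSpout spout = (IRichSpout) Class.forName(s.getClassName()).newInstance();
>>>>             builder.setSpout(s.getId(), spout, s.getParallelism());
>>>>         }
>>>>         for (BoltDef b : topologyDef.getBolts()) {
>>>>             IRichBolt bolt = (IRichBolt) Class.forName(b.getClassName()).newInstance();
>>>>             builder.setBolt(b.getId(), bolt, b.getParallelism())
>>>>                    .shuffleGrouping(b.getShuffleFrom());
>>>>         }
>>>>         return builder.createTopology();
>>>>     }
>>>> }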
>>>>
>>>> In the past I have done it using both a JSON setup (without Spring) and
>>>> an XML setup (with Spring); both work well.
>>>>
>>>> Ravi
>>>> On 11 Oct 2015 06:38, "Ankur Garg" <ankurgarg9@gmail.com> wrote:
>>>>
>>>>> Oh, the problem here is that I have many beans which need to be
>>>>> initialized (some read config from yml files, others create DB
>>>>> connections, thread pools, etc.).
>>>>>
>>>>>
>>>>> Now, I have written a Spring Boot application which takes care of all
>>>>> of the above, and I define my topology inside one of the beans. Here is
>>>>> my bean:
>>>>>
>>>>> @Autowired
>>>>> ApplicationContext appContext;
>>>>>
>>>>> @Bean
>>>>> public void submitTopology() throws AlreadyAliveException, InvalidTopologyException {
>>>>>
>>>>>     TopologyBuilder builder = new TopologyBuilder();
>>>>>
>>>>>     builder.setSpout("rabbitMqSpout", new RabbitListnerSpout(appContext), 10);
>>>>>     builder.setBolt("mapBolt", new GroupingBolt(appContext), 10).shuffleGrouping("rabbitMqSpout");
>>>>>     builder.setBolt("reduceBolt", new PublishingBolt(appContext), 10).shuffleGrouping("mapBolt");
>>>>>
>>>>>     Config conf = new Config();
>>>>>     conf.registerSerialization(EventBean.class); // to be registered with Kryo for Storm
>>>>>     conf.registerSerialization(InputQueueManagerImpl.class);
>>>>>     conf.setDebug(true);
>>>>>     conf.setMessageTimeoutSecs(200);
>>>>>
>>>>>     LocalCluster cluster = new LocalCluster();
>>>>>     cluster.submitTopology("test", conf, builder.createTopology());
>>>>> }
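>>>>>
>>>>> (On a real cluster I assume the last two lines would be replaced by the
>>>>> standard StormSubmitter call, something like
>>>>>
>>>>> StormSubmitter.submitTopology("test", conf, builder.createTopology());
>>>>>
>>>>> but that is exactly the situation where I am unsure how the context will
>>>>> behave.)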
>>>>>
>>>>>
>>>>> When this bean is initialized, I already have appContext initialized by
>>>>> my Spring Boot application. So the thing is, I am using Spring Boot to
>>>>> initialize and load my context with all beans.
>>>>>
>>>>> Now this is the context which I want to leverage in my spouts and
>>>>> bolts.
>>>>>
>>>>> So, if what I suggested earlier does not work on a distributed Storm
>>>>> cluster, I need to find a way of initializing my appContext somehow :(
>>>>>
>>>>> I would be really thankful if anyone here can help me :(
>>>>>
>>>>>
>>>>> Thanks
>>>>>
>>>>> Ankur
>>>>>
>>>>> On Sun, Oct 11, 2015 at 5:54 AM, Javier Gonzalez <jagonzal@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> The local cluster runs completely within a single JVM AFAIK. The local
>>>>>> cluster is useful for development, testing your topology, etc. The real
>>>>>> deployment has to go through nimbus, run on workers started by
>>>>>> supervisors on one or more nodes, etc. Kind of difficult to simulate
>>>>>> all that on a single box.
>>>>>>
>>>>>> On Sat, Oct 10, 2015 at 1:45 PM, Ankur Garg <ankurgarg9@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Oh... so I will have to test it in a cluster.
>>>>>>>
>>>>>>> Having said that, how is the local cluster we use so different from a
>>>>>>> normal cluster? Ideally it should simulate a normal cluster.
>>>>>>> On Oct 10, 2015 7:51 PM, "Ravi Sharma" <ping2ravi@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Ankur,
>>>>>>>> Locally it may be working, but it won't work in an actual cluster.
>>>>>>>>
>>>>>>>> Think of the Spring context as a collection of many of your
>>>>>>>> resources: database connections, maybe HTTP connections, thread
>>>>>>>> pools, etc. These things won't just get serialised, move to other
>>>>>>>> machines, and start working.
>>>>>>>>
>>>>>>>> So basically in the init methods of your bolts and spouts, you need
>>>>>>>> to call some singleton class like this:
>>>>>>>>
>>>>>>>> ApplicationContext ac = SingletonApplicationContext.getContext();
>>>>>>>>
>>>>>>>> SingletonApplicationContext will have a static ApplicationContext
>>>>>>>> variable, and in getContext() you will check whether the static
>>>>>>>> variable has been initialised; if not, you initialise it and then
>>>>>>>> return it (a normal singleton class).
>>>>>>>>
>>>>>>>>
>>>>>>>> Now when the topology moves to any other node, the bolts and spouts
>>>>>>>> will start, the first init call will initialize the context, and the
>>>>>>>> other bolts/spouts will just use it.
>>>>>>>>
>>>>>>>> As John mentioned, it's very important to mark all Spring beans and
>>>>>>>> the context as transient.
>>>>>>>>
>>>>>>>> Hope it helps.
>>>>>>>>
>>>>>>>> Ravi.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Oct 10, 2015 at 6:25 AM, Ankur Garg <ankurgarg9@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Javier,
>>>>>>>>>
>>>>>>>>> So, I am using a local cluster on my dev machine where I am using
>>>>>>>>> Eclipse. Here, I am passing Spring's ApplicationContext as a
>>>>>>>>> constructor argument to spouts and bolts:
>>>>>>>>>
>>>>>>>>> TopologyBuilder builder = new TopologyBuilder();
>>>>>>>>>
>>>>>>>>> builder.setSpout("rabbitMqSpout", new RabbitListnerSpout(appContext), 10);
>>>>>>>>> builder.setBolt("mapBolt", new GroupingBolt(appContext), 10).shuffleGrouping("rabbitMqSpout");
>>>>>>>>> builder.setBolt("reduceBolt", new PublishingBolt(appContext), 10).shuffleGrouping("mapBolt");
>>>>>>>>>
>>>>>>>>> Config conf = new Config();
>>>>>>>>> conf.registerSerialization(EventBean.class); // registered with Kryo for Storm
>>>>>>>>> conf.registerSerialization(InputQueueManagerImpl.class);
>>>>>>>>> conf.setDebug(true);
>>>>>>>>>
>>>>>>>>> LocalCluster cluster = new LocalCluster();
>>>>>>>>> cluster.submitTopology("test", conf, builder.createTopology());
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> And in my spouts and bolts, I make my ApplicationContext variable
>>>>>>>>> static, so when the topology is launched by cluster.submitTopology
>>>>>>>>> my context is still available:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> private static ApplicationContext ctx;
>>>>>>>>>
>>>>>>>>> public RabbitListnerSpout(ApplicationContext appContext) {
>>>>>>>>>     LOG.info("RabbitListner Constructor called");
>>>>>>>>>     ctx = appContext;
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> @SuppressWarnings("rawtypes")
>>>>>>>>> @Override
>>>>>>>>> public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
>>>>>>>>>     LOG.info("Inside the open Method for RabbitListner Spout");
>>>>>>>>>
>>>>>>>>>     inputManager = (InputQueueManagerImpl) ctx.getBean(InputQueueManagerImpl.class);
>>>>>>>>>     notificationManager = (NotificationQueueManagerImpl) ctx.getBean(NotificationQueueManagerImpl.class);
>>>>>>>>>
>>>>>>>>>     eventExchange = ctx.getEnvironment().getProperty("input.rabbitmq.events.exchange");
>>>>>>>>>     routingKey = ctx.getEnvironment().getProperty("input.rabbitmq.events.routingKey");
>>>>>>>>>     eventQueue = ctx.getEnvironment().getProperty("input.rabbitmq.events.queue");
>>>>>>>>>
>>>>>>>>>     _collector = collector;
>>>>>>>>>
>>>>>>>>>     LOG.info("Exiting the open Method for RabbitListner Spout");
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This is working like a charm (my ApplicationContext is initialized
>>>>>>>>> separately). As we all know, ApplicationContext is not serializable,
>>>>>>>>> but this works well in LocalCluster.
>>>>>>>>>
>>>>>>>>> My assumption is that it will work in a separate cluster too. Is my
>>>>>>>>> assumption correct?
>>>>>>>>>
>>>>>>>>> On Fri, Oct 9, 2015 at 9:04 PM, Javier Gonzalez <jagonzal@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> IIRC, only if everything you use in your spouts and bolts is
>>>>>>>>>> serializable.
>>>>>>>>>> On Oct 6, 2015 11:29 PM, "Ankur Garg" <ankurgarg9@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Ravi,
>>>>>>>>>>>
>>>>>>>>>>> I was able to make the integration with Spring work, but the
>>>>>>>>>>> problem is that I have to autowire every bolt and spout. That
>>>>>>>>>>> means that even if I parallelize a spout or bolt, it gets done for
>>>>>>>>>>> each instance. Is there some way that I only have to do it once
>>>>>>>>>>> for bolts and spouts (I mean, if I parallelize bolts or spouts
>>>>>>>>>>> individually, can they share the conf from somewhere)? Is this
>>>>>>>>>>> possible?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Ankur
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Sep 29, 2015 at 7:57 PM, Ravi Sharma <ping2ravi@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Yes, this works for annotations also...
>>>>>>>>>>>>
>>>>>>>>>>>> You can call this method in the prepare() method of every bolt
>>>>>>>>>>>> and the open() method of every spout, and make sure you don't use
>>>>>>>>>>>> any autowired bean before this call.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Ravi.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Sep 29, 2015 at 2:22 PM, Ankur Garg <ankurgarg9@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> > Hi Ravi,
>>>>>>>>>>>> >
>>>>>>>>>>>> > Thanks for your reply. I am using annotation-based
>>>>>>>>>>>> > configuration and Spring Boot.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Any idea how to do it using annotations?
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> > On Tue, Sep 29, 2015 at 6:41 PM, Ravi Sharma <ping2ravi@gmail.com> wrote:
>>>>>>>>>>>> >
>>>>>>>>>>>> > > Bolts and spouts are created by Storm and are not known to
>>>>>>>>>>>> > > the Spring context. You need to manually add them to the
>>>>>>>>>>>> > > Spring context; there are a few methods available, i.e.:
>>>>>>>>>>>> > >
>>>>>>>>>>>> > >
>>>>>>>>>>>> > >
>>>>>>>>>>>> >
>>>>>>>>>>>> > > SpringContext.getContext().getAutowireCapableBeanFactory().autowireBeanProperties(this,
>>>>>>>>>>>> > >     AutowireCapableBeanFactory.AUTOWIRE_AUTODETECT, false);
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > SpringContext is my own class where I have injected the
>>>>>>>>>>>> > > Spring context, so SpringContext.getContext() returns the
>>>>>>>>>>>> > > actual Spring context.
>>>>>>>>>>>> > >
>>>>>>>>>>>> > >
>>>>>>>>>>>> > >
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > Ravi.
>>>>>>>>>>>> > >
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > On Tue, Sep 29, 2015 at 1:03 PM, Ankur Garg <ankurgarg9@gmail.com> wrote:
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > > Hi,
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > > > I am building a Storm topology with a set of spouts and
>>>>>>>>>>>> > > > bolts, and I am also using Spring for dependency injection.
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > > > Unfortunately, none of my fields are getting autowired,
>>>>>>>>>>>> > > > even though I have declared all my spouts and bolts as
>>>>>>>>>>>> > > > @Components.
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > > > However, in the place where I am declaring my topology,
>>>>>>>>>>>> > > > Spring is working fine.
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > > > Is it because cluster.submitTopology("test", conf,
>>>>>>>>>>>> > > > builder.createTopology()) submits the topology to a
>>>>>>>>>>>> > > > cluster (locally it spawns different threads for spouts
>>>>>>>>>>>> > > > and bolts) that autowiring is not working?
>>>>>>>>>>>> > > > Please suggest .
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > >
>>>>>>>>>>>> >
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Javier González Nicolini
>>>>>>
>>>>>
>>>>>
>>>
>>
