storm-user mailing list archives

From Ankur Garg <ankurga...@gmail.com>
Subject Re: Does Storm work with Spring
Date Sun, 11 Oct 2015 06:59:04 GMT
Also, I think there can be some instances of spouts/bolts running on JVM 1
and some on JVM 2, and so on...

Is it possible for spouts and bolts running on the same JVM to access the
same ApplicationContext?

I am thinking that I can put the place where I launch my Spring Boot
application inside a singleton class, so that all the spouts and bolts
running on, say, JVM 1 will have access to the same context (instead of
launching it in every spout and bolt). Those on JVM 2 will still
initialise it once, and all the rest there will get the same ApplicationContext.

But all of the above is a theoretical assumption. I still need to try it out
(unfortunately I don't have a cluster setup at my end), but if possible
please let me know whether this can work.
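The per-JVM holder described above could be sketched in plain Java roughly as follows. This is only an illustration: `ContextHolder`, `createContext` and the demo class are hypothetical names, and in a real spout/bolt the `open()`/`prepare()` methods would call `ContextHolder.get()` while `createContext()` would wrap `SpringApplication.run(...)` and the field would be an `ApplicationContext`.

```java
// Per-JVM lazy singleton holder: every spout/bolt task on the same JVM
// calls ContextHolder.get() and receives the same instance; the factory
// runs exactly once per JVM, no matter how many tasks call it.
final class ContextHolder {
    static int initCount;              // exposed only so the demo can verify once-only init
    private static Object context;     // guarded by the class lock

    static synchronized Object get() {
        if (context == null) {
            context = createContext(); // first caller pays the initialization cost
        }
        return context;
    }

    // Stand-in for SpringApplication.run(...); returns the shared context object.
    private static Object createContext() {
        initCount++;
        return new Object();
    }
}

public class ContextHolderDemo {
    public static void main(String[] args) {
        Object fromSpout = ContextHolder.get();      // e.g. called in a spout's open()
        Object fromBolt  = ContextHolder.get();      // e.g. called in a bolt's prepare()
        System.out.println(fromSpout == fromBolt);   // true: same instance per JVM
        System.out.println(ContextHolder.initCount); // 1: initialized once
    }
}
```

Each JVM (worker) would still pay its own initialization cost once, which matches the behaviour described above: one context per JVM, shared by all spouts and bolts in it.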

Thanks
Ankur

On Sun, Oct 11, 2015 at 11:48 AM, Ankur Garg <ankurgarg9@gmail.com> wrote:

> Thanks for replying, Ravi.
>
> I think your suggestion to write a wrapper that reads JSON or XML is a very
> nice idea indeed.
>
> But the problem for me here is to have the context (with all beans loaded
> and initialized) available inside the spouts and bolts, and that means
> inside every running instance of the spouts and bolts, which may be
> running on different machines and different JVMs.
>
> I agree that when defining the topology I don't need the Spring context, as
> I just have to define spouts and bolts there. I used the context here to
> send it to the spouts and bolts through constructors, but it appears from
> the comments above that this won't work on a distributed cluster.
>
> So, is there some way that once the topology gets submitted to run in a
> distributed cluster, I can initialize my context there and somehow make it
> available to all spouts and bolts? Basically, some shared location where my
> application context can be initialized (once and only once) and then
> accessed by all instances of spouts and bolts?
>
> Thanks
>
> On Sun, Oct 11, 2015 at 11:20 AM, Ravi Sharma <ping2ravi@gmail.com> wrote:
>
>> Basically you will have two contexts defined at different times/phases.
>>
>> When you are about to submit the topology, you need to build the topology;
>> that context only needs information about spouts and bolts. You don't need
>> any application beans like database access or your services etc., as at
>> this level you are not running your application but just creating a
>> topology and defining how bolts and spouts are connected to each other.
>>
>> Now once the topology is submitted, it will be moved to one of the
>> supervisor nodes and start running, and all spouts and bolts will be
>> initialized. At this moment you will need your application context, which
>> doesn't need your earlier topology context.
>>
>> So I suggest keeping both contexts separate.
>>
>> A topology is not complex to build; a smaller topology can be built via
>> code only, i.e. which bolt listens to which spout. But if you want a good
>> design, I say just write a small wrapper to read some JSON where you can
>> define your bolts and spouts, and use that to build the topology (you can
>> use Spring, but it's not much needed).
>>
>> In the past I have done it using both JSON settings (without Spring) and
>> XML settings (with Spring); both work well.
>>
>> Ravi
>> On 11 Oct 2015 06:38, "Ankur Garg" <ankurgarg9@gmail.com> wrote:
>>
>>> Oh, the problem here is that I have many beans which need to be
>>> initialized (some read conf from YML files, set up database connections,
>>> initialize thread pools, etc.).
>>>
>>> Now, I have written a Spring Boot application which takes care of all of
>>> the above, and I define my topology inside one of the beans. Here is my
>>> bean:
>>>
>>> @Autowired
>>> ApplicationContext appContext;
>>>
>>> @Bean
>>> public void submitTopology() throws AlreadyAliveException, InvalidTopologyException {
>>>
>>>     TopologyBuilder builder = new TopologyBuilder();
>>>
>>>     builder.setSpout("rabbitMqSpout", new RabbitListnerSpout(appContext), 10);
>>>     builder.setBolt("mapBolt", new GroupingBolt(appContext), 10).shuffleGrouping("rabbitMqSpout");
>>>     builder.setBolt("reduceBolt", new PublishingBolt(appContext), 10).shuffleGrouping("mapBolt");
>>>
>>>     Config conf = new Config();
>>>     conf.registerSerialization(EventBean.class); // to be registered with Kryo for Storm
>>>     conf.registerSerialization(InputQueueManagerImpl.class);
>>>     conf.setDebug(true);
>>>     conf.setMessageTimeoutSecs(200);
>>>
>>>     LocalCluster cluster = new LocalCluster();
>>>     cluster.submitTopology("test", conf, builder.createTopology());
>>> }
>>>
>>>
>>> When this bean is initialized, I already have appContext initialized by
>>> my Spring Boot application. So the thing is, I am using Spring Boot to
>>> initialize and load my context with all beans.
>>>
>>> Now this is the context which I want to leverage in my spouts and bolts.
>>>
>>> So, if what I suggested earlier does not work on a distributed Storm
>>> cluster, I need to find a way of initializing my AppContext somehow :(
>>>
>>> I would be really thankful if anyone here can help me :(
>>>
>>>
>>> Thanks
>>>
>>> Ankur
>>>
>>> On Sun, Oct 11, 2015 at 5:54 AM, Javier Gonzalez <jagonzal@gmail.com>
>>> wrote:
>>>
>>>> The local cluster runs completely within a single JVM AFAIK. The local
>>>> cluster is useful for development, testing your topology, etc. The real
>>>> deployment has to go through nimbus, run on workers started by supervisors
>>>> on one or more nodes, etc. Kind of difficult to simulate all that on a
>>>> single box.
>>>>
>>>> On Sat, Oct 10, 2015 at 1:45 PM, Ankur Garg <ankurgarg9@gmail.com>
>>>> wrote:
>>>>
>>>>> Oh... so I will have to test it in a cluster.
>>>>>
>>>>> Having said that, how is the local cluster we use so different from a
>>>>> normal cluster? Ideally, it should simulate a normal cluster.
>>>>> On Oct 10, 2015 7:51 PM, "Ravi Sharma" <ping2ravi@gmail.com> wrote:
>>>>>
>>>>>> Hi Ankur,
>>>>>> Locally it may be working, but it won't work in an actual cluster.
>>>>>>
>>>>>> Think of the SpringContext as a collection of your many resources:
>>>>>> database connections, maybe HTTP connections, thread pools etc.
>>>>>> These things won't get serialised, move to other machines, and just
>>>>>> start working.
>>>>>>
>>>>>> So basically in the init methods of bolts and spouts, you need to call
>>>>>> some singleton class like this:
>>>>>>
>>>>>> ApplicationContext ac = SingletonApplicationContext.getContext();
>>>>>>
>>>>>> SingletonApplicationContext will have a static ApplicationContext
>>>>>> variable, and in getContext you check whether the static variable has
>>>>>> been initialised; if not, you initialise it and then return it (a
>>>>>> normal singleton class).
>>>>>>
>>>>>> Now when the topology moves to any other node, the bolts and spouts
>>>>>> will start; the first init call will initialize the context, and the
>>>>>> other bolts/spouts will just use it.
>>>>>>
>>>>>> As John mentioned, it's very important to mark all Spring beans and
>>>>>> the context as transient.
>>>>>>
>>>>>> Hope it helps.
>>>>>>
>>>>>> Ravi.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, Oct 10, 2015 at 6:25 AM, Ankur Garg <ankurgarg9@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Javier,
>>>>>>>
>>>>>>> So, I am using a local cluster on my dev machine where I am using
>>>>>>> Eclipse. Here, I am passing Spring's ApplicationContext as a
>>>>>>> constructor argument to spouts and bolts:
>>>>>>>
>>>>>>> TopologyBuilder builder = new TopologyBuilder();
>>>>>>>
>>>>>>> builder.setSpout("rabbitMqSpout", new RabbitListnerSpout(appContext), 10);
>>>>>>> builder.setBolt("mapBolt", new GroupingBolt(appContext), 10).shuffleGrouping("rabbitMqSpout");
>>>>>>> builder.setBolt("reduceBolt", new PublishingBolt(appContext), 10).shuffleGrouping("mapBolt");
>>>>>>>
>>>>>>> Config conf = new Config();
>>>>>>> conf.registerSerialization(EventBean.class);
>>>>>>> conf.registerSerialization(InputQueueManagerImpl.class);
>>>>>>> conf.setDebug(true);
>>>>>>>
>>>>>>> LocalCluster cluster = new LocalCluster();
>>>>>>> cluster.submitTopology("test", conf, builder.createTopology());
>>>>>>>
>>>>>>>
>>>>>>> And in my spouts and bolts, I make my ApplicationContext variable
>>>>>>> static, so when the topology is launched by cluster.submitTopology,
>>>>>>> my context is still available:
>>>>>>>
>>>>>>> private static ApplicationContext ctx;
>>>>>>>
>>>>>>> public RabbitListnerSpout(ApplicationContext appContext) {
>>>>>>>     LOG.info("RabbitListner Constructor called");
>>>>>>>     ctx = appContext;
>>>>>>> }
>>>>>>>
>>>>>>> @SuppressWarnings("rawtypes")
>>>>>>> @Override
>>>>>>> public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
>>>>>>>     LOG.info("Inside the open Method for RabbitListner Spout");
>>>>>>>     inputManager = (InputQueueManagerImpl) ctx.getBean(InputQueueManagerImpl.class);
>>>>>>>     notificationManager = (NotificationQueueManagerImpl) ctx.getBean(NotificationQueueManagerImpl.class);
>>>>>>>     eventExchange = ctx.getEnvironment().getProperty("input.rabbitmq.events.exchange");
>>>>>>>     routingKey = ctx.getEnvironment().getProperty("input.rabbitmq.events.routingKey");
>>>>>>>     eventQueue = ctx.getEnvironment().getProperty("input.rabbitmq.events.queue");
>>>>>>>     _collector = collector;
>>>>>>>     LOG.info("Exiting the open Method for RabbitListner Spout");
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> This is working like a charm (my ApplicationContext is initialized
>>>>>>> separately). As we all know, ApplicationContext is not serializable,
>>>>>>> but this works well in LocalCluster.
>>>>>>>
>>>>>>> My assumption is that it will work in a separate cluster too. Is my
>>>>>>> assumption correct?
>>>>>>>
>>>>>>> On Fri, Oct 9, 2015 at 9:04 PM, Javier Gonzalez <jagonzal@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> IIRC, only if everything you use in your spouts and bolts is
>>>>>>>> serializable.
>>>>>>>> On Oct 6, 2015 11:29 PM, "Ankur Garg" <ankurgarg9@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Ravi,
>>>>>>>>>
>>>>>>>>> I was able to make an integration with Spring, but the problem is
>>>>>>>>> that I have to autowire for every bolt and spout. That means that
>>>>>>>>> even if I parallelize a spout or bolt, it will get started in each
>>>>>>>>> instance. Is there some way that I only have to do it for bolts and
>>>>>>>>> spouts once (I mean, if I parallelize bolts or spouts individually,
>>>>>>>>> can they share the conf from somewhere)? Is this possible?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Ankur
>>>>>>>>>
>>>>>>>>> On Tue, Sep 29, 2015 at 7:57 PM, Ravi Sharma <ping2ravi@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Yes, this is for annotations also...
>>>>>>>>>>
>>>>>>>>>> You can call this method in the prepare() method of every bolt and
>>>>>>>>>> the open() method of every spout, and make sure you don't use any
>>>>>>>>>> autowired bean before this call.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Ravi.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Sep 29, 2015 at 2:22 PM, Ankur Garg <ankurgarg9@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> > Hi Ravi,
>>>>>>>>>> >
>>>>>>>>>> > Thanks for your reply. I am using annotation-based configuration
>>>>>>>>>> > and Spring Boot.
>>>>>>>>>> >
>>>>>>>>>> > Any idea how to do it using annotations?
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > On Tue, Sep 29, 2015 at 6:41 PM, Ravi Sharma <ping2ravi@gmail.com> wrote:
>>>>>>>>>> >
>>>>>>>>>> > > Bolts and spouts are created by Storm and not known to the
>>>>>>>>>> > > Spring context. You need to manually add them to the Spring
>>>>>>>>>> > > context; there are a few methods available, i.e.
>>>>>>>>>> > >
>>>>>>>>>> > > SpringContext.getContext().getAutowireCapableBeanFactory().autowireBeanProperties(this,
>>>>>>>>>> > > AutowireCapableBeanFactory.AUTOWIRE_AUTODETECT, false);
>>>>>>>>>> > >
>>>>>>>>>> > > SpringContext is my own class where I have injected the Spring
>>>>>>>>>> > > context, so SpringContext.getContext() returns the actual
>>>>>>>>>> > > Spring context.
>>>>>>>>>> > >
>>>>>>>>>> > >
>>>>>>>>>> > >
>>>>>>>>>> > >
>>>>>>>>>> > > Ravi.
>>>>>>>>>> > >
>>>>>>>>>> > >
>>>>>>>>>> > > On Tue, Sep 29, 2015 at 1:03 PM, Ankur Garg <ankurgarg9@gmail.com> wrote:
>>>>>>>>>> > >
>>>>>>>>>> > > > Hi,
>>>>>>>>>> > > >
>>>>>>>>>> > > > I am building a Storm topology with a set of spouts and
>>>>>>>>>> > > > bolts, and I am also using Spring for dependency injection.
>>>>>>>>>> > > >
>>>>>>>>>> > > > Unfortunately, none of my fields are getting autowired, even
>>>>>>>>>> > > > though I have declared all my spouts and bolts as @Components.
>>>>>>>>>> > > >
>>>>>>>>>> > > > However, in the place where I am declaring my topology,
>>>>>>>>>> > > > Spring is working fine.
>>>>>>>>>> > > >
>>>>>>>>>> > > > Is it because cluster.submitTopology("test", conf,
>>>>>>>>>> > > > builder.createTopology()) submits the topology to a cluster
>>>>>>>>>> > > > (locally it spawns different threads for spouts and bolts)
>>>>>>>>>> > > > that autowiring is not working?
>>>>>>>>>> > > >
>>>>>>>>>> > > > Please suggest.
>>>>>>>>>> > > >
>>>>>>>>>> > >
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>>
>>>> --
>>>> Javier González Nicolini
>>>>
>>>
>>>
>
