storm-user mailing list archives

From Ankur Garg <ankurga...@gmail.com>
Subject Re: Does Storm work with Spring
Date Mon, 19 Oct 2015 14:25:27 GMT
Hi Ravi,

I need your help. I created a local cluster and deployed my topology to
it. Inside my spouts and bolts, I am launching a Spring Boot application
wrapped inside a singleton to initialise my context. Unfortunately, it
does not appear to be working: annotations like @EnableAutoConfiguration
are not picking up the yml files from the classpath and injecting their
values into the beans. I am getting exceptions like

Error creating bean with name 'inputQueueManager': Injection of autowired
dependencies failed; nested exception is
org.springframework.beans.factory.BeanCreationException: Could not autowire
field: private int
mqclient.rabbitmq.manager.impl.InputQueueManagerImpl.rabbitMqPort; nested
exception is org.springframework.beans.TypeMismatchException: Failed to
convert value of type 'java.lang.String' to required type 'int'; nested
exception is java.lang.NumberFormatException: For input string:
"${input.rabbitmq.port}" at

Has anyone here ever tried injecting dependencies from Spring? I am not
sure why this is not working.

It works like a charm in the local cluster, and now I am not passing the
context as a constructor argument, but rather declaring and initializing it
inside each spout and bolt. :(

Is there any reason why Spring annotations don't work inside a remote
cluster?

I need help urgently here.

Thanks
Ankur
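
For readers following the thread: the literal "${input.rabbitmq.port}" in the NumberFormatException above suggests that the placeholder reached type conversion without ever being resolved, which would be consistent with the yml properties not being loaded in the context created on the worker. The sketch below is plain Java, not Spring's resolver; `resolve` and `port` are hypothetical helpers that only imitate that behaviour to show where the message comes from.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PlaceholderDemo {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Hypothetical stand-in for placeholder resolution (NOT Spring's implementation):
    // replaces ${key} with its value from props and leaves unknown placeholders as-is.
    static String resolve(String raw, Map<String, String> props) {
        Matcher m = PLACEHOLDER.matcher(raw);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = props.get(m.group(1));
            String replacement = (value != null) ? value : m.group(0);
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    // Mimics injecting the property into an int field such as rabbitMqPort.
    static int port(Map<String, String> props) {
        return Integer.parseInt(resolve("${input.rabbitmq.port}", props));
    }

    public static void main(String[] args) {
        // Property present: the placeholder is replaced before conversion.
        System.out.println(port(Collections.singletonMap("input.rabbitmq.port", "5672")));

        // Property source missing: the raw placeholder is handed to Integer.parseInt.
        try {
            port(Collections.<String, String>emptyMap());
        } catch (NumberFormatException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this is what is happening, the thing to check would be whether the yml file is actually on the classpath of the worker JVM and whether the context created there loads it.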

On Sun, Oct 11, 2015 at 1:01 PM, Ankur Garg <ankurgarg9@gmail.com> wrote:

> I think I don't need to autowire beans inside my spouts and bolts.
>
> All I want is for my context to be available. Since I use Spring Boot, I am
> delegating to it the job of initialising and setting up every bean (reading
> the yml file, creating DB connections, connections to message brokers, etc.).
>
> On my local cluster I am passing it as a constructor argument to the spouts
> and bolts. Since they all run in the same JVM, it is available to all spouts
> and bolts.
>
> But in a distributed cluster this will blow up, as the context is not
> serializable and cannot be passed like above.
>
> So the problem is only to make this context available once per JVM. Hence
> I thought I would wrap it in a singleton and make it available to all
> spouts and bolts per JVM.
>
> Once I have this context initialized and loaded, all I need to do is get
> the beans, which I will do the same way I do inside the local cluster
> spouts and bolts.
>
> On Sun, Oct 11, 2015 at 12:46 PM, Ravi Sharma <ping2ravi@gmail.com> wrote:
>
>> Yes, your assumption is right.
>> JVM 1 will create an application context, say ac1,
>> and JVM 2 will create another application context instance, ac2.
>>
>> All of it can be done via singleton classes.
>>
>> All bolts and spouts in the same JVM instance need to access the same
>> application context.
>>
>> I have done the same in a cluster and it works.
>>
>> Remember that all fields holding Spring beans need to be transient, and you
>> also need to set required=false in case you are going to create spouts and
>> bolts using Spring:
>>
>> public class MyBolt {
>>     @Autowired(required = false)
>>     private transient MyServiceBean myServiceBean;
>>
>>     // ...
>> }
>>
>> Ravi
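
The reason for the transient advice is that Storm serializes spout and bolt instances when a topology is submitted, so any field that is not serializable has to be skipped and then set again on the worker. Below is a minimal sketch using only Java serialization; `MyBolt`, `MyServiceBean` and `roundTrip` are hypothetical stand-ins (no Storm or Spring classes are involved), and the `prepare()` method here only plays the role of the real bolt lifecycle method.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientFieldDemo {
    // Hypothetical stand-in for a Spring-managed service; deliberately NOT Serializable.
    static class MyServiceBean {
        String greet(String name) { return "hello " + name; }
    }

    // Hypothetical stand-in for a bolt; it must be Serializable to be shipped.
    static class MyBolt implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String name;                      // plain config travels with the bolt
        private transient MyServiceBean myServiceBean;  // skipped by serialization

        MyBolt(String name) { this.name = name; }

        // Plays the role of prepare(): acquire non-serializable resources on the worker.
        void prepare() { myServiceBean = new MyServiceBean(); }

        boolean hasService() { return myServiceBean != null; }

        String execute() { return myServiceBean.greet(name); }
    }

    // Serializes and deserializes the bolt, imitating shipping it to another JVM.
    static MyBolt roundTrip(MyBolt bolt) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(bolt);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (MyBolt) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        MyBolt local = new MyBolt("storm");
        local.prepare();                          // service exists on the submitting side
        MyBolt shipped = roundTrip(local);        // works because the field is transient
        System.out.println(shipped.hasService()); // the transient field did not travel
        shipped.prepare();                        // re-acquire it on the "worker"
        System.out.println(shipped.execute());
    }
}
```

Without the transient keyword, writeObject would fail with a NotSerializableException as soon as the field held a value.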
>> On 11 Oct 2015 07:59, "Ankur Garg" <ankurgarg9@gmail.com> wrote:
>>
>>> Also, I think there can be some instances of spouts/bolts running on
>>> JVM 1 and some on JVM 2, and so on.
>>>
>>> Is it possible for spouts and bolts running on the same JVM to access the
>>> same ApplicationContext?
>>>
>>> I am thinking that I can launch my Spring Boot application inside a
>>> singleton class, so that all the spouts and bolts running on, say, JVM 1
>>> will have access to the same context (instead of launching it in every
>>> spout and bolt). Those in JVM 2 will likewise initialise it once, and all
>>> the rest will get the same application context.
>>>
>>> But all of the above is a theoretical assumption. I still need to try it out
>>> (unfortunately I don't have a cluster set up at my end), but if possible
>>> please let me know whether this can work.
>>>
>>> Thanks
>>> Ankur
>>>
>>> On Sun, Oct 11, 2015 at 11:48 AM, Ankur Garg <ankurgarg9@gmail.com>
>>> wrote:
>>>
>>>> Thanks for replying, Ravi.
>>>>
>>>> I think your suggestion to write a wrapper that reads JSON or XML is a
>>>> very nice idea indeed.
>>>>
>>>> But the problem for me here is to have the context (with all beans
>>>> loaded and initialized) available inside the spouts and bolts, and that
>>>> means inside every running instance of the spouts and bolts, which may be
>>>> running on different machines and different JVMs.
>>>>
>>>> I agree that when defining the topology I don't need the Spring context,
>>>> as I just have to define spouts and bolts there. I used the context here
>>>> to pass it to the spouts and bolts through the constructor, but it appears
>>>> from the comments above that this won't work on a distributed cluster.
>>>>
>>>> So, is there some way that, once the topology gets submitted to run in a
>>>> distributed cluster, I can initialize my context there and somehow make it
>>>> available to all spouts and bolts? Basically, some shared location
>>>> where my application context can be initialized (once and only once) and
>>>> accessed by all instances of the spouts and bolts?
>>>>
>>>> Thanks
>>>>
>>>> On Sun, Oct 11, 2015 at 11:20 AM, Ravi Sharma <ping2ravi@gmail.com>
>>>> wrote:
>>>>
>>>>> Basically you will have two contexts, defined at different times/phases.
>>>>>
>>>>> When you are about to submit the topology, you need to build the
>>>>> topology; that context only needs information about spouts and bolts.
>>>>> You don't need any application beans like database accessors or your
>>>>> services, as at this level you are not running your application; you are
>>>>> just creating a topology and defining how bolts and spouts are connected
>>>>> to each other.
>>>>>
>>>>> Once the topology is submitted, it will be moved to one of the
>>>>> supervisor nodes and will start running, and all spouts and bolts will be
>>>>> initialized. At this moment you will need your application context, which
>>>>> doesn't need your earlier topology context.
>>>>>
>>>>> So I suggest keeping both contexts separate.
>>>>>
>>>>> A topology is not complex to build. A smaller topology can be built via
>>>>> code only, i.e. which bolt listens to which spout. But if you want to go
>>>>> with a good design, I say just write a small wrapper that reads some JSON
>>>>> where you define your bolts and spouts, and use that to build the topology
>>>>> (you can use Spring, but it's not really needed).
>>>>>
>>>>> In the past I have done it using both JSON settings (without Spring) and
>>>>> XML settings (with Spring); both work well.
>>>>>
>>>>> Ravi
>>>>> On 11 Oct 2015 06:38, "Ankur Garg" <ankurgarg9@gmail.com> wrote:
>>>>>
>>>>>> Oh, the problem here is that I have many beans which need to be
>>>>>> initialized (some read conf from yml files, others set up database
>>>>>> connections, thread pools, etc.).
>>>>>>
>>>>>>
>>>>>> Now, I have written a Spring Boot application which takes care of
>>>>>> all of the above, and I define my topology inside one of the beans.
>>>>>> Here is my bean:
>>>>>>
>>>>>> @Autowired
>>>>>> ApplicationContext appContext;
>>>>>>
>>>>>> @Bean
>>>>>> public void submitTopology() throws
>>>>>>         AlreadyAliveException, InvalidTopologyException {
>>>>>>
>>>>>>     TopologyBuilder builder = new TopologyBuilder();
>>>>>>
>>>>>>     builder.setSpout("rabbitMqSpout", new RabbitListnerSpout(appContext), 10);
>>>>>>
>>>>>>     builder.setBolt("mapBolt", new GroupingBolt(appContext), 10)
>>>>>>             .shuffleGrouping("rabbitMqSpout");
>>>>>>
>>>>>>     builder.setBolt("reduceBolt", new PublishingBolt(appContext), 10)
>>>>>>             .shuffleGrouping("mapBolt");
>>>>>>
>>>>>>     Config conf = new Config();
>>>>>>
>>>>>>     // To be registered with Kryo for Storm
>>>>>>     conf.registerSerialization(EventBean.class);
>>>>>>     conf.registerSerialization(InputQueueManagerImpl.class);
>>>>>>
>>>>>>     conf.setDebug(true);
>>>>>>     conf.setMessageTimeoutSecs(200);
>>>>>>
>>>>>>     LocalCluster cluster = new LocalCluster();
>>>>>>     cluster.submitTopology("test", conf, builder.createTopology());
>>>>>> }
>>>>>>
>>>>>>
>>>>>> When this bean is initialized, appContext has already been initialized
>>>>>> by my Spring Boot application. So the thing is, I am using Spring Boot
>>>>>> to initialize and load my context with all beans.
>>>>>>
>>>>>> This is the context which I want to leverage in my spouts and
>>>>>> bolts.
>>>>>>
>>>>>> So, if what I suggested earlier does not work on a distributed Storm
>>>>>> cluster, I need to find some way of initializing my AppContext. :(
>>>>>>
>>>>>> I would be really thankful if anyone here can help me. :(
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> Ankur
>>>>>>
>>>>>> On Sun, Oct 11, 2015 at 5:54 AM, Javier Gonzalez <jagonzal@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> The local cluster runs completely within a single JVM, AFAIK. The
>>>>>>> local cluster is useful for development, testing your topology, etc.
>>>>>>> The real deployment has to go through Nimbus, run on workers started by
>>>>>>> supervisors on one or more nodes, etc. It's kind of difficult to
>>>>>>> simulate all that on a single box.
>>>>>>>
>>>>>>> On Sat, Oct 10, 2015 at 1:45 PM, Ankur Garg <ankurgarg9@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Oh... so I will have to test it in a cluster.
>>>>>>>>
>>>>>>>> Having said that, how is the local cluster we use so different
>>>>>>>> from a normal cluster? Ideally, it should simulate a normal cluster.
>>>>>>>> On Oct 10, 2015 7:51 PM, "Ravi Sharma" <ping2ravi@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Ankur,
>>>>>>>>> Locally it may be working, but it won't work in an actual cluster.
>>>>>>>>>
>>>>>>>>> Think of the Spring context as a collection of many resources,
>>>>>>>>> like database connections, maybe HTTP connections, thread pools, etc.
>>>>>>>>> These things won't get serialised and just go to other machines and
>>>>>>>>> start working.
>>>>>>>>>
>>>>>>>>> So basically, in the init methods of your bolts and spouts, you need
>>>>>>>>> to call some singleton class like this:
>>>>>>>>>
>>>>>>>>> ApplicationContext ac = SingletonApplicationContext.getContext();
>>>>>>>>>
>>>>>>>>> SingletonApplicationContext will have a static ApplicationContext
>>>>>>>>> variable, and getContext will check whether that static variable has
>>>>>>>>> been initialised; if not, it will initialise it, and then return it
>>>>>>>>> (a normal singleton class).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Now when the topology moves to any other node, the bolts and spouts
>>>>>>>>> will start, the first init call will initialize the context, and the
>>>>>>>>> other bolts/spouts will just use it.
>>>>>>>>>
>>>>>>>>> As John mentioned, it's very important to mark all Spring beans and
>>>>>>>>> the context as transient.
>>>>>>>>>
>>>>>>>>> Hope it helps.
>>>>>>>>>
>>>>>>>>> Ravi.
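
The singleton described in this message can be sketched roughly as follows. This is only an illustration under stated assumptions: `FakeContext` is a hypothetical stand-in for Spring's ApplicationContext so that the example runs without Spring on the classpath, and the double-checked locking is one of several reasonable ways to make the lazy initialization thread-safe (several spout and bolt tasks may call getContext() at the same time inside one worker JVM).

```java
import java.util.concurrent.atomic.AtomicInteger;

class SingletonContextDemo {
    // Hypothetical stand-in for the Spring ApplicationContext; counts how often it is built.
    static class FakeContext {
        static final AtomicInteger CREATED = new AtomicInteger();
        FakeContext() { CREATED.incrementAndGet(); }
    }

    // One context per JVM, created lazily by whichever spout or bolt asks first.
    static final class SingletonApplicationContext {
        private static volatile FakeContext context;

        private SingletonApplicationContext() { }

        static FakeContext getContext() {
            FakeContext local = context;
            if (local == null) {
                synchronized (SingletonApplicationContext.class) {
                    local = context;
                    if (local == null) {
                        // Assumption: real code would start the Spring Boot
                        // application here and keep the context it returns.
                        local = new FakeContext();
                        context = local;
                    }
                }
            }
            return local;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Eight threads imitate spout/bolt tasks calling getContext() from open()/prepare().
        Thread[] tasks = new Thread[8];
        for (int i = 0; i < tasks.length; i++) {
            tasks[i] = new Thread(SingletonApplicationContext::getContext);
            tasks[i].start();
        }
        for (Thread task : tasks) {
            task.join();
        }
        System.out.println("contexts created: " + FakeContext.CREATED.get());
    }
}
```

Each worker JVM would end up with its own instance, which matches the "ac1 on JVM 1, ac2 on JVM 2" picture discussed elsewhere in the thread.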
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, Oct 10, 2015 at 6:25 AM, Ankur Garg <ankurgarg9@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Javier ,
>>>>>>>>>>
>>>>>>>>>> So, I am using a local cluster on my dev machine, where I am
>>>>>>>>>> using Eclipse. Here, I am passing Spring's ApplicationContext as a
>>>>>>>>>> constructor argument to the spouts and bolts.
>>>>>>>>>>
>>>>>>>>>> TopologyBuilder builder = new TopologyBuilder();
>>>>>>>>>>
>>>>>>>>>> builder.setSpout("rabbitMqSpout", new RabbitListnerSpout(appContext), 10);
>>>>>>>>>>
>>>>>>>>>> builder.setBolt("mapBolt", new GroupingBolt(appContext), 10)
>>>>>>>>>>         .shuffleGrouping("rabbitMqSpout");
>>>>>>>>>>
>>>>>>>>>> builder.setBolt("reduceBolt", new PublishingBolt(appContext), 10)
>>>>>>>>>>         .shuffleGrouping("mapBolt");
>>>>>>>>>>
>>>>>>>>>> Config conf = new Config();
>>>>>>>>>>
>>>>>>>>>> // To be registered with Kryo for Storm
>>>>>>>>>> conf.registerSerialization(EventBean.class);
>>>>>>>>>> conf.registerSerialization(InputQueueManagerImpl.class);
>>>>>>>>>>
>>>>>>>>>> conf.setDebug(true);
>>>>>>>>>>
>>>>>>>>>> LocalCluster cluster = new LocalCluster();
>>>>>>>>>> cluster.submitTopology("test", conf, builder.createTopology());
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> And in my spouts and bolts,
>>>>>>>>>>
>>>>>>>>>> I make my ApplicationContext variable static. So when the topology is
>>>>>>>>>> launched by cluster.submitTopology, my context is still available.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> private static ApplicationContext ctx;
>>>>>>>>>>
>>>>>>>>>> public RabbitListnerSpout(ApplicationContext appContext) {
>>>>>>>>>>     LOG.info("RabbitListner Constructor called");
>>>>>>>>>>     ctx = appContext;
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> @SuppressWarnings("rawtypes")
>>>>>>>>>> @Override
>>>>>>>>>> public void open(Map conf, TopologyContext context,
>>>>>>>>>>         SpoutOutputCollector collector) {
>>>>>>>>>>     LOG.info("Inside the open Method for RabbitListner Spout");
>>>>>>>>>>
>>>>>>>>>>     inputManager = (InputQueueManagerImpl) ctx
>>>>>>>>>>             .getBean(InputQueueManagerImpl.class);
>>>>>>>>>>
>>>>>>>>>>     notificationManager = (NotificationQueueManagerImpl) ctx
>>>>>>>>>>             .getBean(NotificationQueueManagerImpl.class);
>>>>>>>>>>
>>>>>>>>>>     eventExchange = ctx.getEnvironment().getProperty(
>>>>>>>>>>             "input.rabbitmq.events.exchange");
>>>>>>>>>>
>>>>>>>>>>     routingKey = ctx.getEnvironment().getProperty(
>>>>>>>>>>             "input.rabbitmq.events.routingKey");
>>>>>>>>>>
>>>>>>>>>>     eventQueue = ctx.getEnvironment().getProperty(
>>>>>>>>>>             "input.rabbitmq.events.queue");
>>>>>>>>>>
>>>>>>>>>>     _collector = collector;
>>>>>>>>>>
>>>>>>>>>>     LOG.info("Exiting the open Method for RabbitListner Spout");
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> This is working like a charm (my ApplicationContext is
>>>>>>>>>> initialized separately). As we all know, ApplicationContext is not
>>>>>>>>>> serializable. But this works well in LocalCluster.
>>>>>>>>>>
>>>>>>>>>> My assumption is that it will work in a separate cluster too. Is
>>>>>>>>>> my assumption correct?
>>>>>>>>>>
>>>>>>>>>> On Fri, Oct 9, 2015 at 9:04 PM, Javier Gonzalez <
>>>>>>>>>> jagonzal@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> IIRC, only if everything you use in your spouts and bolts is
>>>>>>>>>>> serializable.
>>>>>>>>>>> On Oct 6, 2015 11:29 PM, "Ankur Garg" <ankurgarg9@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Ravi ,
>>>>>>>>>>>>
>>>>>>>>>>>> I was able to integrate with Spring, but the problem is that I
>>>>>>>>>>>> have to autowire for every bolt and spout. That means that even
>>>>>>>>>>>> if I parallelize a spout or bolt, the wiring will be run for each
>>>>>>>>>>>> instance. Is there some way that I only have to do it once for
>>>>>>>>>>>> bolts and spouts (I mean, if I parallelize bolts or spouts
>>>>>>>>>>>> individually, can they share the conf from somewhere)? Is this
>>>>>>>>>>>> possible?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Ankur
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Sep 29, 2015 at 7:57 PM, Ravi Sharma <ping2ravi@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, this works for annotations also.
>>>>>>>>>>>>>
>>>>>>>>>>>>> You can call this method in the prepare() method of every bolt
>>>>>>>>>>>>> and the open() method of every spout, and make sure you don't
>>>>>>>>>>>>> use any autowired bean before this call.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Ravi.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Sep 29, 2015 at 2:22 PM, Ankur Garg <ankurgarg9@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> > Hi Ravi,
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Thanks for your reply. I am using annotation-based configuration
>>>>>>>>>>>>> > and Spring Boot.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Any idea how to do it using annotations?
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > On Tue, Sep 29, 2015 at 6:41 PM, Ravi Sharma <ping2ravi@gmail.com> wrote:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > > Bolts and spouts are created by Storm and are not known to the
>>>>>>>>>>>>> > > Spring context. You need to manually add them to the Spring
>>>>>>>>>>>>> > > context; there are a few methods available, e.g.
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > SpringContext.getContext().getAutowireCapableBeanFactory().autowireBeanProperties(this,
>>>>>>>>>>>>> > >         AutowireCapableBeanFactory.AUTOWIRE_AUTODETECT, false);
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > SpringContext is my own class into which I have injected the
>>>>>>>>>>>>> > > Spring context, so SpringContext.getContext() returns the
>>>>>>>>>>>>> > > actual Spring context.
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > Ravi.
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > On Tue, Sep 29, 2015 at 1:03 PM, Ankur Garg <ankurgarg9@gmail.com> wrote:
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> > > > Hi,
>>>>>>>>>>>>> > > >
>>>>>>>>>>>>> > > > I am building a Storm topology with a set of spouts and bolts,
>>>>>>>>>>>>> > > > and am also using Spring for dependency injection.
>>>>>>>>>>>>> > > >
>>>>>>>>>>>>> > > > Unfortunately, none of my fields are getting autowired, even
>>>>>>>>>>>>> > > > though I have declared all my spouts and bolts as @Component.
>>>>>>>>>>>>> > > >
>>>>>>>>>>>>> > > > However, in the place where I am declaring my topology, Spring
>>>>>>>>>>>>> > > > is working fine.
>>>>>>>>>>>>> > > >
>>>>>>>>>>>>> > > > Is it because cluster.submitTopology("test", conf,
>>>>>>>>>>>>> > > > builder.createTopology()) submits the topology to a cluster
>>>>>>>>>>>>> > > > (locally it spawns different threads for spouts and bolts)
>>>>>>>>>>>>> > > > that autowiring is not working?
>>>>>>>>>>>>> > > >
>>>>>>>>>>>>> > > > Please suggest.
>>>>>>>>>>>>> > >
>>>>>>>>>>>>> >
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Javier González Nicolini
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>
