storm-user mailing list archives

From Navin Ipe <navin....@searchlighthealth.com>
Subject Re: If tuples come in too fast for bolts to process, does Zookeeper keep it in a queue?
Date Fri, 06 May 2016 11:26:50 GMT
Aha! I feel these kinds of concepts should be mentioned somewhere in the
official documentation, or on something like a "mistakes to avoid" page. Or
perhaps I'll put it up on my blog.

On Fri, May 6, 2016 at 2:52 PM, Spico Florin <spicoflorin@gmail.com> wrote:

> Hi!
>   You're welcome. nextTuple and the ack method are called on the same thread
> by the framework. So if you do heavy computation in nextTuple, your
> ack method will never be called, and the buffers that are responsible for
> receiving the ack messages will not be emptied. nextTuple acts as the
> producer for these buffers, while ack acts as the consumer.
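>
> As a rough sketch of the pattern that avoids this (GenSpout and its fields
> are just illustrative names, assuming Storm 1.x and BaseRichSpout): emit at
> most one tuple per nextTuple() call and return, so the same thread can also
> run ack()/fail():
>
> import java.util.Map;
> import org.apache.storm.spout.SpoutOutputCollector;
> import org.apache.storm.task.TopologyContext;
> import org.apache.storm.topology.OutputFieldsDeclarer;
> import org.apache.storm.topology.base.BaseRichSpout;
> import org.apache.storm.tuple.Fields;
> import org.apache.storm.tuple.Values;
>
> public class GenSpout extends BaseRichSpout {
>     private SpoutOutputCollector collector;
>     private long id = 0;
>
>     @Override
>     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
>         this.collector = collector;
>     }
>
>     @Override
>     public void nextTuple() {
>         // No while loop: emit one anchored tuple and return quickly,
>         // so ack()/fail() get a chance to run on this same thread.
>         collector.emit(new Values("record-" + id), id);
>         id++;
>     }
>
>     @Override
>     public void ack(Object msgId) {
>         // Only reached because nextTuple() returns promptly.
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields("record"));
>     }
> }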
> I hope this helps.
>  Regards,
>  Florin
>
> On Fri, May 6, 2016 at 11:25 AM, Navin Ipe <
> navin.ipe@searchlighthealth.com> wrote:
>
>> Thanks Florin. It does indeed seem to be a memory problem. It turns out that
>> no acks were happening either, because I was emitting from a while
>> loop in nextTuple() and it never left the nextTuple() function.
>>
>> On Fri, May 6, 2016 at 11:59 AM, Spico Florin <spicoflorin@gmail.com>
>> wrote:
>>
>>> Hello!
>>>   If you have a look at the last line of your log, you can see:
>>> java.lang.OutOfMemoryError: *GC overhead limit exceeded*
>>>  So you don't have enough memory for your worker. This is the reason
>>> that the worker's connection to ZooKeeper dies. The worker sends
>>> heartbeats to ZK; if the worker dies, no heartbeats reach ZK, and
>>> therefore you get a connection timeout.
>>> You can increase the JVM memory by setting the Config property
>>> topology.worker.childopts, e.g. conf.put("topology.worker.childopts",
>>> "-Xms1G -Xmx1G"). This sets up your workers' JVM heap memory.
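>>>
>>> A minimal sketch of that, assuming Storm 1.x (Config extends HashMap, and
>>> Config.TOPOLOGY_WORKER_CHILDOPTS is the constant for that key):
>>>
>>> import org.apache.storm.Config;
>>>
>>> Config conf = new Config();
>>> // same as conf.put("topology.worker.childopts", "-Xms1G -Xmx1G")
>>> conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xms1G -Xmx1G");
>>> // ...then pass conf to StormSubmitter.submitTopology(...) or
>>> // LocalCluster.submitTopology(...)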
>>>
>>>
>>> To answer your question: *Does Zookeeper store a queue of unprocessed
>>> tuples until the Bolts are ready to process them?*
>>> *No.* Storm has internal queues to buffer the tuples. It uses LMAX
>>> Disruptor queues to send/receive tuples between executors (spouts and
>>> bolts) collocated in the same worker JVM, and separate incoming/outgoing
>>> queues for receiving/sending tuples from/to other workers (JVMs).
>>> There is a detailed description here:
>>> http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/
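>>>
>>> As a rough sketch of where those buffer sizes can be tuned, building on
>>> the same conf object as above (these are standard Storm config keys; the
>>> values are only placeholders, not recommendations):
>>>
>>> // intra-worker disruptor queues, per executor (sizes must be powers of 2)
>>> conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 1024);
>>> conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 1024);
>>> // inter-worker transfer queue, per worker process
>>> conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);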
>>>
>>> I hope this helps.
>>>  Regards,
>>>  Florin
>>>
>>> On Fri, May 6, 2016 at 8:22 AM, Navin Ipe <
>>> navin.ipe@searchlighthealth.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a topology where the spout emits 1 tuple, Bolt-A takes that
>>>> tuple and emits 20 tuples, and Bolt-B emits 50 tuples for each of
>>>> Bolt-A's tuples. Tuples are always anchored.
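>>>>
>>>> Roughly, the wiring looks like this (class names, groupings, and
>>>> parallelism hints are just placeholders for illustration):
>>>>
>>>> import org.apache.storm.topology.TopologyBuilder;
>>>>
>>>> TopologyBuilder builder = new TopologyBuilder();
>>>> builder.setSpout("spout", new LightweightSpout(), 1);
>>>> builder.setBolt("bolt-a", new BoltA(), 1).shuffleGrouping("spout");   // ~20 tuples out per tuple in
>>>> builder.setBolt("bolt-b", new BoltB(), 1).shuffleGrouping("bolt-a");  // ~50 tuples out per tuple in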
>>>>
>>>> *Question:*
>>>> When a light-weight spout emits a few tuples and Bolt-B has to process
>>>> a vastly multiplied number of tuples, Bolt-A and Bolt-B will receive
>>>> tuples faster than they can process them. Does Zookeeper store a queue of
>>>> unprocessed tuples until the Bolts are ready to process them?
>>>>
>>>> *Reason I'm asking:*
>>>> Because I get a session timeout when I run a single instance of the
>>>> bolts. When I increase the parallelism and tasks to 5, it runs longer
>>>> before timing out. When I increase it to 15, it runs even longer before
>>>> timing out.
>>>>
>>>> *The various error messages:*
>>>> 587485 [main-SendThread(localhost:2001)] INFO  o.a.s.s.o.a.z.ClientCnxn
>>>> - Client session timed out, have not heard from server in 13645ms for
>>>> sessionid 0x154846bbee00003, closing socket connection and attempting
>>>> reconnect
>>>>
>>>> 599655 [main-EventThread] INFO
>>>> o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: SUSPENDED
>>>>
>>>> 614868 [main-SendThread(localhost:2001)] INFO  o.a.s.s.o.a.z.ClientCnxn
>>>> - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2001. Will
>>>> not attempt to authenticate using SASL (unknown error)
>>>>
>>>> 614869 [main-SendThread(localhost:2001)] INFO  o.a.s.s.o.a.z.ClientCnxn
>>>> - Socket connection established to localhost/0:0:0:0:0:0:0:1:2001,
>>>> initiating session
>>>>
>>>> 607952 [SessionTracker] INFO  o.a.s.s.o.a.z.s.ZooKeeperServer -
>>>> Expiring session 0x154846bbee00003, timeout of 20000ms exceeded
>>>>
>>>> 621928 [main-EventThread] WARN  o.a.s.c.zookeeper-state-factory -
>>>> Received event :disconnected::none: with disconnected Writer Zookeeper.
>>>>
>>>> 627967 [Curator-Framework-0] WARN  o.a.s.s.o.a.c.ConnectionState -
>>>> Connection attempt unsuccessful after 25535 (greater than max timeout of
>>>> 20000). Resetting connection and trying again with a new connection.
>>>> 627967 [timer] INFO  o.a.s.c.healthcheck - ()
>>>>
>>>> 631511 [ProcessThread(sid:0 cport:-1):] INFO
>>>> o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for
>>>> sessionid: 0x154846bbee00008
>>>> 631511 [main-SendThread(localhost:2001)] INFO  o.a.s.s.o.a.z.ClientCnxn
>>>> - Opening socket connection to server localhost/127.0.0.1:2001. Will
>>>> not attempt to authenticate using SASL (unknown error)
>>>> 590891 [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2001] WARN
>>>> o.a.s.s.o.a.z.s.NIOServerCnxn - caught end of stream exception
>>>> org.apache.storm.shade.org.apache.zookeeper.server.ServerCnxn$EndOfStreamException:
>>>> Unable to read additional data from client sessionid 0x154846bbee00006,
>>>> likely client has closed socket
>>>>
>>>> 635140 [Curator-Framework-0] WARN  o.a.s.s.o.a.c.ConnectionState -
>>>> Connection attempt unsuccessful after 32485 (greater than max timeout of
>>>> 20000). Resetting connection and trying again with a new connection.
>>>>
>>>> 625190 [Curator-Framework-0] ERROR o.a.s.s.o.a.c.ConnectionState -
>>>> Connection timed out for connection string (localhost:2001/storm) and
>>>> timeout (15000) / elapsed (18942)
>>>> org.apache.storm.shade.org.apache.curator.CuratorConnectionLossException:
>>>> KeeperErrorCode = ConnectionLoss
>>>>
>>>> 643572 [Curator-ConnectionStateManager-0] INFO  o.a.s.zookeeper -
>>>> 192.168.0.101 lost leadership.
>>>>
>>>> 678900 [Thread-79-GenBolt-executor[29 29]] ERROR o.a.s.util - Async
>>>> loop died!
>>>> java.lang.OutOfMemoryError: *GC overhead limit exceeded*
>>>>     at java.lang.Long.valueOf(Long.java:840) ~[?:1.8.0_73]
>>>>
>>>> --
>>>> Regards,
>>>> Navin
>>>>
>>>
>>>
>>
>>
>> --
>> Regards,
>> Navin
>>
>
>


-- 
Regards,
Navin
