kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dev loper <spark...@gmail.com>
Subject Re: Kafka Streams application Unable to Horizontally scale and the application on other instances refusing to start.
Date Sat, 16 Sep 2017 10:34:27 GMT
Hi All,

Sometimes I have seen the exception below just before the consumer gets
disconnected. I am not sure whether it has any relevance to the issue I
am facing.

DEBUG | 05:16:24 | clients.NetworkClient (NetworkClient.java:702) - Node 0
disconnected.
DEBUG | 05:16:24 | clients.NetworkClient (NetworkClient.java:702) - Node 1
disconnected.

DEBUG | 05:16:24 | network.Selector (Selector.java:399) - Connection with
mykafkainstancekafka5101.mydomain.com/34.220.36.96 disconnected
java.io.EOFException
        at
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:87)
~[kafka-clients-0.11.0.0.jar:?]
        at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
~[kafka-clients-0.11.0.0.jar:?]
        at
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
~[kafka-clients-0.11.0.0.jar:?]
        at
org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
~[kafka-clients-0.11.0.0.jar:?]
        at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:379)
[kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
[kafka-clients-0.11.0.0.jar:?]
        at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:432)
[kafka-clients-0.11.0.0.jar:?]
        at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
[kafka-clients-0.11.0.0.jar:?]
        at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
[kafka-clients-0.11.0.0.jar:?]
        at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
[kafka-clients-0.11.0.0.jar:?]
        at
org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:314)
[kafka-clients-0.11.0.0.jar:?]
        at
org.apache.kafka.clients.consumer.internals.Fetcher.getAllTopicMetadata(Fetcher.java:294)
[kafka-clients-0.11.0.0.jar:?]
        at
org.apache.kafka.clients.consumer.KafkaConsumer.listTopics(KafkaConsumer.java:1410)
[kafka-clients-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:67)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:165)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:177)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:57)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:201)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:140)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
[kafka-clients-0.11.0.0.jar:?]
        at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
[kafka-clients-0.11.0.0.jar:?]
        at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
[kafka-clients-0.11.0.0.jar:?]
        at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
[kafka-clients-0.11.0.0.jar:?]
        at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
[kafka-clients-0.11.0.0.jar:?]
        at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
[kafka-clients-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
[kafka-streams-0.11.0.0.jar:?]
        at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
[kafka-streams-0.11.0.0.jar:?]
DEBUG | 05:16:24 | network.Selector (Selector.java:399) - Connection with
l-mykafkainstancekafka5101/172.31.7.209 disconnected
java.io.EOFException




On Sat, Sep 16, 2017 at 1:04 PM, dev loper <sparkemr@gmail.com> wrote:

> Thank you Ted for your inputs. I have also observed the similarity in
> logs. But I am not sure how to resolve the issue.
>
> @Bill, I repeated my tests after reducing MAX_POLL_RECORDS to 500. I
> could see the consumer getting started with 500. But the same problem exists
> even after the configuration change. The consumer for one of the
> instances goes down and the whole process that I described in my
> previous mail repeats.
>
> On Sat, Sep 16, 2017 at 9:08 AM, Ted Yu <yuzhihong@gmail.com> wrote:
>
>> Though there was no LockException in your log, there is still enough
>> similarity between your log and the one posted by johnchou:
>>
>> https://issues.apache.org/jira/browse/KAFKA-5397?focusedCommentId=16162689&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16162689
>>
>> Even the line numbers match: sendOffsetCommitRequest
>> (ConsumerCoordinator.java:725)
>>
>> Reducing MAX_POLL_RECORDS_CONFIG may lessen the symptom but that would
>> not meet your throughput goal.
>>
>> On Fri, Sep 15, 2017 at 8:09 PM, dev loper <sparkemr@gmail.com> wrote:
>>
>>> Hi All ,
>>>
>>> @Bill,
>>>
>>> I will reduce the MAX_POLL_RECORDS to 500/1000 and I will share the
>>> results shortly.
>>>
>>> @Ted,
>>>
>>> Yes I reduced MAX_POLL_RECORDS_CONFIG from 50000 to 5000 . It was not a
>>> typo . Do you think 50000 is way too high for an kafkaStreams Application
>>> ?  My Spark application which I was trying to replace with kafka streams
>>> was processing 250000 messages per 5 second batch, That was the reason I
>>> set 50000 records for MAX_POLL_RECORDS_CONFIG .
>>>
>>> I don't think I have hit KAFKA-5397, since I couldn't find any instance of
>>> "org.apache.kafka.streams.errors.LockException" in my logs. I could see the
>>> exception below, but these exceptions are triggered long after the
>>> application stopped consuming any messages.
>>>
>>> StreamThread100.log:org.apache.kafka.streams.errors.StreamsException:
>>> stream-thread failed to suspend stream tasks
>>> User provided listener org.apache.kafka.streams.proce
>>> ssor.internals.StreamThread$RebalanceListener for group
>>> myKafka-kafkareplica101Sept08 failed on partition revocation
>>>
>>> @Damian , I figured out the  pattern below, I don't know whether it
>>> helps.  The streamthread logs which I shared, Did it help?. I
>>>
>>>
>>> I couldn't figure out the reason why the consumers are getting closed
>>> while they are being allocated and suddenly stop without any reason. If you
>>> look at the pattern:
>>>
>>>  1) Consumer is getting Created with Config Values Supplied by the
>>> Application.
>>>  2) Adding Sensors
>>>  3) Fetching API Version and Initiating Connections to Kafka Brokers
>>> (NetworkClient.java)
>>>  4)  Sending metadata request and Recorded API Version From Broker
>>> (NetworkClient.java)
>>> 5)  Updated cluster metadata version (some incremental version) to
>>> Cluster
>>> 6)  Discovered coordinator (One of the kafka Brokers)
>>> 7) Initiating connection to coordinator  (One of the kafka Brokers)
>>> 8) fetching committed offsets for partitions (
>>> ConsumerCoordinator.java:826)
>>> 9)  Recorded API versions for node
>>> 10)  Resetting offset for partition( for all the partitions)
>>> 11) Handling ListOffsetResponse ( for all the partitions)
>>> 12) Removing Sensors ( I couldn't see any exceptions though)
>>> 13) consumer.KafkaConsumer (KafkaConsumer.java:1617) - The Kafka
>>> consumer has closed. (
>>> I couldn't see any exceptions though, I couldn't figure out the reason
>>> why KafkaConsumer Closed , no reasons provided in the logs)
>>> 14) The kafka streams application goes back to step 1 and creates the
>>> consumer again. (The whole process is repeated but the application 90% of
>>> the time never recovers.)
>>> 15) Since I have introduced the limit for MAX_POLL_RECORDS_CONFIG
>>> =60000 (previously Integer.MAX_VALUE), I could see the below exception after 60
>>> seconds of consumer retry.
>>>  org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
>>> be completed since the group has already rebalancedAfter
>>>
>>> I am not sure what could be wrong on my side and how I can resolve this
>>> issue so that my application start processing messages consistently.
>>>
>>> Thanks
>>> Dev
>>>
>>> On Sat, Sep 16, 2017 at 2:02 AM, Bill Bejeck <bill@confluent.io> wrote:
>>>
>>>> Hi,
>>>>
>>>> Could you set MAX_POLL_RECORDS to something lower like 500 or 1000 and
>>>> try
>>>> again?
>>>>
>>>> Thanks,
>>>> Bill
>>>>
>>>> On Fri, Sep 15, 2017 at 3:40 PM, dev loper <sparkemr@gmail.com> wrote:
>>>>
>>>> > Hi Damian,
>>>> >
>>>> > I have repeated my tests with slight configuration change. The current
>>>> > logs captured for "StreamThread"  keyword has more relevant logs when
>>>> > compared to logs which i shared previously. I started the application
>>>> on
>>>> > instances 100,101 and 102 simultaneously with below configuration
>>>> >
>>>> > 1)  Reduced  MAX_POLL_RECORDS_CONFIG to 5000  (previously 50000)
>>>> >  2) Reduced MAX_POLL_RECORDS_CONFIG =60000 (previously
>>>> Integer.MAX_VALUE)
>>>> >
>>>> > When the application started all three instances started processing
>>>> for
>>>> > first few minutes everything went well. After that I could see that
>>>> > "StreamThread100" error consumer was going for a toss and it started
>>>> > closing and creating the consumers for a while exactly with the
>>>> pattern of
>>>> > logs I mentioned in my previous email and after some time I could see
>>>> that
>>>> > " StreamThread100" stopped processing messages with below exception
>>>> and the
>>>> > other two continued processing messages without any issues.
>>>> >
>>>> > org.apache.kafka.clients.consumer.CommitFailedException: Commit
>>>> cannot be
>>>> > completed since the group has already rebalanced and assigned the
>>>> > partitions to another member. This means that the time between
>>>> subsequent
>>>> > calls to poll() was longer than the configured max.poll.interval.ms,
>>>> > which typically implies that the poll loop is spending too much time
>>>> > message processing. You can address this either by increasing the
>>>> session
>>>> > timeout or by reducing the maximum size of batches returned in poll()
>>>> with
>>>> > max.poll.records.
>>>> >
>>>> > I think since the consumers were starting and stopping there was no
>>>> poll
>>>> > made from the system. Since I reduced MAX_POLL_RECORDS_CONFIG
>>>> > =60000 and the processors were getting closed and started, this
>>>> might have
>>>> > resulted in the "CommitFailedException" due to non-availability of
>>>> > processing processors.
>>>> >
>>>> > After some time the issue got propagated to other servers, I have
>>>> attached
>>>> > the relevant logs with this mail Kindly go through this and let me
>>>> know how
>>>> > I can  solve this issue ?
>>>> >
>>>> >
>>>> >
>>>> >
>>>> > <https://mail.google.com/mail/?ui=2&ik=6aa5d30a60&view=att&t
>>>> h=15e8701650e040b7&attid=0.3&disp=safe&realattid=f_j7m9sld82&zw>
>>>> >
>>>> > On Fri, Sep 15, 2017 at 10:33 PM, dev loper <sparkemr@gmail.com>
>>>> wrote:
>>>> >
>>>> >> Hi Ted,
>>>> >>
>>>> >> What should I be looking in broker logs ? I haven't looked at the
>>>> broker
>>>> >> side since my spark application processing from the same topic with
a
>>>> >> different group id is able to process well.
>>>> >>
>>>> >> On Fri, Sep 15, 2017 at 3:30 PM, Ted Yu <yuzhihong@gmail.com>
wrote:
>>>> >>
>>>> >>> Is there some clue in broker logs ?
>>>> >>>
>>>> >>> Thanks
>>>> >>>
>>>> >>> On Thu, Sep 14, 2017 at 11:19 PM, dev loper <sparkemr@gmail.com>
>>>> wrote:
>>>> >>>
>>>> >>>> Dear Kafka Users,
>>>> >>>>
>>>> >>>> I am fairly new to Kafka Streams . I have deployed two instances
of
>>>> >>>> Kafka 0.11 brokers on AWS M3.Xlarge instances. I have created
a
>>>> topic with
>>>> >>>> 36 partitions, and a separate application writes to this
topic and
>>>> it
>>>> >>>> produces records at the rate of 10000 messages per second.
I have
>>>> three
>>>> >>>> instances of AWS  M4.xlarge instance  where my Kafka streams
>>>> application is
>>>> >>>> running which consumes these messages produced by the other
>>>> application.
>>>> >>>> The application  starts up fine working fine and its processing
>>>> messages on
>>>> >>>> the first instance,  but when I start the same application
on other
>>>> >>>> instances it is not starting even though the process is
alive it
>>>> is not
>>>> >>>> processing messages.Also I could see the other instances
takes a
>>>> long time
>>>> >>>> to start .
>>>> >>>>
>>>> >>>> Apart from first instance,  other instances I could see
the
>>>> consumer
>>>> >>>> getting added and removed repeatedly and I couldn't see
any message
>>>> >>>> processing at all . I have attached the detailed logs where
this
>>>> behavior
>>>> >>>> is observed.
>>>> >>>>
>>>> >>>> Consumer is getting started with below log in these instances
and
>>>> >>>> getting stopped with below log (* detailed logs attached
*)
>>>> >>>>
>>>> >>>> INFO  | 21:59:30 | consumer.ConsumerConfig
>>>> (AbstractConfig.java:223) -
>>>> >>>> ConsumerConfig values:
>>>> >>>>     auto.commit.interval.ms = 5000
>>>> >>>>     auto.offset.reset = latest
>>>> >>>>     bootstrap.servers = [l-mykafkainstancekafka5101:9092,
>>>> >>>> l-mykafkainstancekafka5102:9092]
>>>> >>>>     check.crcs = true
>>>> >>>>     client.id =
>>>> >>>>     connections.max.idle.ms = 540000
>>>> >>>>     enable.auto.commit = false
>>>> >>>>     exclude.internal.topics = true
>>>> >>>>     fetch.max.bytes = 52428800
>>>> >>>>     fetch.max.wait.ms = 500
>>>> >>>>     fetch.min.bytes = 1
>>>> >>>>     group.id = myKafka-kafkareplica101Sept08
>>>> >>>>     heartbeat.interval.ms = 3000
>>>> >>>>     interceptor.classes = null
>>>> >>>>     internal.leave.group.on.close = true
>>>> >>>>     isolation.level = read_uncommitted
>>>> >>>>     key.deserializer = class mx.july.jmx.proximity.kafka.Ka
>>>> fkaKryoCodec
>>>> >>>>     max.partition.fetch.bytes = 1048576
>>>> >>>>     max.poll.interval.ms = 300000
>>>> >>>>     max.poll.records = 500
>>>> >>>>     metadata.max.age.ms = 300000
>>>> >>>>     metric.reporters = []
>>>> >>>>     metrics.num.samples = 2
>>>> >>>>     metrics.recording.level = INFO
>>>> >>>>     metrics.sample.window.ms = 30000
>>>> >>>>     partition.assignment.strategy = [class
>>>> >>>> org.apache.kafka.clients.consumer.RangeAssignor]
>>>> >>>>     receive.buffer.bytes = 65536
>>>> >>>>     reconnect.backoff.max.ms = 1000
>>>> >>>>     reconnect.backoff.ms = 50
>>>> >>>>     request.timeout.ms = 305000
>>>> >>>>     retry.backoff.ms = 100
>>>> >>>>     sasl.jaas.config = null
>>>> >>>>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>> >>>>     sasl.kerberos.min.time.before.relogin = 60000
>>>> >>>>     sasl.kerberos.service.name = null
>>>> >>>>     sasl.kerberos.ticket.renew.jitter = 0.05
>>>> >>>>     sasl.kerberos.ticket.renew.window.factor = 0.8
>>>> >>>>     sasl.mechanism = GSSAPI
>>>> >>>>     security.protocol = PLAINTEXT
>>>> >>>>     send.buffer.bytes = 131072
>>>> >>>>     session.timeout.ms = 10000
>>>> >>>>     ssl.cipher.suites = null
>>>> >>>>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>> >>>>     ssl.endpoint.identification.algorithm = null
>>>> >>>>     ssl.key.password = null
>>>> >>>>     ssl.keymanager.algorithm = SunX509
>>>> >>>>     ssl.keystore.location = null
>>>> >>>>     ssl.keystore.password = null
>>>> >>>>     ssl.keystore.type = JKS
>>>> >>>>     ssl.protocol = TLS
>>>> >>>>     ssl.provider = null
>>>> >>>>     ssl.secure.random.implementation = null
>>>> >>>>     ssl.trustmanager.algorithm = PKIX
>>>> >>>>     ssl.truststore.location = null
>>>> >>>>     ssl.truststore.password = null
>>>> >>>>     ssl.truststore.type = JKS
>>>> >>>>     value.deserializer = class my.dev.MessageUpdateCodec
>>>> >>>>
>>>> >>>>
>>>> >>>> DEBUG | 21:59:30 | consumer.KafkaConsumer
>>>> (KafkaConsumer.java:1617) -
>>>> >>>> The Kafka consumer has closed. and the whole process repeats.
>>>> >>>>
>>>> >>>>
>>>> >>>>
>>>> >>>> Below you can find my startup code for kafkastreams and
the
>>>> parameters
>>>> >>>> which I have configured for starting the kafkastreams application
.
>>>> >>>>
>>>> >>>>         private static Properties settings = new Properties();
>>>> >>>>         settings.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>>> >>>> "mykafkastreamsapplication");
>>>> >>>>         settings.put(ConsumerConfig.A
>>>> UTO_OFFSET_RESET_CONFIG,"latest");
>>>> >>>>         settings.put(ConsumerConfig.H
>>>> EARTBEAT_INTERVAL_MS_CONFIG,"10
>>>> >>>> 000");
>>>> >>>>         settings.put(ConsumerConfig.S
>>>> ESSION_TIMEOUT_MS_CONFIG,"30000");
>>>> >>>>         settings.put(ConsumerConfig.M
>>>> AX_POLL_INTERVAL_MS_CONFIG,Inte
>>>> >>>> ger.MAX_VALUE);
>>>> >>>>         settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
>>>> "10000");
>>>> >>>>         settings.put(ConsumerConfig.C
>>>> ONNECTIONS_MAX_IDLE_MS_CONFIG,"
>>>> >>>> 60000");
>>>> >>>>
>>>> >>>>         KStreamBuilder builder = new KStreamBuilder();
>>>> >>>>         KafkaStreams streams = new KafkaStreams(builder,
settings);
>>>> >>>>         builder.addSource(.....
>>>> >>>>          .addProcessor  .............
>>>> >>>>          .addProcessor  ........
>>>> >>>>
>>>> >>>>          .addStateStore(...............
>>>> ....).persistent().build(),"my
>>>> >>>> processor")
>>>> >>>>          .addSink ..............
>>>> >>>>          . addSink ..............
>>>> >>>>           streams.start();
>>>> >>>>
>>>> >>>> and I am using a Simple  processor to process my logic ..
>>>> >>>>
>>>> >>>> public class InfoProcessor extends AbstractProcessor<Key,
Update> {
>>>> >>>> private static Logger logger = Logger.getLogger(InfoProcessor
>>>> .class);
>>>> >>>> private ProcessorContext context;
>>>> >>>> private KeyValueStore<Key, Info> infoStore;
>>>> >>>>
>>>> >>>> @Override
>>>> >>>> @SuppressWarnings("unchecked")
>>>> >>>> public void init(ProcessorContext context) {
>>>> >>>>     this.context = context;
>>>> >>>>     this.context.schedule(Constants.BATCH_DURATION_SECONDS
*
>>>> 1000);
>>>> >>>>     infoStore = (KeyValueStore<Key, Info>)
>>>> >>>> context.getStateStore("InfoStore");
>>>> >>>> }
>>>> >>>>
>>>> >>>> @Override
>>>> >>>> public void process(Key key, Update update) {
>>>> >>>>     try {
>>>> >>>>         if (key != null && update != null) {
>>>> >>>>             Info info = infoStore.get(key);
>>>> >>>>             // merge logic
>>>> >>>>             infoStore.put(key, info);
>>>> >>>>         }
>>>> >>>>
>>>> >>>>     } catch (Exception e) {
>>>> >>>>         logger.error(e.getMessage(), e);
>>>> >>>>     } finally {
>>>> >>>>     }
>>>> >>>>     context.commit();
>>>> >>>> }
>>>> >>>>
>>>> >>>> @Override
>>>> >>>> public void punctuate(long timestamp) {
>>>> >>>>     try {
>>>> >>>>         KeyValueIterator<Key, Info> iter = this.infoStore.all();
>>>> >>>>         while (iter.hasNext()) {
>>>> >>>>             // processing logic
>>>> >>>>
>>>> >>>>         }
>>>> >>>>         iter.close();
>>>> >>>>         context.commit();
>>>> >>>>     } catch (Exception e) {
>>>> >>>>         logger.error(e.getMessage(), e);
>>>> >>>>     }
>>>> >>>> }
>>>> >>>>
>>>> >>>>
>>>> >>>>
>>>> >>>>
>>>> >>>>
>>>> >>>
>>>> >>
>>>> >
>>>>
>>>
>>>
>>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message