samza-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Davide Simoncelli <netcelli....@gmail.com>
Subject Re: Kafka client.id collision
Date Fri, 21 Jul 2017 06:01:31 GMT
Hi,

Sorry. Here are the logs:

First attempt: https://pastebin.com/g6z4hEhZ <https://pastebin.com/g6z4hEhZ>

Second attempt: https://pastebin.com/3XwaRstP <https://pastebin.com/3XwaRstP>

Regards

Davide

> On 20 Jul 2017, at 7:02 pm, Prateek Maheshwari <prateekm@utexas.edu> wrote:
> 
> Can you share the entire log file if that's okay? The warning should be a
> red-herring IMHO.
> 
> On Thu, Jul 20, 2017 at 7:50 AM Davide Simoncelli <netcelli.tux@gmail.com <mailto:netcelli.tux@gmail.com>>
> wrote:
> 
>> Hi,
>> 
>> Thanks for the reply.
>> 
>> It is a warning, but the application fails. Here is the logging:
>> 
>> 
>> 017-07-20 10:43:06.349 [main] AppInfoParser [INFO] Kafka version : 0.10.1.1
>> 2017-07-20 10:43:06.349 [main] AppInfoParser [INFO] Kafka commitId :
>> f10ef2720b03b247
>> 2017-07-20 10:43:06.351 [main] AppInfoParser [WARN] Error registering
>> AppInfo mbean
>> javax.management.InstanceAlreadyExistsException:
>> kafka.producer:type=app-info,id=samza_producer-wikipedia_feed-1
>>        at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>>        at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>>        at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>>        at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>>        at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>>        at
>> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>>        at
>> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
>>        at
>> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:331)
>>        at
>> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:163)
>>        at
>> org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.apply(KafkaSystemFactory.scala:89)
>>        at
>> org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.apply(KafkaSystemFactory.scala:89)
>>        at
>> org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:144)
>>        at
>> org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.send(CoordinatorStreamSystemProducer.java:113)
>>        at
>> org.apache.samza.coordinator.stream.CoordinatorStreamWriter.sendSetConfigMessage(CoordinatorStreamWriter.java:98)
>>        at
>> org.apache.samza.coordinator.stream.CoordinatorStreamWriter.sendMessage(CoordinatorStreamWriter.java:82)
>>        at
>> org.apache.samza.job.yarn.SamzaYarnAppMasterService.onInit(SamzaYarnAppMasterService.scala:68)
>>        at
>> org.apache.samza.job.yarn.YarnClusterResourceManager.start(YarnClusterResourceManager.java:180)
>>        at
>> org.apache.samza.clustermanager.ContainerProcessManager.start(ContainerProcessManager.java:167)
>>        at
>> org.apache.samza.clustermanager.ClusterBasedJobCoordinator.run(ClusterBasedJobCoordinator.java:154)
>>        at
>> org.apache.samza.clustermanager.ClusterBasedJobCoordinator.main(ClusterBasedJobCoordinator.java:222)
>> 2017-07-20 10:43:06.549 [main] CoordinatorStreamWriter [INFO] Stopping the
>> coordinator stream producer.
>> 2017-07-20 10:43:06.549 [main] CoordinatorStreamSystemProducer [INFO]
>> Stopping coordinator stream producer.
>> 2017-07-20 10:43:06.549 [main] KafkaProducer [INFO] Closing the Kafka
>> producer with timeoutMillis = 9223372036854775807 ms.
>> 
>> 
>>> On 20 Jul 2017, at 3:16 pm, Jagadish Venkatraman <jagadish1989@gmail.com>
>> wrote:
>>> 
>>> Hi Davide,
>>> 
>>> Is this logged as an error or as a warning?
>>> 
>>> IIUC, this warning should not fail the job. It may not cause some Mbean
>>> sensors / metrics emitted from Kafka to be correctly reported (since,
>> those
>>> are reported per-clientId).
>>> 
>>> The job should still continue to run.
>>> 
>>> The entire log file will be helpful for further debugging!
>>> 
>>> On Thu, Jul 20, 2017 at 3:32 AM, Davide Simoncelli <
>> netcelli.tux@gmail.com <mailto:netcelli.tux@gmail.com> <mailto:netcelli.tux@gmail.com
<mailto:netcelli.tux@gmail.com>>>
>>> wrote:
>>> 
>>>> Hello,
>>>> 
>>>> We are running Kafka 0.10.1.1 in production. Unfortunately the Samza app
>>>> fails to start because of this bug: https://issues.apache.org/ <https://issues.apache.org/>
<
>> https://issues.apache.org/ <https://issues.apache.org/>>
>>>> jira/browse/SAMZA-1027 <
>> https://issues.apache.org/jira/browse/SAMZA-1027 <
>> https://issues.apache.org/jira/browse/SAMZA-1027>>.
>>>> 
>>>> Even hello-samza on YARN fails to start. Here its the error:
>>>> 
>>>> javax.management.InstanceAlreadyExistsException:
>>>> kafka.producer:type=app-info,id=samza_producer-wikipedia_feed-1
>>>>       at com.sun.jmx.mbeanserver.Repository.addMBean(
>>>> Repository.java:437)
>>>>       at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.
>>>> registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>>>>       at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.
>>>> registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>>>>       at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.
>>>> registerObject(DefaultMBeanServerInterceptor.java:900)
>>>>       at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.
>>>> registerMBean(DefaultMBeanServerInterceptor.java:324)
>>>>       at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(
>>>> JmxMBeanServer.java:522)
>>>>       at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(
>>>> AppInfoParser.java:58)
>>>>       at org.apache.kafka.clients.producer.KafkaProducer.<init>(
>>>> KafkaProducer.java:331)
>>>>       at org.apache.kafka.clients.producer.KafkaProducer.<init>(
>>>> KafkaProducer.java:163)
>>>>       at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.
>>>> apply(KafkaSystemFactory.scala:89)
>>>>       at org.apache.samza.system.kafka.KafkaSystemFactory$$anonfun$3.
>>>> apply(KafkaSystemFactory.scala:89)
>>>>       at org.apache.samza.system.kafka.KafkaSystemProducer.send(
>>>> KafkaSystemProducer.scala:144)
>>>>       at org.apache.samza.coordinator.stream.
>>>> CoordinatorStreamSystemProducer.send(CoordinatorStreamSystemProduce
>>>> r.java:113)
>>>>       at org.apache.samza.coordinator.stream.CoordinatorStreamWriter.
>>>> sendSetConfigMessage(CoordinatorStreamWriter.java:98)
>>>>       at org.apache.samza.coordinator.stream.CoordinatorStreamWriter.
>>>> sendMessage(CoordinatorStreamWriter.java:82)
>>>>       at org.apache.samza.job.yarn.SamzaYarnAppMasterService.onInit(
>>>> SamzaYarnAppMasterService.scala:68)
>>>>       at org.apache.samza.job.yarn.YarnClusterResourceManager.start(
>>>> YarnClusterResourceManager.java:180)
>>>>       at org.apache.samza.clustermanager.ContainerProcessManager.start(
>>>> ContainerProcessManager.java:167)
>>>>       at
>> org.apache.samza.clustermanager.ClusterBasedJobCoordinator.run(
>>>> ClusterBasedJobCoordinator.java:154)
>>>>       at org.apache.samza.clustermanager.ClusterBasedJobCoordinator.
>>>> main(ClusterBasedJobCoordinator.java:222)
>>>> 
>>>> 
>>>> According to samza-job-coordinator.log file, it is creating two
>> producers
>>>> with the same client ID:
>>>> 
>>>> 2017-07-20 04:03:12.208 [main] KafkaSystemProducer [INFO] Creating a new
>>>> producer for system kafka.
>>>> 2017-07-20 04:03:12.224 [main] ProducerConfig [INFO] ProducerConfig
>> values:
>>>>       acks = 1
>>>>       batch.size = 16384
>>>>       block.on.buffer.full = false
>>>>       buffer.memory = 33554432
>>>>       client.id = samza_producer-wikipedia_feed-1
>>>> 
>>>> 
>>>> 2017-07-20 04:03:13.510 [main] KafkaSystemProducer [INFO] Creating a new
>>>> producer for system kafka.
>>>> 2017-07-20 04:03:13.510 [main] ProducerConfig [INFO] ProducerConfig
>> values:
>>>>       acks = 1
>>>>       batch.size = 16384
>>>>       block.on.buffer.full = false
>>>>       buffer.memory = 33554432
>>>>       client.id = samza_producer-wikipedia_feed-1
>>>>       compression.type = none
>>>> 
>>>> Any idea why it is happening?
>>>> 
>>>> Thanks
>>>> 
>>>> Davide
>>> 
>>> 
>>> 
>>> 
>>> --
>>> Jagadish V,
>>> Graduate Student,
>>> Department of Computer Science,
>>> Stanford University
>> 
>> --
> Sent from my iphone.


Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message