kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: Producer fails to send data when it is used in log4j appender.
Date Wed, 05 Mar 2014 04:21:57 GMT
Just change config/log4j.properties inside Kafka.

Thanks,

Jun
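
For readers landing on this thread, here is a minimal sketch of what that change might look like, assuming log4j 1.x and assuming the Kafka client's loggers all sit under the "kafka" logger name (the kafka.utils and kafka.client frames in the stack trace quoted below suggest they do). The appender names stdout and KAFKA come from the log4j.properties quoted below. Option 2 is an alternative that was not proposed in this thread.

```properties
# Root logger still feeds both appenders for application code.
log4j.rootLogger=INFO, stdout, KAFKA

# Option 1: silence everything logged under the "kafka" logger hierarchy,
# so the producer's own log calls never reach the KAFKA appender.
log4j.logger.kafka=OFF

# Option 2 (use instead of option 1): keep Kafka's warnings on the console only.
# additivity=false stops these events from propagating to the root logger's
# appenders, which include KAFKA.
#log4j.logger.kafka=WARN, stdout
#log4j.additivity.kafka=false
```

When Kafka is pulled in as a Maven dependency, the file that matters is most likely the log4j.properties on the application's own classpath (the one that declares the KAFKA appender), not a file inside the Kafka distribution.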


On Tue, Mar 4, 2014 at 4:09 PM, 김동경 <style9595@gmail.com> wrote:

> I couldn't find any configuration for turning off logging in
> http://kafka.apache.org/documentation.html#configuration.
> I included Kafka as a Maven dependency.
> How can I turn off the Kafka logging from code?
>
> Thanks
> Regards
> Dongkyoung
>
> 2014-03-04 14:40 GMT+09:00 Jun Rao <junrao@gmail.com>:
>
> > I think the logging done inside Kafka itself is being routed back to the
> > KafkaAppender, which creates an infinite loop. Maybe you could try setting
> > the log level for the kafka package to OFF?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Mar 3, 2014 at 6:26 PM, 김동경 <style9595@gmail.com> wrote:
> >
> > > I made a simple log4j Kafka appender.
> > > I copied most of the code from the 0.8.0 Producer example at
> > > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
> > > into the "append" function.
> > >
> > > I confirmed that the producer example code works in my environment.
> > > But when I use the same logic in a log4j appender, it does not work.
> > > It tries to fetch metadata repeatedly, and I get an endless run of
> > > "Utils$.swallowError" errors.
> > >
> > > I have no idea what swallowError is.
> > > It looks like it fails to fetch metadata from the broker and keeps trying
> > > again and again.
> > > The max retry count is just 3, so I don't know why this happens.
> > >
> > > Is there anything that should be done to produce log data into Kafka
> > > via a log4j appender?
> > >
> > >
> > >
> > > ------------------------------------------------------------------------
> > > INFO [main] (Logging.scala:67) - Verifying properties
> > >  INFO [main] (Logging.scala:67) - Property metadata.broker.list is
> > > overridden to kafka01:9092
> > >  WARN [main] (Logging.scala:82) - Property zk.connect is not valid
> > >  INFO [main] (Logging.scala:67) - Property request.required.acks is
> > > overridden to 1
> > >  INFO [main] (Logging.scala:67) - Property partitioner.class is
> > > overridden to com.samsung.rtdp.SimplePartitioner2
> > >  INFO [main] (Logging.scala:67) - Property serializer.class is
> > > overridden to kafka.serializer.StringEncoder
> > >  INFO [main] (HelloWorld.java:14) - Entering application.
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 0 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 1 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 2 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 3 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 4 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 5 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 6 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 7 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 8 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 9 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 10 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 11 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > >  INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
> > > id:0,host:kafka01,port:9092 with correlation id 12 for 1 topic(s)
> > > Set(KafkaAppenderTest)
> > > .
> > > .
> > > .
> > > java.lang.StackOverflowError
> > > at java.lang.StringCoding.deref(StringCoding.java:64)
> > > at java.lang.StringCoding.encode(StringCoding.java:275)
> > > at java.lang.String.getBytes(String.java:954)
> > > at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
> > > at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:243)
> > > at java.io.File.exists(File.java:791)
> > > at sun.misc.URLClassPath$FileLoader.getResource(URLClassPath.java:1014)
> > > at sun.misc.URLClassPath.getResource(URLClassPath.java:189)
> > > at java.net.URLClassLoader$1.run(URLClassLoader.java:209)
> > > at java.security.AccessController.doPrivileged(Native Method)
> > > at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> > > at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> > > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> > > at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> > > at java.lang.ClassLoader.defineClass1(Native Method)
> > > at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
> > > at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> > > at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
> > > at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
> > > at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
> > > at java.security.AccessController.doPrivileged(Native Method)
> > > at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> > > at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> > > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> > > at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> > > at org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
> > > at org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
> > > at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
> > > at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
> > > at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> > > at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> > > at org.apache.log4j.Category.callAppenders(Category.java:206)
> > > at org.apache.log4j.Category.forcedLog(Category.java:391)
> > > at org.apache.log4j.Category.error(Category.java:322)
> > > at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
> > > at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
> > > at kafka.utils.Utils$.swallow(Utils.scala:189)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:46)
> > > at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> > > at kafka.producer.Producer.send(Producer.scala:76)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> > > at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
> > > at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> > > at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> > > at org.apache.log4j.Category.callAppenders(Category.java:206)
> > > at org.apache.log4j.Category.forcedLog(Category.java:391)
> > > at org.apache.log4j.Category.info(Category.java:666)
> > > at kafka.utils.Logging$class.info(Logging.scala:67)
> > > at kafka.client.ClientUtils$.info(ClientUtils.scala:31)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
> > > at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> > > at kafka.utils.Utils$.swallow(Utils.scala:187)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:46)
> > > at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> > > at kafka.producer.Producer.send(Producer.scala:76)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> > > at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
> > > .
> > > .
> > > .
> > >
> > >
> > > ------------------------------------------------------------------------
> > >
> > >
> > > Here is my code.
> > > Since it came from producer example code, it is quite straightforward.
> > >
> > >
> > > ------------------------------------------------------------------------
> > > package com.samsung.rtdp;
> > >
> > > import java.io.IOException;
> > > import java.util.Date;
> > > import java.util.Properties;
> > > import java.util.Random;
> > >
> > > import org.apache.log4j.AppenderSkeleton;
> > > import org.apache.log4j.spi.ErrorCode;
> > > import org.apache.log4j.spi.LoggingEvent;
> > >
> > > import kafka.javaapi.producer.Producer;
> > > import kafka.producer.KeyedMessage;
> > > import kafka.producer.Partitioner;
> > > import kafka.producer.ProducerConfig;
> > > import kafka.utils.VerifiableProperties;
> > >
> > >
> > > public class KafkaAppender extends AppenderSkeleton {
> > >
> > > private String brokerList;
> > > private String serializer;
> > > private String partitioner;
> > > private String topic;
> > > private String DEFAULT_REQUIRED_REQUEST_NUACKS="1";
> > >
> > > private Properties props;
> > > private ProducerConfig config;
> > > private Producer<String, String> producer;
> > >
> > > public void setBrokerList(String brokerList) { this.brokerList = brokerList; }
> > > public String getBrokerList() { return this.brokerList; }
> > >
> > > public void setSerializerClass(String serializer) { this.serializer = serializer; }
> > > public String getSerializer() { return this.serializer; }
> > >
> > > public void setPartitionerClass(String partitioner) { this.partitioner = partitioner; }
> > > public String getPartitioner() { return this.partitioner; }
> > >
> > > public void setTopic(String topic) { this.topic = topic; }
> > > public String getTopic() { return this.topic; }
> > >
> > >
> > > public void printParameters(){
> > > System.out.println("BrokerList : " + brokerList);
> > > System.out.println("Serializer Class : " + serializer);
> > > System.out.println("Partitioner Class : " + partitioner);
> > > System.out.println("Topic : " + topic);
> > > }
> > >
> > > public void activateOptions() {
> > >
> > > // printParameters();
> > >
> > > props = new Properties();
> > >
> > > props.put("metadata.broker.list", brokerList);
> > > props.put("serializer.class", serializer);
> > > props.put("partitioner.class", partitioner);
> > > props.put("request.required.acks", DEFAULT_REQUIRED_REQUEST_NUACKS);
> > > props.put("zk.connect", "kafka01:2181");
> > >
> > > config = new ProducerConfig(props);
> > > producer = new Producer<String, String>(config);
> > > }
> > >
> > >
> > > public void close() {
> > > // TODO Auto-generated method stub
> > > producer.close();
> > >
> > > }
> > >
> > >
> > > public boolean requiresLayout() {
> > > // TODO Auto-generated method stub
> > > return true;
> > > }
> > >
> > > @Override
> > > protected void append(LoggingEvent event) {
> > > // TODO Auto-generated method stub
> > >
> > >
> > > // printParameters();
> > >
> > > if( this.layout == null )
> > > {
> > > errorHandler.error("No layout for appender " + name , null,
> > > ErrorCode.MISSING_LAYOUT );
> > > return;
> > > }
> > >
> > > String msg = this.layout.format(event);
> > >
> > >  KeyedMessage<String, String> data = new KeyedMessage<String,
> > > String>("KafkaAppenderTest", msg, msg);
> > >
> > > producer.send(data);
> > > }
> > >
> > > }
> > >
> > >
> > >
> > > ------------------------------------------------------------------------
> > >
> > >
> > >
> > > And this is my log4j.properties
> > >
> > >
> > > ------------------------------------------------------------------------
> > > log4j.rootLogger=INFO, stdout, KAFKA
> > > # set the logger for your package to be the KAFKA appender
> > > #log4j.logger.com.samsung.rtdp=INFO, KAFKA
> > >
> > > log4j.appender.stdout=org.apache.log4j.ConsoleAppender
> > > log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
> > >
> > > # Pattern to output the caller's file name and line number.
> > > log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
> > >
> > > log4j.appender.KAFKA=com.samsung.rtdp.KafkaAppender
> > > log4j.appender.KAFKA.BrokerList=kafka01:9092
> > > log4j.appender.KAFKA.SerializerClass=kafka.serializer.StringEncoder
> > > log4j.appender.KAFKA.PartitionerClass=com.samsung.rtdp.SimplePartitioner2
> > > log4j.appender.KAFKA.Topic=test
> > > log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
> > > log4j.appender.KAFKA.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
> > >
> > >
> > > ------------------------------------------------------------------------
> > >
> >
>
