kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: Producer fails to send data when it is used in log4j appender.
Date Wed, 05 Mar 2014 19:35:46 GMT
Check which log4j properties file is included in the Java classpath. If
there isn't one, you can add one to your classpath.

Thanks,

Jun
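
[For the recursion discussed below, the usual log4j-side fix is to keep Kafka's own log events away from the Kafka appender. A minimal log4j.properties sketch, assuming the appender names used later in this thread (KAFKA, stdout):

```properties
# Root logger: application events go to both console and the Kafka appender.
log4j.rootLogger=INFO, stdout, KAFKA

# Kafka's internal logging must NOT feed back into the KAFKA appender,
# or each send produces a log event that triggers another send.
# Route it to the console only (or set the level to OFF to silence it):
log4j.logger.kafka=WARN, stdout
log4j.additivity.kafka=false
```

`log4j.additivity.kafka=false` stops events from the `kafka` logger from also propagating to the root logger's appenders, which is what breaks the loop.]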


On Wed, Mar 5, 2014 at 3:13 AM, 김동경 <style9595@gmail.com> wrote:

> Sorry.
> Since I use Maven for dependencies, Kafka is included as a JAR.
> In this case, is there any way to turn it off?
>
> Thanks
> Regards
> Dongkyoung.
>
>
> 2014-03-05 13:21 GMT+09:00 Jun Rao <junrao@gmail.com>:
>
> > Just change config/log4j.properties inside Kafka.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Mar 4, 2014 at 4:09 PM, 김동경 <style9595@gmail.com> wrote:
> >
> > > I couldn't find any configuration relevant to turning off the log in
> > > http://kafka.apache.org/documentation.html#configuration.
> > > I included Kafka as a Maven dependency.
> > > How can I turn off the Kafka log in code?
> > >
> > > Thanks
> > > Regards
> > > Dongkyoung
> > >
> > > 2014-03-04 14:40 GMT+09:00 Jun Rao <junrao@gmail.com>:
> > >
> > > > I think it routes the logging inside Kafka itself back to the
> > > > KafkaAppender. This creates an infinite loop. Maybe you could try
> > > > setting the log level of the kafka package to OFF?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Mon, Mar 3, 2014 at 6:26 PM, 김동경 <style9595@gmail.com> wrote:
> > > >
> > > > > I made a simple log4j Kafka appender. I copied most of the code from
> > > > > the 0.8.0 Producer example at
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
> > > > > to write the append function.
> > > > >
> > > > > I confirmed that the producer example code works in my environment,
> > > > > but when I use the same logic in a log4j appender, it doesn't work.
> > > > > It fetches metadata repeatedly and I get an infinite
> > > > > "Utils$.swallowError" error.
> > > > >
> > > > > I have no idea about swallowError. It looks like it fails to fetch
> > > > > metadata from the broker and retries again and again.
> > > > > The max retry count is just 3, but I don't know why this happens.
> > > > >
> > > > > Is there anything that needs to be done to produce log data into
> > > > > Kafka via a log4j appender?
> > > > >
> > > > >
> > > > >
> > > > > ---------------------------------------------------------------------------------------------------------------------------------
> > > > > INFO [main] (Logging.scala:67) - Verifying properties
> > > > > INFO [main] (Logging.scala:67) - Property metadata.broker.list is overridden to kafka01:9092
> > > > > WARN [main] (Logging.scala:82) - Property zk.connect is not valid
> > > > > INFO [main] (Logging.scala:67) - Property request.required.acks is overridden to 1
> > > > > INFO [main] (Logging.scala:67) - Property partitioner.class is overridden to com.samsung.rtdp.SimplePartitioner2
> > > > > INFO [main] (Logging.scala:67) - Property serializer.class is overridden to kafka.serializer.StringEncoder
> > > > > INFO [main] (HelloWorld.java:14) - Entering application.
> > > > > INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 0 for 1 topic(s) Set(KafkaAppenderTest)
> > > > > INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 1 for 1 topic(s) Set(KafkaAppenderTest)
> > > > > INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:kafka01,port:9092 with correlation id 2 for 1 topic(s) Set(KafkaAppenderTest)
> > > > > .
> > > > > .
> > > > > .
> > > > > java.lang.StackOverflowError
> > > > > at java.lang.StringCoding.deref(StringCoding.java:64)
> > > > > at java.lang.StringCoding.encode(StringCoding.java:275)
> > > > > at java.lang.String.getBytes(String.java:954)
> > > > > at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
> > > > > at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:243)
> > > > > at java.io.File.exists(File.java:791)
> > > > > at sun.misc.URLClassPath$FileLoader.getResource(URLClassPath.java:1014)
> > > > > at sun.misc.URLClassPath.getResource(URLClassPath.java:189)
> > > > > at java.net.URLClassLoader$1.run(URLClassLoader.java:209)
> > > > > at java.security.AccessController.doPrivileged(Native Method)
> > > > > at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> > > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> > > > > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> > > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> > > > > at java.lang.ClassLoader.defineClass1(Native Method)
> > > > > at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
> > > > > at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> > > > > at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
> > > > > at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
> > > > > at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
> > > > > at java.security.AccessController.doPrivileged(Native Method)
> > > > > at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
> > > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
> > > > > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
> > > > > at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
> > > > > at org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
> > > > > at org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
> > > > > at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
> > > > > at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
> > > > > at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> > > > > at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> > > > > at org.apache.log4j.Category.callAppenders(Category.java:206)
> > > > > at org.apache.log4j.Category.forcedLog(Category.java:391)
> > > > > at org.apache.log4j.Category.error(Category.java:322)
> > > > > at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
> > > > > at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
> > > > > at kafka.utils.Utils$.swallow(Utils.scala:189)
> > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > > > at kafka.utils.Utils$.swallowError(Utils.scala:46)
> > > > > at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> > > > > at kafka.producer.Producer.send(Producer.scala:76)
> > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> > > > > at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
> > > > > at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> > > > > at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> > > > > at org.apache.log4j.Category.callAppenders(Category.java:206)
> > > > > at org.apache.log4j.Category.forcedLog(Category.java:391)
> > > > > at org.apache.log4j.Category.info(Category.java:666)
> > > > > at kafka.utils.Logging$class.info(Logging.scala:67)
> > > > > at kafka.client.ClientUtils$.info(ClientUtils.scala:31)
> > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
> > > > > at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > > > at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> > > > > at kafka.utils.Utils$.swallow(Utils.scala:187)
> > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > > > at kafka.utils.Utils$.swallowError(Utils.scala:46)
> > > > > at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> > > > > at kafka.producer.Producer.send(Producer.scala:76)
> > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> > > > > at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
> > > > > .
> > > > > .
> > > > > .
> > > > >
> > > > >
> > > > > ----------------------------------------------------------------------------------------------------------------------------------
> > > > >
> > > > >
> > > > > Here is my code. Since it came from the producer example code, it is
> > > > > quite straightforward.
> > > > >
> > > > > -------------------------------------------------------------------------------------------------------------------------
> > > > > package com.samsung.rtdp;
> > > > >
> > > > > import java.util.Properties;
> > > > >
> > > > > import org.apache.log4j.AppenderSkeleton;
> > > > > import org.apache.log4j.spi.ErrorCode;
> > > > > import org.apache.log4j.spi.LoggingEvent;
> > > > >
> > > > > import kafka.javaapi.producer.Producer;
> > > > > import kafka.producer.KeyedMessage;
> > > > > import kafka.producer.ProducerConfig;
> > > > >
> > > > > public class KafkaAppender extends AppenderSkeleton {
> > > > >
> > > > >     private static final String DEFAULT_REQUEST_REQUIRED_ACKS = "1";
> > > > >
> > > > >     private String brokerList;
> > > > >     private String serializer;
> > > > >     private String partitioner;
> > > > >     private String topic;
> > > > >
> > > > >     private Properties props;
> > > > >     private ProducerConfig config;
> > > > >     private Producer<String, String> producer;
> > > > >
> > > > >     public void setBrokerList(String brokerList) { this.brokerList = brokerList; }
> > > > >     public String getBrokerList() { return this.brokerList; }
> > > > >
> > > > >     public void setSerializerClass(String serializer) { this.serializer = serializer; }
> > > > >     public String getSerializerClass() { return this.serializer; }
> > > > >
> > > > >     public void setPartitionerClass(String partitioner) { this.partitioner = partitioner; }
> > > > >     public String getPartitionerClass() { return this.partitioner; }
> > > > >
> > > > >     public void setTopic(String topic) { this.topic = topic; }
> > > > >     public String getTopic() { return this.topic; }
> > > > >
> > > > >     public void printParameters() {
> > > > >         System.out.println("BrokerList : " + brokerList);
> > > > >         System.out.println("Serializer Class : " + serializer);
> > > > >         System.out.println("Partitioner Class : " + partitioner);
> > > > >         System.out.println("Topic : " + topic);
> > > > >     }
> > > > >
> > > > >     @Override
> > > > >     public void activateOptions() {
> > > > >         props = new Properties();
> > > > >
> > > > >         props.put("metadata.broker.list", brokerList);
> > > > >         props.put("serializer.class", serializer);
> > > > >         props.put("partitioner.class", partitioner);
> > > > >         props.put("request.required.acks", DEFAULT_REQUEST_REQUIRED_ACKS);
> > > > >         props.put("zk.connect", "kafka01:2181"); // not a valid producer property (see WARN in the log)
> > > > >
> > > > >         config = new ProducerConfig(props);
> > > > >         producer = new Producer<String, String>(config);
> > > > >     }
> > > > >
> > > > >     public void close() {
> > > > >         producer.close();
> > > > >     }
> > > > >
> > > > >     public boolean requiresLayout() {
> > > > >         return true;
> > > > >     }
> > > > >
> > > > >     @Override
> > > > >     protected void append(LoggingEvent event) {
> > > > >         if (this.layout == null) {
> > > > >             errorHandler.error("No layout for appender " + name, null,
> > > > >                     ErrorCode.MISSING_LAYOUT);
> > > > >             return;
> > > > >         }
> > > > >
> > > > >         String msg = this.layout.format(event);
> > > > >
> > > > >         // Note: the topic is hard-coded here rather than taken from this.topic.
> > > > >         KeyedMessage<String, String> data =
> > > > >                 new KeyedMessage<String, String>("KafkaAppenderTest", msg, msg);
> > > > >
> > > > >         producer.send(data);
> > > > >     }
> > > > > }
> > > > >
> > > > >
> > > > >
> > > > > -------------------------------------------------------------------------------------------------------------------------------------
> > > > >
> > > > >
> > > > >
> > > > > And this is my log4j.properties
> > > > >
> > > > >
> > > > > -------------------------------------------------------------------------------------------------------------------------------------
> > > > > log4j.rootLogger=INFO, stdout, KAFKA
> > > > > # set the logger for your package to be the KAFKA appender
> > > > > #log4j.logger.com.samsung.rtdp=INFO, KAFKA
> > > > >
> > > > > log4j.appender.stdout=org.apache.log4j.ConsoleAppender
> > > > > log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
> > > > >
> > > > > # Pattern to output the caller's file name and line number.
> > > > > log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
> > > > >
> > > > > log4j.appender.KAFKA=com.samsung.rtdp.KafkaAppender
> > > > > log4j.appender.KAFKA.BrokerList=kafka01:9092
> > > > > log4j.appender.KAFKA.SerializerClass=kafka.serializer.StringEncoder
> > > > > log4j.appender.KAFKA.PartitionerClass=com.samsung.rtdp.SimplePartitioner2
> > > > > log4j.appender.KAFKA.Topic=test
> > > > > log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
> > > > > log4j.appender.KAFKA.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
> > > > >
> > > > >
> > > > > --------------------------------------------------------------------------------------------------------------------------------------
> > > > >
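
[The stack trace above tells the whole story: KafkaAppender.append() calls Producer.send(), Kafka's producer emits its own log4j events, Category.callAppenders() dispatches them back to KafkaAppender.append(), and so on until the stack overflows. The feedback loop can be reproduced with nothing but java.util.logging — a hypothetical sketch, not Kafka code, with a re-entrancy guard shown as one way to break the cycle:

```java
import java.util.logging.Handler;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class RecursiveHandlerDemo {

    // Counts how many records the handler actually processes.
    public static int publishCount = 0;

    // A handler whose publish() logs through the same logger — the same
    // shape of feedback loop as routing kafka.* through a Kafka appender.
    static class NoisyHandler extends Handler {
        private boolean inPublish = false; // re-entrancy guard

        @Override
        public void publish(LogRecord record) {
            if (inPublish) {
                return; // without this guard: unbounded recursion, StackOverflowError
            }
            inPublish = true;
            try {
                publishCount++;
                // The "send" path emits its own log line, re-entering publish():
                Logger.getLogger("demo").info("internal send log");
            } finally {
                inPublish = false;
            }
        }

        @Override public void flush() {}
        @Override public void close() {}
    }

    public static void main(String[] args) {
        Logger logger = Logger.getLogger("demo");
        logger.setUseParentHandlers(false); // keep console noise out of the demo
        logger.addHandler(new NoisyHandler());
        logger.info("application message");
        System.out.println("publish() invocations: " + publishCount);
    }
}
```

In the thread's actual setup the same effect is achieved declaratively — keeping the `kafka` logger away from the KAFKA appender via log4j levels/additivity — rather than by guarding append() in code.]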
