kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From 김동경 <style9...@gmail.com>
Subject Producer fails to send data when it is used in log4j appender.
Date Tue, 04 Mar 2014 02:26:35 GMT
I made a simple log4j Kafka appender.
I copied most of the code from the 0.8.0 Producer example at "
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example"
to write the "append" function.

I confirmed that the producer example code works in my environment.
But when I use the same logic in a log4j appender, it doesn't work.
It tries to fetch metadata repeatedly, and I get an endless series of
"Utils$.swallowError" errors.

I have no idea what swallowError is about.
It looks like it fails to fetch metadata from the broker and keeps trying again and
again.
The max retry count is just 3, so I don't know why this happens.

Is there anything that needs to be done to produce log data into Kafka via
a log4j appender?

---------------------------------------------------------------------------------------------------------------------------------
INFO [main] (Logging.scala:67) - Verifying properties
 INFO [main] (Logging.scala:67) - Property metadata.broker.list is
overridden to kafka01:9092
 WARN [main] (Logging.scala:82) - Property zk.connect is not valid
 INFO [main] (Logging.scala:67) - Property request.required.acks is
overridden to 1
 INFO [main] (Logging.scala:67) - Property partitioner.class is overridden
to com.samsung.rtdp.SimplePartitioner2
 INFO [main] (Logging.scala:67) - Property serializer.class is overridden
to kafka.serializer.StringEncoder
 INFO [main] (HelloWorld.java:14) - Entering application.
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 0 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 1 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 2 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 3 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 4 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 5 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 6 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 7 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 8 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 9 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 10 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 11 for 1 topic(s)
Set(KafkaAppenderTest)
 INFO [main] (HelloWorld.java:14) - Fetching metadata from broker
id:0,host:kafka01,port:9092 with correlation id 12 for 1 topic(s)
Set(KafkaAppenderTest)
.
.
.
java.lang.StackOverflowError
at java.lang.StringCoding.deref(StringCoding.java:64)
at java.lang.StringCoding.encode(StringCoding.java:275)
at java.lang.String.getBytes(String.java:954)
at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:243)
at java.io.File.exists(File.java:791)
at sun.misc.URLClassPath$FileLoader.getResource(URLClassPath.java:1014)
at sun.misc.URLClassPath.getResource(URLClassPath.java:189)
at java.net.URLClassLoader$1.run(URLClassLoader.java:209)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
at
org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
at
org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
at
org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
at org.apache.log4j.Category.callAppenders(Category.java:206)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.error(Category.java:322)
at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
at kafka.utils.Utils$.swallow(Utils.scala:189)
at kafka.utils.Logging$class.swallowError(Logging.scala:105)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at kafka.producer.Producer.send(Producer.scala:76)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
at
org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
at org.apache.log4j.Category.callAppenders(Category.java:206)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.info(Category.java:666)
at kafka.utils.Logging$class.info(Logging.scala:67)
at kafka.client.ClientUtils$.info(ClientUtils.scala:31)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
at
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at
kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:187)
at kafka.utils.Logging$class.swallowError(Logging.scala:105)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at kafka.producer.Producer.send(Producer.scala:76)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at com.samsung.rtdp.KafkaAppender.append(KafkaAppender.java:121)
.
.
.
----------------------------------------------------------------------------------------------------------------------------------


Here is my code.
Since it is based on the producer example code, it is quite straightforward.
-------------------------------------------------------------------------------------------------------------------------
package com.samsung.rtdp;

import java.io.IOException;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.ErrorCode;
import org.apache.log4j.spi.LoggingEvent;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.Partitioner;
import kafka.producer.ProducerConfig;
import kafka.utils.VerifiableProperties;


public class KafkaAppender extends AppenderSkeleton {

private String brokerList;
private String serializer;
private String partitioner;
private String topic;
private String DEFAULT_REQUIRED_REQUEST_NUACKS="1";

private Properties props;
private ProducerConfig config;
private Producer<String, String> producer;

public void setBrokerList(String brokerList) { this.brokerList =
brokerList; }
public String getBrokerList() { return this.brokerList; }

public void setSerializerClass(String serializer) { this.serializer =
serializer; }
public String getSerializer() { return this.serializer; }

public void setPartitionerClass(String partitioner) { this.partitioner =
partitioner; }
public String getPartitioner() { return this.partitioner; }

public void setTopic(String topic) { this.topic = topic; }
public String getTopic() { return this.topic; }


public void printParameters(){
System.out.println("BrokerList : " + brokerList);
System.out.println("Serializer Class : " + serializer);
System.out.println("Partitioner Class : " + partitioner);
System.out.println("Topic : " + topic);
}

public void activateOptions() {

// printParameters();

props = new Properties();

props.put("metadata.broker.list", brokerList);
props.put("serializer.class", serializer);
props.put("partitioner.class", partitioner);
props.put("request.required.acks", DEFAULT_REQUIRED_REQUEST_NUACKS);
props.put("zk.connect", "kafka01:2181");

config = new ProducerConfig(props);
producer = new Producer<String, String>(config);
}


public void close() {
// TODO Auto-generated method stub
producer.close();

}


public boolean requiresLayout() {
// TODO Auto-generated method stub
return true;
}

@Override
protected void append(LoggingEvent event) {
// TODO Auto-generated method stub


// printParameters();

if( this.layout == null )
{
errorHandler.error("No layout for appender " + name , null,
ErrorCode.MISSING_LAYOUT );
return;
}

String msg = this.layout.format(event);

 KeyedMessage<String, String> data = new KeyedMessage<String,
String>("KafkaAppenderTest", msg, msg);

producer.send(data);
}

}

-------------------------------------------------------------------------------------------------------------------------------------



And this is my log4j.properties:
-------------------------------------------------------------------------------------------------------------------------------------
log4j.rootLogger=INFO, stdout, KAFKA
# set the logger for your package to be the KAFKA appender
#log4j.logger.com.samsung.rtdp=INFO, KAFKA

# Keep the Kafka client's own log events (loggers under "kafka.") away from
# the KAFKA appender. Otherwise every producer.send() logs an event that is
# appended to Kafka again, which triggers another send, and so on.
# Additivity is switched off so these events do not also reach the root
# logger's appenders; they go to the console only.
log4j.logger.kafka=INFO, stdout
log4j.additivity.kafka=false

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

# Pattern to output the caller's file name and line number.
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n

log4j.appender.KAFKA=com.samsung.rtdp.KafkaAppender
log4j.appender.KAFKA.BrokerList=kafka01:9092
log4j.appender.KAFKA.SerializerClass=kafka.serializer.StringEncoder
log4j.appender.KAFKA.PartitionerClass=com.samsung.rtdp.SimplePartitioner2
log4j.appender.KAFKA.Topic=test
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
--------------------------------------------------------------------------------------------------------------------------------------

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message