kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Bae, Jae Hyeon" <metac...@gmail.com>
Subject Re: ZK session kill makes high level consumer replay
Date Thu, 27 Mar 2014 23:21:43 GMT
When I call consumer.commitOffsets(); before killing session, unit test
succeeded. This problem would happen only with autoCommit enabled.

Could you fix this problem before releasing 0.8.1.1?

Thank you
Best, Jae


On Thu, Mar 27, 2014 at 3:57 PM, Bae, Jae Hyeon <metacret@gmail.com> wrote:

> Hi
>
> While testing kafka 0.8 consumer's zk resilience, I found that on the zk
> session kill and handleNewSession() is called, high level consumer is
> replaying messages.
>
> Is this know issue? I am attaching unit test source code.
>
> package com.netflix.nfkafka.zktest;
>
> import com.fasterxml.jackson.core.JsonProcessingException;
> import com.google.common.collect.ImmutableMap;
> import com.google.common.collect.Lists;
> import com.netflix.logging.chukwa.JsonMapper;
> import kafka.consumer.Consumer;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.KafkaStream;
> import kafka.consumer.ZookeeperConsumerConnector;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.javaapi.producer.Producer;
> import kafka.producer.KeyedMessage;
> import kafka.producer.ProducerConfig;
> import kafka.server.KafkaConfig;
> import kafka.server.KafkaServer;
> import kafka.utils.ZkUtils;
> import org.I0Itec.zkclient.IDefaultNameSpace;
> import org.I0Itec.zkclient.ZkClient;
> import org.I0Itec.zkclient.ZkConnection;
> import org.I0Itec.zkclient.ZkServer;
> import org.apache.commons.io.FileUtils;
> import org.apache.commons.lang.StringUtils;
> import org.apache.curator.test.KillSession;
> import org.apache.curator.test.TestingServer;
> import org.apache.zookeeper.ZKUtil;
> import org.apache.zookeeper.ZooKeeper;
> import org.codehaus.jackson.type.TypeReference;
> import org.junit.AfterClass;
> import org.junit.BeforeClass;
> import org.junit.Test;
>
> import java.io.File;
> import java.io.IOException;
> import java.lang.reflect.Field;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.Random;
>
> import static org.junit.Assert.assertEquals;
> import static org.junit.Assert.assertTrue;
> import static org.junit.Assert.fail;
>
> public class TestZkSessionKill {
>     private static TestingServer zkServer;
>     private static KafkaServer server1;
>     private static KafkaServer server2;
>     private static KafkaConfig config1;
>     private static KafkaConfig config2;
>     private static ZooKeeper zk1;
>     private static ZooKeeper zk2;
>
>     private static ConsumerConnector consumer;
>     private static ZooKeeper zkConsumer;
>     private static KafkaStream<byte[], byte[]> stream;
>
>     private static final int    BROKER_ID1 = 0;
>     private static final int    BROKER_ID2 = 1;
>     private static final int    KAFKA_PORT1 = 2200;
>     private static final int    KAFKA_PORT2 = 2201;
>
>     public static String feed = "testzksessionkill";
>
>     @BeforeClass
>     public static void setup() throws Exception {
>         zkServer = new TestingServer(-1, tempDir());
>         config1 = new KafkaConfig(createBrokerConfig(BROKER_ID1,
> KAFKA_PORT1));
>         server1 = createServer(config1);
>         zk1 = getZk(server1.zkClient());
>
>         config2 = new KafkaConfig(createBrokerConfig(BROKER_ID2,
> KAFKA_PORT2));
>         server2 = createServer(config2);
>         zk2 = getZk(server2.zkClient());
>
>         Properties props = new Properties();
>         props.setProperty("zookeeper.connect",
> zkServer.getConnectString());
>         props.setProperty("group.id", feed);
>         props.setProperty("auto.offset.reset", "smallest");
>
>         generateDataToKafka(0); // initially we have to create the topic
>
>         createConsumer(props);
>     }
>
>     public static Properties createBrokerConfig(int nodeId, int port) {
>         Properties props = new Properties();
>         props.put("broker.id",
> Integer.toString(nodeId));
>         props.put("brokerId",                    Integer.toString(nodeId));
>         props.put("host.name",                   "localhost");
>         props.put("port",                        Integer.toString(port));
>         props.put("log.dir",
> tempDir().getAbsolutePath());
>         props.put("log.flush.interval.messages", "1");
>         props.put("zookeeper.connect",
> zkServer.getConnectString());
>         props.put("replica.socket.timeout.ms",   "1500");
>         props.put("hostName",                    "localhost");
>         props.put("numPartitions",               "1");
>
>         return props;
>     }
>
>     public static File tempDir() {
>         File f = new File("./build/test", "kafka-" + new
> Random().nextInt(1000000));
>         f.mkdirs();
>         System.out.println(f);
>         f.deleteOnExit();
>         return f;
>     }
>
>     @AfterClass
>     public static void shutdown() throws IOException {
>         if (server1 != null) {
>             server1.shutdown();
>             server1.awaitShutdown();
>         }
>
>         if (server2 != null) {
>             server2.shutdown();
>             server2.awaitShutdown();
>         }
>
>         zkServer.close();
>     }
>
>     public static KafkaServer createServer(KafkaConfig config) throws
> NoSuchFieldException, IllegalAccessException {
>         KafkaServer server = new KafkaServer(config,
> kafka.utils.SystemTime$.MODULE$);
>         server.startup();
>         return server;
>     }
>
>     public static ZooKeeper getZk(ZkClient zkClient) throws
> NoSuchFieldException, IllegalAccessException {
>         Field f = zkClient.getClass().getDeclaredField("_connection");
>         f.setAccessible(true);
>         ZkConnection zkConnection = (ZkConnection) f.get(zkClient);
>         Field fk = zkConnection.getClass().getDeclaredField("_zk");
>         fk.setAccessible(true);
>         return (ZooKeeper) fk.get(zkConnection);
>     }
>
>
>     public static int messageCount = 25;
>
>     public static void generateDataToKafka(int startNumber) throws
> IOException {
>         Properties props = new Properties();
>         props.setProperty("metadata.broker.list", getBrokerListStr());
>         props.setProperty("producer.discovery", "static");
>         props.setProperty("request.required.acks", "0");
>         props.setProperty("producer.type", "sync");
>
>         Producer<byte[], byte[]> producer = new Producer<byte[],
> byte[]>(new ProducerConfig(props));
>         for (int i = 0; i < messageCount; ++i) {
>             Map<String, Object> map = ImmutableMap.<String,
> Object>builder()
>                     .put("messageId", (i + startNumber))
>                     .put("ts", System.currentTimeMillis())
>                     .build();
>
>             try {
>                 producer.send(new KeyedMessage<byte[], byte[]>(feed,
> JsonMapper.getInstance().writeValueAsBytes(map)));
>                 System.out.println("sent: " + map);
>             } catch (JsonProcessingException e) {
>                 fail();
>             }
>         }
>         producer.close();
>     }
>
>     public static String getBrokerListStr() {
>         List<String> str = Lists.newArrayList();
>         for (KafkaConfig config : Lists.newArrayList(config1, config2)) {
>             str.add(config.hostName() + ":" + config.port());
>         }
>         return StringUtils.join(str, ",");
>     }
>
>     @Test
>     public void test() throws Exception {
>         consumeDataFromkafka(0);
>
>         // kill sessions
>         //KillSession.kill(zk1, zkServer.getConnectString());
>         //KillSession.kill(zk2, zkServer.getConnectString());
>         KillSession.kill(zkConsumer, zkServer.getConnectString());
>
>         // check broker registration and consumer registration
>         ZkClient zkClient = new ZkClient(zkServer.getConnectString());
>         boolean check1 = false;
>         boolean check2 = false;
>         boolean check3 = false;
>         for (int i = 0; i < 3; ++i) {
>             if (!zkClient.exists(ZkUtils.BrokerIdsPath() + "/" +
> BROKER_ID1)) {
>                 Thread.sleep(1000);
>                 continue;
>             } else {
>                 check1 = true;
>             }
>             if (!zkClient.exists(ZkUtils.BrokerIdsPath() + "/" +
> BROKER_ID2)) {
>                 Thread.sleep(1000);
>                 continue;
>             } else {
>                 check2 = true;
>             }
>             if (zkClient.getChildren(ZkUtils.ConsumersPath() + "/" + feed
> + "/ids").isEmpty()) {
>                 Thread.sleep(1000);
>                 continue;
>             } else {
>                 check3 = true;
>             }
>         }
>         assertTrue(check1 && check2 && check3);
>
>         generateDataToKafka(messageCount);
>         consumeDataFromkafka(messageCount);
>     }
>
>     public static void consumeDataFromkafka(int startNumber) throws
> IOException {
>         for (int i = 0; i < messageCount; ++i) {
>             Map<String, Object> obj = JsonMapper.getInstance().readValue(
>                     stream.iterator().next().message(),
>                     new TypeReference<Map<String, Object>>() {}
>             );
>             assertEquals(obj.get("messageId"), i + startNumber);
>             System.out.println("consumed:" + obj);
>         }
>     }
>
>     public static void createConsumer(Properties consumerProps) throws
> NoSuchFieldException, IllegalAccessException {
>         consumer = Consumer.createJavaConsumerConnector(new
> ConsumerConfig(consumerProps));
>
>         final Map<String, List<KafkaStream<byte[], byte[]>>> streams
=
> consumer.createMessageStreams(ImmutableMap.of(feed, 1));
>
>         final List<KafkaStream<byte[], byte[]>> streamList =
> streams.get(feed);
>         if (streamList == null || streamList.size() != 1) {
>             fail();
>         }
>
>         stream = streamList.get(0);
>
>         Field f = consumer.getClass().getDeclaredField("underlying");
>         f.setAccessible(true);
>         ZookeeperConsumerConnector zkConsumerConnector =
> (ZookeeperConsumerConnector) f.get(consumer);
>
>         Field fk =
> zkConsumerConnector.getClass().getDeclaredField("kafka$consumer$ZookeeperConsumerConnector$$zkClient");
>         fk.setAccessible(true);
>         ZkClient zkClient = (ZkClient) fk.get(zkConsumerConnector);
>         zkConsumer = getZk(zkClient);
>     }
> }
>
>
> Thank you
> Best, Jae
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message