flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
Date Fri, 05 Aug 2016 06:22:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15408958#comment-15408958 ]

ASF GitHub Bot commented on FLINK-4035:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2231#discussion_r73646737
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java ---
    @@ -0,0 +1,331 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +import kafka.admin.AdminUtils;
    +import kafka.common.KafkaException;
    +import kafka.network.SocketServer;
    +import kafka.server.KafkaConfig;
    +import kafka.server.KafkaServer;
    +import kafka.utils.SystemTime$;
    +import kafka.utils.ZkUtils;
    +import org.I0Itec.zkclient.ZkClient;
    +import org.apache.commons.io.FileUtils;
    +import org.apache.curator.test.TestingServer;
    +import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
    +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
    +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
    +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
    +import org.apache.flink.util.NetUtils;
    +import org.apache.kafka.common.protocol.SecurityProtocol;
    +import org.apache.kafka.common.requests.MetadataResponse;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.File;
    +import java.net.BindException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.UUID;
    +
    +import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +
    +/**
    + * An implementation of the KafkaServerProvider for Kafka 0.10.
    + */
    +public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
    +
    +	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
    +	private File tmpZkDir;
    +	private File tmpKafkaParent;
    +	private List<File> tmpKafkaDirs;
    +	private List<KafkaServer> brokers;
    +	private TestingServer zookeeper;
    +	private String zookeeperConnectionString;
    +	private String brokerConnectionString = "";
    +	private Properties standardProps;
    +	private Properties additionalServerProperties;
    +
    +	public String getBrokerConnectionString() {
    +		return brokerConnectionString;
    +	}
    +
    +	@Override
    +	public Properties getStandardProperties() {
    +		return standardProps;
    +	}
    +
    +	@Override
    +	public String getVersion() {
    +		return "0.10";
    +	}
    +
    +	@Override
    +	public List<KafkaServer> getBrokers() {
    +		return brokers;
    +	}
    +
    +	@Override
    +	public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
    +		return new FlinkKafkaConsumer010<>(topics, readSchema, props);
    +	}
    +
    +	@Override
    +	public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
    +		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
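    +		// wait for outstanding records to be acknowledged on checkpoints (at-least-once for tests)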
    +		prod.setFlushOnCheckpoint(true);
    +		return prod;
    +	}
    +
    +	@Override
    +	public void restartBroker(int leaderId) throws Exception {
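    +		// start a fresh KafkaServer over the broker's original log directory and swap it into the list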
    +		brokers.set(leaderId, getKafkaServer(leaderId, tmpKafkaDirs.get(leaderId)));
    +	}
    +
    +	@Override
    +	public int getLeaderToShutDown(String topic) throws Exception {
    +		ZkUtils zkUtils = getZkUtils();
    +		try {
    +			MetadataResponse.PartitionMetadata firstPart = null;
    +			do {
    +				if (firstPart != null) {
    +					LOG.info("Unable to find leader; error code {}", firstPart.error().code());
    +					// not the first try. Sleep a bit
    +					Thread.sleep(150);
    +				}
    +
    +				List<MetadataResponse.PartitionMetadata> partitionMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionMetadata();
    +				firstPart = partitionMetadata.get(0);
    +			}
    +			while (firstPart.error().code() != 0);
    +
    +			return firstPart.leader().id();
    +		} finally {
    +			zkUtils.close();
    +		}
    +	}
    +
    +	@Override
    +	public int getBrokerId(KafkaServer server) {
    +		return server.config().brokerId();
    +	}
    +
    +	@Override
    +	public void prepare(int numKafkaServers, Properties additionalServerProperties) {
    +		this.additionalServerProperties = additionalServerProperties;
    +		File tempDir = new File(System.getProperty("java.io.tmpdir"));
    +
    +		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
    +		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
    +
    +		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
    +		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
    +
    +		tmpKafkaDirs = new ArrayList<>(numKafkaServers);
    +		for (int i = 0; i < numKafkaServers; i++) {
    +			File tmpDir = new File(tmpKafkaParent, "server-" + i);
    +			assertTrue("cannot create kafka temp dir", tmpDir.mkdir());
    +			tmpKafkaDirs.add(tmpDir);
    +		}
    +
    +		zookeeper = null;
    +		brokers = null;
    +
    +		try {
    +			LOG.info("Starting Zookeeper");
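    +			// a port of -1 lets Curator's TestingServer pick a random free port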
    +			zookeeper = new TestingServer(-1, tmpZkDir);
    +			zookeeperConnectionString = zookeeper.getConnectString();
    +
    +			LOG.info("Starting KafkaServer");
    +			brokers = new ArrayList<>(numKafkaServers);
    +
    +			for (int i = 0; i < numKafkaServers; i++) {
    +				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
    +
    +				SocketServer socketServer = brokers.get(i).socketServer();
    +				brokerConnectionString += hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, socketServer.boundPort(SecurityProtocol.PLAINTEXT)) + ",";
    +			}
    +
    +			LOG.info("ZK and KafkaServer started.");
    +		}
    +		catch (Throwable t) {
    +			t.printStackTrace();
    +			fail("Test setup failed: " + t.getMessage());
    +		}
    +
    +		standardProps = new Properties();
    +		standardProps.setProperty("zookeeper.connect", zookeeperConnectionString);
    +		standardProps.setProperty("bootstrap.servers", brokerConnectionString);
    +		standardProps.setProperty("group.id", "flink-tests");
    +		standardProps.setProperty("auto.commit.enable", "false");
    +		standardProps.setProperty("zookeeper.session.timeout.ms", "30000"); // the 6 second default seems to be too small for Travis
    +		standardProps.setProperty("zookeeper.connection.timeout.ms", "30000");
    +		standardProps.setProperty("auto.offset.reset", "earliest"); // read from the beginning ("earliest" is the Kafka 0.10 value)
    +		standardProps.setProperty("fetch.message.max.bytes", "256"); // force many fetches (messages must be smaller!)
    +	}
    +
    +	@Override
    +	public void shutdown() {
    +		for (KafkaServer broker : brokers) {
    +			if (broker != null) {
    +				broker.shutdown();
    +			}
    +		}
    +		brokers.clear();
    +
    +		if (zookeeper != null) {
    +			try {
    +				zookeeper.stop();
    +			}
    +			catch (Exception e) {
    +				LOG.warn("ZK.stop() failed", e);
    +			}
    +			zookeeper = null;
    +		}
    +
    +		// clean up the temp spaces
    +
    +		if (tmpKafkaParent != null && tmpKafkaParent.exists()) {
    +			try {
    +				FileUtils.deleteDirectory(tmpKafkaParent);
    +			}
    +			catch (Exception e) {
    +				// ignore
    +			}
    +		}
    +		if (tmpZkDir != null && tmpZkDir.exists()) {
    +			try {
    +				FileUtils.deleteDirectory(tmpZkDir);
    +			}
    +			catch (Exception e) {
    +				// ignore
    +			}
    +		}
    +	}
    +
    +	public ZkUtils getZkUtils() {
    +		ZkClient creator = new ZkClient(zookeeperConnectionString, Integer.valueOf(standardProps.getProperty("zookeeper.session.timeout.ms")),
    +				Integer.valueOf(standardProps.getProperty("zookeeper.connection.timeout.ms")), new ZooKeeperStringSerializer());
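    +		// the second argument to ZkUtils.apply is isZkSecurityEnabled; the test ZooKeeper runs without security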
    +		return ZkUtils.apply(creator, false);
    +	}
    +
    +	@Override
    +	public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
    +		// create topic with one client
    +		LOG.info("Creating topic {}", topic);
    +
    +		ZkUtils zkUtils = getZkUtils();
    +		try {
    +			AdminUtils.createTopic(zkUtils, topic, numberOfPartitions, replicationFactor, topicConfig, new kafka.admin.RackAwareMode.Enforced$());
    --- End diff ---
    
    The proper usage of `RackAwareMode` here seems to be `kafka.admin.RackAwareMode.Enforced$.MODULE$`
    (this is how Kafka's own tests use it). IntelliJ complains that `new kafka.admin.RackAwareMode.Enforced$()`
    has private access; I'm not sure why the build passes despite this, though ...
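
    For illustration, a minimal sketch of the suggested call from Java, assuming Kafka 0.10's
    `AdminUtils` and the `ZkUtils` handle from `getZkUtils()` above (topic name and counts are
    placeholders):

        ZkUtils zkUtils = getZkUtils();
        try {
            // reference the Scala object's singleton instance via MODULE$
            // instead of invoking its private constructor
            AdminUtils.createTopic(zkUtils, "test-topic", 1, 1, new Properties(),
                kafka.admin.RackAwareMode.Enforced$.MODULE$);
        } finally {
            zkUtils.close();
        }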


> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---------------------------------------------------
>
>                 Key: FLINK-4035
>                 URL: https://issues.apache.org/jira/browse/FLINK-4035
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.3
>            Reporter: Elias Levy
>            Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer. Published messages
> now include timestamps, and compressed messages now include relative offsets. As it stands,
> brokers must decompress producer-compressed messages, assign offsets to them, and recompress
> them, which is wasteful and makes it less likely that compression will be used at all.
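
For context, a minimal sketch of the producer-side API change this describes: in the 0.10 Java
client, `ProducerRecord` gained a constructor that carries the new message timestamp. The broker
address and topic below are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    // 0.10 record constructor: (topic, partition, timestamp, key, value);
    // a null partition lets the configured partitioner decide
    producer.send(new ProducerRecord<>("test-topic", null, System.currentTimeMillis(), "key", "value"));
    producer.close();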



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
