Repository: drill
Updated Branches:
refs/heads/master 05d8b3c2c -> d3f8da2b6
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
new file mode 100644
index 0000000..1931898
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Resources;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+
+/**
+ * Test utility that publishes randomly generated Avro or JSON messages to a
+ * Kafka topic. The producer configuration is built once in the constructor and
+ * a fresh producer is created (and closed) by every populate call.
+ */
+public class KafkaMessageGenerator {
+
+  private static final Logger logger = LoggerFactory.getLogger(KafkaMessageGenerator.class);
+  private final Properties producerProperties = new Properties();
+
+  /**
+   * Builds the producer configuration used by the populate methods.
+   *
+   * @param broker bootstrap server list handed to the producer
+   * @param valueSerializer serializer class for message values; keys always
+   *          use {@link StringSerializer}
+   */
+  public KafkaMessageGenerator(final String broker, Class<?> valueSerializer) {
+    producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
+    producerProperties.put(ProducerConfig.ACKS_CONFIG, "all");
+    producerProperties.put(ProducerConfig.RETRIES_CONFIG, 0);
+    producerProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
+    producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, 0);
+    producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
+    producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000);
+    producerProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "drill-test-kafka-client");
+    producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
+  }
+
+  /**
+   * Publishes {@code numMsg} random Avro records, built from the
+   * {@code drill-avro-test.avsc} classpath schema, to {@code topic}.
+   * Sends are not individually awaited; the producer is closed before the
+   * method returns.
+   *
+   * @param topic destination topic
+   * @param numMsg number of records to publish
+   * @throws IOException if the schema resource cannot be opened or read
+   */
+  public void populateAvroMsgIntoKafka(String topic, int numMsg) throws IOException {
+    // Parse the schema first and close its stream explicitly: the parser is
+    // not relied upon to close the stream it is given.
+    final Schema schema;
+    try (InputStream schemaStream = Resources.getResource("drill-avro-test.avsc").openStream()) {
+      schema = new Schema.Parser().parse(schemaStream);
+    }
+
+    GenericRecordBuilder builder = new GenericRecordBuilder(schema);
+    Random rand = new Random();
+    // try-with-resources guarantees the producer is closed even when building
+    // or sending a record fails part-way through the loop.
+    try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(producerProperties)) {
+      for (int i = 0; i < numMsg; ++i) {
+        builder.set("key1", UUID.randomUUID().toString());
+        builder.set("key2", rand.nextInt());
+        builder.set("key3", rand.nextBoolean());
+
+        List<Integer> list = Lists.newArrayList();
+        list.add(rand.nextInt(100));
+        list.add(rand.nextInt(100));
+        list.add(rand.nextInt(100));
+        builder.set("key5", list);
+
+        Map<String, Double> map = Maps.newHashMap();
+        map.put("key61", rand.nextDouble());
+        map.put("key62", rand.nextDouble());
+        builder.set("key6", map);
+
+        Record producerRecord = builder.build();
+
+        producer.send(new ProducerRecord<String, GenericRecord>(topic, producerRecord));
+      }
+    }
+  }
+
+  /**
+   * Publishes {@code numMsg} random JSON documents to {@code topic}, waiting
+   * for each send to be acknowledged and logging the resulting offset.
+   *
+   * <p>Any failure inside the publishing loop is logged and rethrown wrapped
+   * in a {@link DrillRuntimeException}; the checked exceptions in the
+   * signature are kept so existing callers continue to compile.
+   *
+   * @param topic destination topic
+   * @param numMsg number of messages to publish
+   */
+  public void populateJsonMsgIntoKafka(String topic, int numMsg) throws InterruptedException, ExecutionException {
+    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(producerProperties);
+    Random rand = new Random();
+    try {
+      for (int i = 0; i < numMsg; ++i) {
+        JsonObject object = new JsonObject();
+        object.addProperty("key1", UUID.randomUUID().toString());
+        object.addProperty("key2", rand.nextInt());
+        object.addProperty("key3", rand.nextBoolean());
+
+        JsonArray element2 = new JsonArray();
+        element2.add(new JsonPrimitive(rand.nextInt(100)));
+        element2.add(new JsonPrimitive(rand.nextInt(100)));
+        element2.add(new JsonPrimitive(rand.nextInt(100)));
+
+        object.add("key5", element2);
+
+        JsonObject element3 = new JsonObject();
+        element3.addProperty("key61", rand.nextDouble());
+        element3.addProperty("key62", rand.nextDouble());
+        object.add("key6", element3);
+
+        ProducerRecord<String, String> message = new ProducerRecord<String, String>(topic, object.toString());
+        logger.info("Publishing message : {}", message);
+        // Block on the future so that every message is committed before the next one is sent.
+        Future<RecordMetadata> future = producer.send(message);
+        logger.info("Committed offset of the message : {}", future.get().offset());
+      }
+    } catch (Throwable th) {
+      logger.error(th.getMessage(), th);
+      throw new DrillRuntimeException(th.getMessage(), th);
+    } finally {
+      // producer is assigned before the try block, so it can never be null here.
+      producer.close();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
new file mode 100644
index 0000000..fb48424
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Query-level tests that run Drill SQL against the JSON test topic and compare
+ * the results with values read directly through a Kafka consumer.
+ */
+public class KafkaQueriesTest extends KafkaTestBase {
+
+  /** Flag value selecting the earliest offsets (follows kafka.tools.GetOffsetShell). */
+  private static final int EARLIEST_OFFSET_FLAG = -2;
+  /** Flag value selecting the latest offsets (follows kafka.tools.GetOffsetShell). */
+  private static final int LATEST_OFFSET_FLAG = -1;
+
+  /** Selecting from a topic that does not exist must fail with a DATA_READ error. */
+  @Test
+  public void testSqlQueryOnInvalidTopic() throws Exception {
+    String queryString = String.format(QueryConstants.MSG_SELECT_QUERY, QueryConstants.INVALID_TOPIC);
+    try {
+      testBuilder().sqlQuery(queryString).unOrdered().baselineRecords(Collections.<Map<String, Object>> emptyList())
+          .build().run();
+      Assert.fail("Test passed though topic does not exist.");
+    } catch (RpcException re) {
+      Assert.assertTrue(re.getMessage().contains("DATA_READ ERROR: Table 'invalid-topic' does not exist"));
+    }
+  }
+
+  /** A select over the JSON topic must return every message the suite published. */
+  @Test
+  public void testResultCount() throws Exception {
+    String queryString = String.format(QueryConstants.MSG_SELECT_QUERY, QueryConstants.JSON_TOPIC);
+    runKafkaSQLVerifyCount(queryString, TestKafkaSuit.NUM_JSON_MSG);
+  }
+
+  /** MIN(kafkaMsgOffset) must equal the beginning offset of partition 0. */
+  @Test
+  public void testPartitionMinOffset() throws Exception {
+    Map<TopicPartition, Long> startOffsetsMap = fetchOffsets(EARLIEST_OFFSET_FLAG);
+
+    String queryString = String.format(QueryConstants.MIN_OFFSET_QUERY, QueryConstants.JSON_TOPIC);
+    testBuilder().sqlQuery(queryString).unOrdered().baselineColumns("minOffset")
+        .baselineValues(startOffsetsMap.get(new TopicPartition(QueryConstants.JSON_TOPIC, 0))).go();
+  }
+
+  /** MAX(kafkaMsgOffset) must equal the end position of partition 0 minus one. */
+  @Test
+  public void testPartitionMaxOffset() throws Exception {
+    Map<TopicPartition, Long> endOffsetsMap = fetchOffsets(LATEST_OFFSET_FLAG);
+
+    String queryString = String.format(QueryConstants.MAX_OFFSET_QUERY, QueryConstants.JSON_TOPIC);
+    testBuilder().sqlQuery(queryString).unOrdered().baselineColumns("maxOffset")
+        .baselineValues(endOffsetsMap.get(new TopicPartition(QueryConstants.JSON_TOPIC, 0)) - 1).go();
+  }
+
+  /**
+   * Reads the current beginning or end position of every assigned partition
+   * of the JSON topic.
+   *
+   * @param flag {@code -2} for beginning offsets, {@code -1} for end offsets
+   * @return position per assigned partition
+   * @throws RuntimeException if {@code flag} is neither {@code -2} nor {@code -1}
+   */
+  private Map<TopicPartition, Long> fetchOffsets(int flag) {
+    Map<TopicPartition, Long> offsetsMap = Maps.newHashMap();
+
+    // The consumer is opened in try-with-resources so it is closed even when
+    // subscribe/poll fail, not only when the seek/position calls do.
+    try (KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(
+        storagePluginConfig.getKafkaConsumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+      kafkaConsumer.subscribe(Arrays.asList(QueryConstants.JSON_TOPIC));
+      // based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd functions
+      // evaluate lazily, seeking to the first/last offset in all partitions
+      // only when poll(long) or position(TopicPartition) are called
+      kafkaConsumer.poll(0);
+      Set<TopicPartition> assignments = kafkaConsumer.assignment();
+
+      if (flag == EARLIEST_OFFSET_FLAG) {
+        kafkaConsumer.seekToBeginning(assignments);
+      } else if (flag == LATEST_OFFSET_FLAG) {
+        kafkaConsumer.seekToEnd(assignments);
+      } else {
+        throw new RuntimeException(String.format("Unsupported flag %d", flag));
+      }
+
+      // position() resolves the pending seek for each partition.
+      for (TopicPartition topicPartition : assignments) {
+        offsetsMap.put(topicPartition, kafkaConsumer.position(topicPartition));
+      }
+    }
+    return offsetsMap;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
new file mode 100644
index 0000000..e30f3e6
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Base class for Kafka storage plugin tests. It joins the embedded cluster
+ * owned by {@link TestKafkaSuit}, registers the Kafka storage plugin with the
+ * test Drillbit and offers small query helpers.
+ */
+public class KafkaTestBase extends PlanTestBase {
+  protected static KafkaStoragePluginConfig storagePluginConfig;
+
+  /**
+   * Skips the class unless it runs as part of the suite, then makes sure the
+   * embedded cluster is up and the storage plugin is registered.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Make sure this test is only running as part of the suite
+    Assume.assumeTrue(TestKafkaSuit.isRunningSuite());
+    TestKafkaSuit.initKafka();
+    initKafkaStoragePlugin(TestKafkaSuit.embeddedKafkaCluster);
+  }
+
+  /**
+   * Registers an enabled Kafka storage plugin pointing at the given cluster
+   * and sets the session options for the JSON message reader and a 200
+   * poll timeout.
+   *
+   * @param embeddedKafkaCluster cluster whose broker list is used as bootstrap servers
+   */
+  public static void initKafkaStoragePlugin(EmbeddedKafkaCluster embeddedKafkaCluster) throws Exception {
+    final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+    Map<String, String> kafkaConsumerProps = Maps.newHashMap();
+    kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaCluster.getKafkaBrokerList());
+    kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "drill-test-consumer");
+    storagePluginConfig = new KafkaStoragePluginConfig(kafkaConsumerProps);
+    storagePluginConfig.setEnabled(true);
+    pluginRegistry.createOrUpdate(KafkaStoragePluginConfig.NAME, storagePluginConfig, true);
+    testNoResult(String.format("alter session set `%s` = '%s'", ExecConstants.KAFKA_RECORD_READER,
+        "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader"));
+    testNoResult(String.format("alter session set `%s` = %d", ExecConstants.KAFKA_POLL_TIMEOUT, 200));
+  }
+
+  /** Runs {@code sql} and returns the raw result batches. */
+  public List<QueryDataBatch> runKafkaSQLWithResults(String sql) throws Exception {
+    return testSqlWithResults(sql);
+  }
+
+  /** Runs {@code sql}, prints the result and verifies the row count. */
+  public void runKafkaSQLVerifyCount(String sql, int expectedRowCount) throws Exception {
+    List<QueryDataBatch> results = runKafkaSQLWithResults(sql);
+    printResultAndVerifyRowCount(results, expectedRowCount);
+  }
+
+  /**
+   * Prints the batches and asserts the total row count.
+   *
+   * @param results batches to print
+   * @param expectedRowCount expected number of rows, or {@code -1} to skip the check
+   */
+  public void printResultAndVerifyRowCount(List<QueryDataBatch> results, int expectedRowCount)
+      throws SchemaChangeException {
+    int rowCount = printResult(results);
+    if (expectedRowCount != -1) {
+      Assert.assertEquals(expectedRowCount, rowCount);
+    }
+  }
+
+  /**
+   * Verifies the physical plan of {@code query} against
+   * {@code expectedExprInPlan}, then runs the query and checks the number of
+   * returned records.
+   */
+  public void testHelper(String query, String expectedExprInPlan, int expectedRecordCount) throws Exception {
+    testPhysicalPlan(query, expectedExprInPlan);
+    int actualRecordCount = testSql(query);
+    assertEquals(String.format("Received unexpected number of rows in output: expected=%d, received=%s",
+        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
+  }
+
+  /**
+   * Releases this class's share of the embedded cluster.
+   */
+  @AfterClass
+  public static void tearDownKafkaTestBase() throws Exception {
+    // JUnit runs @AfterClass methods even when @BeforeClass did not complete.
+    // If the suite assumption failed, initKafka() was never called, so there
+    // is nothing to release and the shared counter must not be decremented.
+    if (TestKafkaSuit.isRunningSuite()) {
+      TestKafkaSuit.tearDownCluster();
+    }
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
new file mode 100644
index 0000000..aad64e3
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Exercises {@link MessageIterator} against the JSON test topic using a
+ * consumer that is limited to four records per poll.
+ */
+public class MessageIteratorTest extends KafkaTestBase {
+
+  /** Poll timeout, in milliseconds, used by the tests that expect data to arrive. */
+  private static final long ONE_SECOND_MS = TimeUnit.SECONDS.toMillis(1);
+
+  private KafkaConsumer<byte[], byte[]> kafkaConsumer;
+  private KafkaSubScanSpec subScanSpec;
+
+  /** Creates a byte-array consumer and a sub-scan spec for partition 0 of the JSON topic. */
+  @Before
+  public void setUp() {
+    final Properties props = storagePluginConfig.getKafkaConsumerProps();
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4");
+    kafkaConsumer = new KafkaConsumer<>(props);
+    subScanSpec = new KafkaSubScanSpec(QueryConstants.JSON_TOPIC, 0, 0, TestKafkaSuit.NUM_JSON_MSG);
+  }
+
+  /** Closes the consumer created in {@link #setUp()}, if any. */
+  @After
+  public void cleanUp() {
+    if (kafkaConsumer == null) {
+      return;
+    }
+    kafkaConsumer.close();
+  }
+
+  /** A one millisecond poll timeout must surface as a DATA_READ user error. */
+  @Test
+  public void testWhenPollTimeOutIsTooLess() {
+    final String expectedMessage =
+        "DATA_READ ERROR: Failed to fetch messages within 1 milliseconds. Consider increasing the value of the property : store.kafka.poll.timeout";
+    MessageIterator messages = new MessageIterator(kafkaConsumer, subScanSpec, 1);
+    try {
+      messages.hasNext();
+      Assert.fail("Test passed even though there are no message fetched.");
+    } catch (UserException userError) {
+      Assert.assertEquals(ErrorType.DATA_READ, userError.getErrorType());
+      Assert.assertTrue(userError.getMessage().contains(expectedMessage));
+    }
+  }
+
+  /** With a one second timeout the iterator must report available messages. */
+  @Test
+  public void testShouldReturnTrueAsKafkaHasMessages() {
+    MessageIterator messages = new MessageIterator(kafkaConsumer, subScanSpec, ONE_SECOND_MS);
+    Assert.assertTrue("Message iterator returned false though there are messages in Kafka", messages.hasNext());
+  }
+
+  /** One hasNext() call fetches a single batch of four records; a fifth next() must fail. */
+  @Test
+  public void testShouldReturnMessage1() {
+    MessageIterator messages = new MessageIterator(kafkaConsumer, subScanSpec, ONE_SECOND_MS);
+    // Calling hasNext makes only one poll to Kafka which fetches only 4 messages.
+    // so fifth operation on iterator is expected to fail.
+    messages.hasNext();
+    for (int fetched = 0; fetched < 4; ++fetched) {
+      Assert.assertNotNull(messages.next());
+    }
+    try {
+      messages.next();
+      Assert.fail("Kafak fetched more messages than configured.");
+    } catch (NoSuchElementException expected) {
+      // Expected
+    }
+  }
+
+  /** Draining the iterator must yield exactly the number of published JSON messages. */
+  @Test
+  public void testShouldReturnMessage2() {
+    MessageIterator messages = new MessageIterator(kafkaConsumer, subScanSpec, ONE_SECOND_MS);
+    int seen = 0;
+    for (; messages.hasNext(); ++seen) {
+      ConsumerRecord<byte[], byte[]> record = messages.next();
+      Assert.assertNotNull(record);
+    }
+    Assert.assertEquals(TestKafkaSuit.NUM_JSON_MSG, seen);
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java
new file mode 100644
index 0000000..ff58f7e
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+/**
+ * Constants shared by the Kafka storage plugin tests: embedded cluster
+ * settings, topic names and query templates. Interface fields are implicitly
+ * {@code public static final}, so the modifiers are not repeated.
+ */
+public interface QueryConstants {
+
+  // Kafka broker settings
+  String BROKER_DELIM = ",";
+  String LOCAL_HOST = "127.0.0.1";
+
+  // ZooKeeper settings
+  String ZK_TMP = "zk_tmp";
+  int TICK_TIME = 500;
+  int MAX_CLIENT_CONNECTIONS = 100;
+
+  // Topic names
+  String JSON_TOPIC = "drill-json-topic";
+  String AVRO_TOPIC = "drill-avro-topic";
+  String INVALID_TOPIC = "invalid-topic";
+
+  // Query templates; %s is replaced with the topic name
+  String MSG_COUNT_QUERY = "select count(*) from kafka.`%s`";
+  String MSG_SELECT_QUERY = "select * from kafka.`%s`";
+  String MIN_OFFSET_QUERY = "select MIN(kafkaMsgOffset) as minOffset from kafka.`%s`";
+  String MAX_OFFSET_QUERY = "select MAX(kafkaMsgOffset) as maxOffset from kafka.`%s`";
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
new file mode 100644
index 0000000..178c809
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.drill.exec.ZookeeperTestUtil;
+import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactoryTest;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.security.JaasUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.junit.runners.Suite.SuiteClasses;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+/**
+ * JUnit suite for the Kafka storage plugin tests. It owns the embedded
+ * ZooKeeper/Kafka cluster: {@link #initKafka()} and
+ * {@link #tearDownCluster()} are reference counted so the suite and each
+ * member class can call them in pairs, with the cluster started on the first
+ * init and stopped on the last tear-down.
+ */
+@RunWith(Suite.class)
+@SuiteClasses({ KafkaQueriesTest.class, MessageIteratorTest.class, MessageReaderFactoryTest.class })
+public class TestKafkaSuit {
+  private static final Logger logger = LoggerFactory.getLogger(TestKafkaSuit.class);
+  private static final String LOGIN_CONF_RESOURCE_PATHNAME = "login.conf";
+
+  public static EmbeddedKafkaCluster embeddedKafkaCluster;
+  private static ZkClient zkClient;
+
+  // Number of outstanding initKafka() calls; only touched while holding the class lock.
+  private static final AtomicInteger initCount = new AtomicInteger(0);
+  static final int NUM_JSON_MSG = 10;
+  static final int CONN_TIMEOUT = 8 * 1000;
+  static final int SESSION_TIMEOUT = 10 * 1000;
+
+  static String kafkaBroker;
+  private static volatile boolean runningSuite = false;
+
+  /**
+   * Starts the embedded cluster on the first call, creates the JSON topic and
+   * publishes {@link #NUM_JSON_MSG} messages to it. Every call increments the
+   * reference count and marks the suite as running.
+   */
+  @BeforeClass
+  public static void initKafka() throws Exception {
+    synchronized (TestKafkaSuit.class) {
+      if (initCount.get() == 0) {
+        ZookeeperTestUtil.setZookeeperSaslTestConfigProps();
+        System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, ClassLoader.getSystemResource(LOGIN_CONF_RESOURCE_PATHNAME).getFile());
+        embeddedKafkaCluster = new EmbeddedKafkaCluster();
+        Properties topicProps = new Properties();
+        zkClient = new ZkClient(embeddedKafkaCluster.getZkServer().getConnectionString(), SESSION_TIMEOUT, CONN_TIMEOUT, ZKStringSerializer$.MODULE$);
+        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false);
+        // Single partition, replication factor 1.
+        AdminUtils.createTopic(zkUtils, QueryConstants.JSON_TOPIC, 1, 1, topicProps, RackAwareMode.Disabled$.MODULE$);
+
+        org.apache.kafka.common.requests.MetadataResponse.TopicMetadata fetchTopicMetadataFromZk = AdminUtils
+            .fetchTopicMetadataFromZk(QueryConstants.JSON_TOPIC, zkUtils);
+        logger.info("Topic Metadata: {}", fetchTopicMetadataFromZk);
+
+        KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(),
+            StringSerializer.class);
+        generator.populateJsonMsgIntoKafka(QueryConstants.JSON_TOPIC, NUM_JSON_MSG);
+      }
+      initCount.incrementAndGet();
+      runningSuite = true;
+    }
+    logger.info("Initialized Embedded Zookeeper and Kafka");
+  }
+
+  /** @return {@code true} once {@link #initKafka()} has completed at least once */
+  public static boolean isRunningSuite() {
+    return runningSuite;
+  }
+
+  /**
+   * Drops one reference to the cluster and shuts it down when the last
+   * reference is released. A call without a matching {@link #initKafka()}
+   * is ignored.
+   */
+  @AfterClass
+  public static void tearDownCluster() throws Exception {
+    synchronized (TestKafkaSuit.class) {
+      // Guard against unbalanced calls so the counter can never go negative,
+      // which would prevent any later initKafka() from starting the cluster.
+      if (initCount.get() > 0 && initCount.decrementAndGet() == 0) {
+        if (zkClient != null) {
+          zkClient.close();
+        }
+        if (embeddedKafkaCluster != null && !embeddedKafkaCluster.getBrokers().isEmpty()) {
+          embeddedKafkaCluster.shutDownCluster();
+        }
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
new file mode 100644
index 0000000..319c66c
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.cluster;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.drill.exec.ZookeeperHelper;
+import org.apache.drill.exec.store.kafka.KafkaStoragePluginConfig;
+import org.apache.drill.exec.store.kafka.QueryConstants;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+
+/**
+ * In-process Kafka cluster for tests: starts one ZooKeeper instance through
+ * {@link ZookeeperHelper} and a configurable number of Kafka brokers bound to
+ * {@code LOCAL_HOST} on ports obtained from the operating system.
+ */
+public class EmbeddedKafkaCluster implements QueryConstants {
+  private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
+  private List<KafkaServerStartable> brokers;
+  private final ZookeeperHelper zkHelper;
+  private final Properties props;
+
+  /** Starts a single-broker cluster with no extra broker properties. */
+  public EmbeddedKafkaCluster() throws IOException {
+    this(new Properties());
+  }
+
+  /** Starts a single-broker cluster using {@code props} as base broker properties. */
+  public EmbeddedKafkaCluster(Properties props) throws IOException {
+    this(props, 1);
+  }
+
+  /**
+   * Starts ZooKeeper and {@code numberOfBrokers} Kafka brokers.
+   *
+   * @param basePorps properties copied into every broker's configuration
+   * @param numberOfBrokers number of brokers to start
+   * @throws IOException if a free port cannot be obtained
+   */
+  public EmbeddedKafkaCluster(Properties basePorps, int numberOfBrokers) throws IOException {
+    this.props = new Properties();
+    props.putAll(basePorps);
+    this.zkHelper = new ZookeeperHelper();
+    zkHelper.startZookeeper(1);
+    this.brokers = new ArrayList<>(numberOfBrokers);
+
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < numberOfBrokers; ++i) {
+      if (i != 0) {
+        sb.append(BROKER_DELIM);
+      }
+      int ephemeralBrokerPort = getEphemeralPort();
+      sb.append(LOCAL_HOST).append(':').append(ephemeralBrokerPort);
+      addBroker(props, i, ephemeralBrokerPort);
+    }
+
+    // Recorded after the brokers are started, so these two entries are only
+    // visible through getProps(), not in the individual broker configs.
+    this.props.put("metadata.broker.list", sb.toString());
+    this.props.put(KafkaConfig.ZkConnectProp(), this.zkHelper.getConnectionString());
+    logger.info("Initialized Kafka Server");
+  }
+
+  /**
+   * Configures and starts one broker. Broker ids are one-based
+   * ({@code brokerID + 1}) and each broker gets its own log directory.
+   */
+  private void addBroker(Properties baseProps, int brokerID, int ephemeralBrokerPort) {
+    Properties properties = new Properties();
+    properties.putAll(baseProps);
+    properties.put(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp(), String.valueOf(1));
+    properties.put(KafkaConfig.OffsetsTopicPartitionsProp(), String.valueOf(1));
+    properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(1));
+    properties.put(KafkaConfig.DefaultReplicationFactorProp(), String.valueOf(1));
+    properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp(), String.valueOf(100));
+    properties.put(KafkaConfig.AutoCreateTopicsEnableProp(), Boolean.TRUE);
+    properties.put(KafkaConfig.ZkConnectProp(), zkHelper.getConnectionString());
+    properties.put(KafkaConfig.BrokerIdProp(), String.valueOf(brokerID + 1));
+    properties.put(KafkaConfig.HostNameProp(), String.valueOf(LOCAL_HOST));
+    properties.put(KafkaConfig.AdvertisedHostNameProp(), String.valueOf(LOCAL_HOST));
+    properties.put(KafkaConfig.PortProp(), String.valueOf(ephemeralBrokerPort));
+    properties.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.FALSE);
+    properties.put(KafkaConfig.LogDirsProp(), getTemporaryDir().getAbsolutePath());
+    properties.put(KafkaConfig.LogFlushIntervalMessagesProp(), String.valueOf(1));
+    brokers.add(getBroker(properties));
+  }
+
+  /** Creates a broker from {@code properties} and starts it. */
+  private static KafkaServerStartable getBroker(Properties properties) {
+    KafkaServerStartable broker = new KafkaServerStartable(new KafkaConfig(properties));
+    broker.startup();
+    return broker;
+  }
+
+  /**
+   * Shuts down every broker and then ZooKeeper. While the brokers stop, the
+   * log4j logger named {@code KafkaStoragePluginConfig.NAME} is raised to
+   * ERROR; its previous level is restored afterwards.
+   */
+  public void shutDownCluster() throws IOException {
+    // Temporarily raise the log level to ERROR.
+    Level level = LogManager.getLogger(KafkaStoragePluginConfig.NAME).getLevel();
+    LogManager.getLogger(KafkaStoragePluginConfig.NAME).setLevel(Level.ERROR);
+
+    try {
+      for (KafkaServerStartable broker : brokers) {
+        broker.shutdown();
+      }
+    } finally {
+      // Restore the level and stop ZooKeeper even when a broker fails to
+      // shut down, so a failure does not leave ZooKeeper running.
+      LogManager.getLogger(KafkaStoragePluginConfig.NAME).setLevel(level);
+      zkHelper.stopZookeeper();
+    }
+  }
+
+  /**
+   * Shuts down the first broker whose configured broker id equals
+   * {@code brokerId}; does nothing when no broker matches.
+   */
+  public void shutDownBroker(int brokerId) {
+    for (KafkaServerStartable broker : brokers) {
+      if (Integer.parseInt(broker.serverConfig().getString(KafkaConfig.BrokerIdProp())) == brokerId) {
+        broker.shutdown();
+        return;
+      }
+    }
+  }
+
+  /** @return a copy of the cluster-level properties */
+  public Properties getProps() {
+    Properties tmpProps = new Properties();
+    tmpProps.putAll(this.props);
+    return tmpProps;
+  }
+
+  /** @return the live list of brokers held by this cluster */
+  public List<KafkaServerStartable> getBrokers() {
+    return brokers;
+  }
+
+  /** Replaces the list of brokers held by this cluster. */
+  public void setBrokers(List<KafkaServerStartable> brokers) {
+    this.brokers = brokers;
+  }
+
+  /** @return the helper that owns the embedded ZooKeeper instance */
+  public ZookeeperHelper getZkServer() {
+    return zkHelper;
+  }
+
+  /**
+   * @return the comma-separated {@code host:port} list of all brokers, or an
+   *         empty string when the cluster currently holds no brokers
+   */
+  public String getKafkaBrokerList() {
+    StringBuilder sb = new StringBuilder();
+    for (KafkaServerStartable broker : brokers) {
+      // Prefix the delimiter instead of trimming a trailing one, so an empty
+      // broker list yields "" rather than a StringIndexOutOfBoundsException.
+      if (sb.length() > 0) {
+        sb.append(BROKER_DELIM);
+      }
+      KafkaConfig serverConfig = broker.serverConfig();
+      sb.append(serverConfig.hostName()).append(':').append(serverConfig.port());
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Asks the operating system for a free port by binding a server socket to
+   * port 0. The socket is closed before returning, so the port is not
+   * reserved for the caller.
+   */
+  private int getEphemeralPort() throws IOException {
+    try (ServerSocket socket = new ServerSocket(0)) {
+      return socket.getLocalPort();
+    }
+  }
+
+  /**
+   * Creates a uniquely named directory under {@code java.io.tmpdir}.
+   *
+   * @throws RuntimeException if the directory cannot be created
+   */
+  private File getTemporaryDir() {
+    File file = new File(System.getProperty("java.io.tmpdir"), ZK_TMP + System.nanoTime());
+    if (!file.mkdir()) {
+      logger.error("Failed to create temp Dir");
+      throw new RuntimeException("Failed to create temp Dir");
+    }
+    return file;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java
new file mode 100644
index 0000000..a3cfcf7
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MessageReaderFactoryTest {
+
+ @Test
+ public void testShouldThrowExceptionAsMessageReaderIsNull() {
+ try {
+ MessageReaderFactory.getMessageReader(null);
+ Assert.fail("Message reader initialization succeeded even though it is null");
+ } catch (UserException ue) {
+ Assert.assertTrue(ue.getErrorType() == ErrorType.VALIDATION);
+ Assert.assertTrue(ue.getMessage().contains(
+ "VALIDATION ERROR: Please configure message reader implementation using the property 'store.kafka.record.reader'"));
+ }
+ }
+
+ @Test
+ public void testShouldThrowExceptionAsMessageReaderHasNotImplementedMessageReaderIntf() {
+ try {
+ MessageReaderFactory.getMessageReader(MessageReaderFactoryTest.class.getName());
+ Assert.fail("Message reader initialization succeeded even though class does not implement message reader interface");
+ } catch (UserException ue) {
+ Assert.assertTrue(ue.getErrorType() == ErrorType.VALIDATION);
+ Assert.assertTrue(ue.getMessage().contains(
+ "VALIDATION ERROR: Message reader configured 'org.apache.drill.exec.store.kafka.decoders.MessageReaderFactoryTest' does not implement 'org.apache.drill.exec.store.kafka.decoders.MessageReader'"));
+ }
+ }
+
+ @Test
+ public void testShouldThrowExceptionAsNoClassFound() {
+ try {
+ MessageReaderFactory.getMessageReader("a.b.c.d");
+ Assert.fail("Message reader initialization succeeded even though class does not exist");
+ } catch (UserException ue) {
+ Assert.assertTrue(ue.getErrorType() == ErrorType.VALIDATION);
+ Assert.assertTrue(ue.getMessage().contains("VALIDATION ERROR: Failed to initialize message reader : a.b.c.d"));
+ }
+ }
+
+ @Test
+ public void testShouldReturnJsonMessageReaderInstance() {
+ MessageReader messageReader = MessageReaderFactory.getMessageReader(JsonMessageReader.class.getName());
+ Assert.assertTrue(messageReader instanceof JsonMessageReader);
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/resources/login.conf
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/resources/login.conf b/contrib/storage-kafka/src/test/resources/login.conf
new file mode 100644
index 0000000..0916120
--- /dev/null
+++ b/contrib/storage-kafka/src/test/resources/login.conf
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Simple login: just use the operating-system credentials.
+ */
+hadoop_simple {
+ org.apache.hadoop.security.login.GenericOSLoginModule required;
+ org.apache.hadoop.security.login.HadoopLoginModule required;
+};
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 9bb21d6..06981e0 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -257,6 +257,11 @@
<artifactId>drill-gis</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.drill.contrib</groupId>
+ <artifactId>drill-storage-kafka</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</profile>
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/distribution/src/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml
index faa2e72..b6087eb 100644
--- a/distribution/src/assemble/bin.xml
+++ b/distribution/src/assemble/bin.xml
@@ -100,6 +100,7 @@
<include>org.apache.drill.contrib:drill-jdbc-storage</include>
<include>org.apache.drill.contrib:drill-kudu-storage</include>
<include>org.apache.drill.contrib:drill-gis</include>
+ <include>org.apache.drill.contrib:drill-storage-kafka</include>
</includes>
<excludes>
<exclude>org.apache.drill.contrib.storage-hive:drill-storage-hive-core:jar:tests</exclude>
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 17926af..89b4b48 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -308,6 +308,18 @@ public final class ExecConstants {
public static final BooleanValidator ENABLE_UNION_TYPE = new BooleanValidator("exec.enable_union_type");
+ // Kafka plugin related options.
+ // Each option name constant below is followed by the validator created for that name.
+ // Boolean option "store.kafka.all_text_mode".
+ public static final String KAFKA_ALL_TEXT_MODE = "store.kafka.all_text_mode";
+ public static final OptionValidator KAFKA_READER_ALL_TEXT_MODE_VALIDATOR = new BooleanValidator(KAFKA_ALL_TEXT_MODE);
+ // Boolean option "store.kafka.read_numbers_as_double".
+ public static final String KAFKA_READER_READ_NUMBERS_AS_DOUBLE = "store.kafka.read_numbers_as_double";
+ public static final OptionValidator KAFKA_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR = new BooleanValidator(
+ KAFKA_READER_READ_NUMBERS_AS_DOUBLE);
+ // String option "store.kafka.record.reader": the message reader implementation to use.
+ public static final String KAFKA_RECORD_READER = "store.kafka.record.reader";
+ public static final OptionValidator KAFKA_RECORD_READER_VALIDATOR = new StringValidator(KAFKA_RECORD_READER);
+ // Positive long option "store.kafka.poll.timeout", with Long.MAX_VALUE as the validator's bound.
+ // NOTE(review): the time unit is not stated here - presumably milliseconds; confirm against the reader.
+ public static final String KAFKA_POLL_TIMEOUT = "store.kafka.poll.timeout";
+ public static final PositiveLongValidator KAFKA_POLL_TIMEOUT_VALIDATOR = new PositiveLongValidator(KAFKA_POLL_TIMEOUT,
+ Long.MAX_VALUE);
+
// TODO: We need to add a feature that enables storage plugins to add their own options. Currently we have to declare
// in core which is not right. Move this option and above two mongo plugin related options once we have the feature.
public static final String HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS = "store.hive.optimize_scan_with_native_readers";
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 1c45547..a1ddb30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -152,6 +152,10 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.MONGO_READER_ALL_TEXT_MODE_VALIDATOR),
new OptionDefinition(ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR),
new OptionDefinition(ExecConstants.MONGO_BSON_RECORD_READER_VALIDATOR),
+ new OptionDefinition(ExecConstants.KAFKA_READER_ALL_TEXT_MODE_VALIDATOR),
+ new OptionDefinition(ExecConstants.KAFKA_RECORD_READER_VALIDATOR),
+ new OptionDefinition(ExecConstants.KAFKA_POLL_TIMEOUT_VALIDATOR),
+ new OptionDefinition(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR),
new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS_VALIDATOR),
new OptionDefinition(ExecConstants.SLICE_TARGET_OPTION),
new OptionDefinition(ExecConstants.AFFINITY_FACTOR),
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index a66fce0..f5e85a3 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -531,6 +531,10 @@ drill.exec.options: {
store.parquet.writer.use_single_fs_block: false,
store.partition.hash_distribute: false,
store.text.estimated_row_size_bytes: 100.0,
+ store.kafka.all_text_mode: false,
+ store.kafka.read_numbers_as_double: false,
+ store.kafka.record.reader: "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader",
+ store.kafka.poll.timeout: 200,
web.logs.max_lines: 10000,
window.enable: true,
}
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 9a3b1d9..51cdab7 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -517,6 +517,10 @@ public final class UserBitShared {
* <code>PCAP_SUB_SCAN = 37;</code>
*/
PCAP_SUB_SCAN(37, 37),
+ /**
+ * <code>KAFKA_SUB_SCAN = 38;</code>
+ */
+ KAFKA_SUB_SCAN(38, 38),
;
/**
@@ -671,6 +675,10 @@ public final class UserBitShared {
* <code>PCAP_SUB_SCAN = 37;</code>
*/
public static final int PCAP_SUB_SCAN_VALUE = 37;
+ /**
+ * <code>KAFKA_SUB_SCAN = 38;</code>
+ */
+ public static final int KAFKA_SUB_SCAN_VALUE = 38;
public final int getNumber() { return value; }
@@ -715,6 +723,7 @@ public final class UserBitShared {
case 35: return NESTED_LOOP_JOIN;
case 36: return AVRO_SUB_SCAN;
case 37: return PCAP_SUB_SCAN;
+ case 38: return KAFKA_SUB_SCAN;
default: return null;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index a795f55..8ad38a5 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -59,7 +59,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
WINDOW(34),
NESTED_LOOP_JOIN(35),
AVRO_SUB_SCAN(36),
- PCAP_SUB_SCAN(37);
+ PCAP_SUB_SCAN(37),
+ KAFKA_SUB_SCAN(38);
public final int number;
@@ -115,6 +116,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
case 35: return NESTED_LOOP_JOIN;
case 36: return AVRO_SUB_SCAN;
case 37: return PCAP_SUB_SCAN;
+ case 38: return KAFKA_SUB_SCAN;
default: return null;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 52b3c63..086b98a 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -309,6 +309,7 @@ enum CoreOperatorType {
NESTED_LOOP_JOIN = 35;
AVRO_SUB_SCAN = 36;
PCAP_SUB_SCAN = 37;
+ KAFKA_SUB_SCAN = 38;
}
/* Registry that contains list of jars, each jar contains its name and list of function signatures.
|