drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ar...@apache.org
Subject [1/2] drill git commit: DRILL-4779: Kafka storage plugin (Kamesh Bhallamudi & Anil Kumar Batchu)
Date Tue, 28 Nov 2017 10:19:49 GMT
Repository: drill
Updated Branches:
  refs/heads/master 05d8b3c2c -> d3f8da2b6


http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
new file mode 100644
index 0000000..1931898
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Resources;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+
+public class KafkaMessageGenerator {
+
+  private static final Logger logger = LoggerFactory.getLogger(KafkaMessageGenerator.class);
+  private final Properties producerProperties = new Properties();
+
+  /** Collects producer settings for {@code broker}; {@code valueSerializer} becomes the value serializer class. */
+  public KafkaMessageGenerator(final String broker, Class<?> valueSerializer) {
+    producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
+    producerProperties.put(ProducerConfig.ACKS_CONFIG, "all");
+    producerProperties.put(ProducerConfig.RETRIES_CONFIG, 0);
+    producerProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
+    producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, 0);
+    producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
+    producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000);
+    producerProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "drill-test-kafka-client");
+    producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
+  }
+
+  /** Sends {@code numMsg} randomly populated Avro records (schema drill-avro-test.avsc) to {@code topic}. */
+  public void populateAvroMsgIntoKafka(String topic, int numMsg) throws IOException {
+    Schema schema;
+    // Parse the schema first and close its stream, so a schema failure cannot leave a producer open.
+    try (java.io.InputStream schemaStream = Resources.getResource("drill-avro-test.avsc").openStream()) {
+      schema = new Schema.Parser().parse(schemaStream);
+    }
+    GenericRecordBuilder builder = new GenericRecordBuilder(schema);
+    Random rand = new Random();
+    // try-with-resources closes the producer even if building or sending a record throws.
+    try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(producerProperties)) {
+      for (int i = 0; i < numMsg; ++i) {
+        builder.set("key1", UUID.randomUUID().toString());
+        builder.set("key2", rand.nextInt());
+        builder.set("key3", rand.nextBoolean());
+
+        List<Integer> list = Lists.newArrayList();
+        list.add(rand.nextInt(100));
+        list.add(rand.nextInt(100));
+        list.add(rand.nextInt(100));
+        builder.set("key5", list);
+
+        Map<String, Double> map = Maps.newHashMap();
+        map.put("key61", rand.nextDouble());
+        map.put("key62", rand.nextDouble());
+        builder.set("key6", map);
+        producer.send(new ProducerRecord<String, GenericRecord>(topic, builder.build()));
+      }
+    }
+  }
+
+  /** Sends {@code numMsg} random JSON messages to {@code topic}, waiting on each send; failures are logged and rethrown. */
+  public void populateJsonMsgIntoKafka(String topic, int numMsg) throws InterruptedException, ExecutionException {
+    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(producerProperties);
+    Random rand = new Random();
+    try {
+      for (int i = 0; i < numMsg; ++i) {
+        JsonObject object = new JsonObject();
+        object.addProperty("key1", UUID.randomUUID().toString());
+        object.addProperty("key2", rand.nextInt());
+        object.addProperty("key3", rand.nextBoolean());
+
+        JsonArray element2 = new JsonArray();
+        element2.add(new JsonPrimitive(rand.nextInt(100)));
+        element2.add(new JsonPrimitive(rand.nextInt(100)));
+        element2.add(new JsonPrimitive(rand.nextInt(100)));
+        object.add("key5", element2);
+
+        JsonObject element3 = new JsonObject();
+        element3.addProperty("key61", rand.nextDouble());
+        element3.addProperty("key62", rand.nextDouble());
+        object.add("key6", element3);
+
+        ProducerRecord<String, String> message = new ProducerRecord<String, String>(topic, object.toString());
+        logger.info("Publishing message : {}", message);
+        Future<RecordMetadata> future = producer.send(message);
+        logger.info("Committed offset of the message : {}", future.get().offset());
+      }
+    } catch (Throwable th) {
+      logger.error(th.getMessage(), th);
+      throw new DrillRuntimeException(th.getMessage(), th);
+    } finally {
+      producer.close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
new file mode 100644
index 0000000..fb48424
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class KafkaQueriesTest extends KafkaTestBase {
+
+  @Test
+  public void testSqlQueryOnInvalidTopic() throws Exception {
+    String queryString = String.format(QueryConstants.MSG_SELECT_QUERY, QueryConstants.INVALID_TOPIC);
+    try {
+      testBuilder().sqlQuery(queryString).unOrdered().baselineRecords(Collections.<Map<String, Object>> emptyList())
+          .build().run();
+      Assert.fail("Test passed though topic does not exist.");
+    } catch (RpcException re) {
+      Assert.assertTrue(re.getMessage().contains("DATA_READ ERROR: Table 'invalid-topic' does not exist"));
+    }
+  }
+
+  @Test
+  public void testResultCount() throws Exception {
+    String queryString = String.format(QueryConstants.MSG_SELECT_QUERY, QueryConstants.JSON_TOPIC);
+    runKafkaSQLVerifyCount(queryString, TestKafkaSuit.NUM_JSON_MSG);
+  }
+
+  @Test
+  public void testPartitionMinOffset() throws Exception {
+    // following kafka.tools.GetOffsetShell for earliest as -2
+    Map<TopicPartition, Long> startOffsetsMap = fetchOffsets(-2);
+
+    String queryString = String.format(QueryConstants.MIN_OFFSET_QUERY, QueryConstants.JSON_TOPIC);
+    testBuilder().sqlQuery(queryString).unOrdered().baselineColumns("minOffset")
+        .baselineValues(startOffsetsMap.get(new TopicPartition(QueryConstants.JSON_TOPIC, 0))).go();
+  }
+
+  @Test
+  public void testPartitionMaxOffset() throws Exception {
+    // following kafka.tools.GetOffsetShell for latest as -1
+    Map<TopicPartition, Long> endOffsetsMap = fetchOffsets(-1);
+
+    String queryString = String.format(QueryConstants.MAX_OFFSET_QUERY, QueryConstants.JSON_TOPIC);
+    testBuilder().sqlQuery(queryString).unOrdered().baselineColumns("maxOffset")
+        .baselineValues(endOffsetsMap.get(new TopicPartition(QueryConstants.JSON_TOPIC, 0)) - 1).go();
+  }
+
+  /**
+   * Reads the earliest or latest offset of every partition assigned to a consumer of the JSON topic.
+   *
+   * @param flag -2 for the earliest offsets, -1 for the latest offsets
+   * @return the consumer position per assigned partition after the seek
+   * @throws IllegalArgumentException if {@code flag} is neither -2 nor -1
+   */
+  private Map<TopicPartition, Long> fetchOffsets(int flag) {
+    if (flag != -2 && flag != -1) {
+      throw new IllegalArgumentException(String.format("Unsupported flag %d", flag));
+    }
+
+    Map<TopicPartition, Long> offsetsMap = Maps.newHashMap();
+    // try-with-resources: the consumer is closed even if subscribe/poll/position throws
+    try (KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(
+        storagePluginConfig.getKafkaConsumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+      kafkaConsumer.subscribe(Arrays.asList(QueryConstants.JSON_TOPIC));
+      kafkaConsumer.poll(0);
+      Set<TopicPartition> assignments = kafkaConsumer.assignment();
+
+      // based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd functions
+      // evaluate lazily, seeking to the first/last offset in all partitions
+      // only when poll(long) or position(TopicPartition) are called
+      if (flag == -2) {
+        kafkaConsumer.seekToBeginning(assignments);
+      } else {
+        kafkaConsumer.seekToEnd(assignments);
+      }
+      for (TopicPartition topicPartition : assignments) {
+        offsetsMap.put(topicPartition, kafkaConsumer.position(topicPartition));
+      }
+    }
+    return offsetsMap;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
new file mode 100644
index 0000000..e30f3e6
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+
+import com.google.common.collect.Maps;
+
+public class KafkaTestBase extends PlanTestBase {
+  protected static KafkaStoragePluginConfig storagePluginConfig;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Make sure this test is only running as part of the suite
+    Assume.assumeTrue(TestKafkaSuit.isRunningSuite());
+    TestKafkaSuit.initKafka();
+    initKafkaStoragePlugin(TestKafkaSuit.embeddedKafkaCluster);
+  }
+
+  public static void initKafkaStoragePlugin(EmbeddedKafkaCluster embeddedKafkaCluster) throws Exception {
+    final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
+    Map<String, String> kafkaConsumerProps = Maps.newHashMap();
+    kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaCluster.getKafkaBrokerList());
+    kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "drill-test-consumer");
+    storagePluginConfig = new KafkaStoragePluginConfig(kafkaConsumerProps);
+    storagePluginConfig.setEnabled(true);
+    pluginRegistry.createOrUpdate(KafkaStoragePluginConfig.NAME, storagePluginConfig, true);
+    testNoResult(String.format("alter session set `%s` = '%s'", ExecConstants.KAFKA_RECORD_READER,
+        "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader"));
+    testNoResult(String.format("alter session set `%s` = %d", ExecConstants.KAFKA_POLL_TIMEOUT, 200));
+  }
+
+  public List<QueryDataBatch> runKafkaSQLWithResults(String sql) throws Exception {
+    return testSqlWithResults(sql);
+  }
+
+  public void runKafkaSQLVerifyCount(String sql, int expectedRowCount) throws Exception {
+    printResultAndVerifyRowCount(runKafkaSQLWithResults(sql), expectedRowCount);
+  }
+
+  public void printResultAndVerifyRowCount(List<QueryDataBatch> results, int expectedRowCount)
+      throws SchemaChangeException {
+    int rowCount = printResult(results);
+    if (expectedRowCount != -1) {
+      Assert.assertEquals(expectedRowCount, rowCount);
+    }
+  }
+
+  public void testHelper(String query, String expectedExprInPlan, int expectedRecordCount) throws Exception {
+    testPhysicalPlan(query, expectedExprInPlan);
+    int actualRecordCount = testSql(query);
+    assertEquals(String.format("Received unexpected number of rows in output: expected=%d, received=%d",
+        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
+  }
+
+  @AfterClass
+  public static void tearDownKafkaTestBase() throws Exception {
+    if (TestKafkaSuit.isRunningSuite()) { // JUnit runs this even when the setup assumption failed
+      TestKafkaSuit.tearDownCluster();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
new file mode 100644
index 0000000..aad64e3
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MessageIteratorTest extends KafkaTestBase {
+
+  // Upper bound on records returned by one poll; testShouldReturnMessage1 expects exactly this many.
+  private static final int MAX_POLL_RECORDS = 4;
+  private KafkaConsumer<byte[], byte[]> kafkaConsumer;
+  private KafkaSubScanSpec subScanSpec;
+
+  @Before
+  public void setUp() {
+    Properties consumerProps = storagePluginConfig.getKafkaConsumerProps();
+    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+    consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(MAX_POLL_RECORDS));
+    kafkaConsumer = new KafkaConsumer<>(consumerProps);
+    subScanSpec = new KafkaSubScanSpec(QueryConstants.JSON_TOPIC, 0, 0, TestKafkaSuit.NUM_JSON_MSG);
+  }
+
+  @After
+  public void cleanUp() {
+    if (kafkaConsumer != null) {
+      kafkaConsumer.close();
+    }
+  }
+
+  @Test
+  public void testWhenPollTimeOutIsTooLess() {
+    MessageIterator iterator = new MessageIterator(kafkaConsumer, subScanSpec, 1);
+    try {
+      iterator.hasNext();
+      Assert.fail("Test passed even though no messages were fetched.");
+    } catch (UserException ue) {
+      Assert.assertEquals(ErrorType.DATA_READ, ue.getErrorType());
+      Assert.assertTrue(ue.getMessage().contains(
+          "DATA_READ ERROR: Failed to fetch messages within 1 milliseconds. Consider increasing the value of the property : store.kafka.poll.timeout"));
+    }
+  }
+
+  @Test
+  public void testShouldReturnTrueAsKafkaHasMessages() {
+    MessageIterator iterator = new MessageIterator(kafkaConsumer, subScanSpec, TimeUnit.SECONDS.toMillis(1));
+    Assert.assertTrue("Message iterator returned false though there are messages in Kafka", iterator.hasNext());
+  }
+
+  @Test
+  public void testShouldReturnMessage1() {
+    MessageIterator iterator = new MessageIterator(kafkaConsumer, subScanSpec, TimeUnit.SECONDS.toMillis(1));
+    // Calling hasNext makes only one poll to Kafka which fetches only MAX_POLL_RECORDS messages,
+    // so the next operation on the iterator after those is expected to fail.
+    iterator.hasNext();
+    for (int i = 0; i < MAX_POLL_RECORDS; i++) {
+      Assert.assertNotNull(iterator.next());
+    }
+    try {
+      iterator.next();
+      Assert.fail("Kafka fetched more messages than configured.");
+    } catch (NoSuchElementException nse) {
+      // Expected
+    }
+  }
+
+  @Test
+  public void testShouldReturnMessage2() {
+    MessageIterator iterator = new MessageIterator(kafkaConsumer, subScanSpec, TimeUnit.SECONDS.toMillis(1));
+    int messageCount = 0;
+    while (iterator.hasNext()) {
+      Assert.assertNotNull(iterator.next());
+      ++messageCount;
+    }
+    Assert.assertEquals(TestKafkaSuit.NUM_JSON_MSG, messageCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java
new file mode 100644
index 0000000..ff58f7e
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+public interface QueryConstants {
+  // Kafka broker settings (fields of an interface are implicitly public static final)
+  String BROKER_DELIM = ",";
+  String LOCAL_HOST = "127.0.0.1";
+
+  // ZooKeeper settings
+  String ZK_TMP = "zk_tmp";
+  int TICK_TIME = 500;
+  int MAX_CLIENT_CONNECTIONS = 100;
+
+  // Topic names
+  String JSON_TOPIC = "drill-json-topic";
+  String AVRO_TOPIC = "drill-avro-topic";
+  String INVALID_TOPIC = "invalid-topic";
+
+  // Query templates; %s is filled in with a topic name via String.format
+  String MSG_COUNT_QUERY = "select count(*) from kafka.`%s`";
+  String MSG_SELECT_QUERY = "select * from kafka.`%s`";
+  String MIN_OFFSET_QUERY = "select MIN(kafkaMsgOffset) as minOffset from kafka.`%s`";
+  String MAX_OFFSET_QUERY = "select MAX(kafkaMsgOffset) as maxOffset from kafka.`%s`";
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
new file mode 100644
index 0000000..178c809
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.drill.exec.ZookeeperTestUtil;
+import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactoryTest;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.security.JaasUtils;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.junit.runners.Suite.SuiteClasses;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+@RunWith(Suite.class)
+@SuiteClasses({ KafkaQueriesTest.class, MessageIteratorTest.class, MessageReaderFactoryTest.class })
+public class TestKafkaSuit {
+  private static final Logger logger = LoggerFactory.getLogger(TestKafkaSuit.class);
+  private static final String LOGIN_CONF_RESOURCE_PATHNAME = "login.conf";
+
+  public static EmbeddedKafkaCluster embeddedKafkaCluster;
+  private static ZkClient zkClient;
+
+  private static final AtomicInteger initCount = new AtomicInteger(0);
+  static final int NUM_JSON_MSG = 10;
+  static final int CONN_TIMEOUT = 8 * 1000;
+  static final int SESSION_TIMEOUT = 10 * 1000;
+
+  static String kafkaBroker;
+  private static volatile boolean runningSuite = false;
+
+  /** Starts the embedded cluster and loads the JSON topic on first use; later calls only bump the use count. */
+  @BeforeClass
+  public static void initKafka() throws Exception {
+    synchronized (TestKafkaSuit.class) {
+      if (initCount.get() == 0) {
+        ZookeeperTestUtil.setZookeeperSaslTestConfigProps();
+        System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, ClassLoader.getSystemResource(LOGIN_CONF_RESOURCE_PATHNAME).getFile());
+        embeddedKafkaCluster = new EmbeddedKafkaCluster();
+        Properties topicProps = new Properties();
+        zkClient = new ZkClient(embeddedKafkaCluster.getZkServer().getConnectionString(), SESSION_TIMEOUT, CONN_TIMEOUT, ZKStringSerializer$.MODULE$);
+        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false);
+        AdminUtils.createTopic(zkUtils, QueryConstants.JSON_TOPIC, 1, 1, topicProps, RackAwareMode.Disabled$.MODULE$);
+        logger.info("Topic Metadata: {}", AdminUtils.fetchTopicMetadataFromZk(QueryConstants.JSON_TOPIC, zkUtils));
+
+        KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(),
+            StringSerializer.class);
+        generator.populateJsonMsgIntoKafka(QueryConstants.JSON_TOPIC, NUM_JSON_MSG);
+      }
+      initCount.incrementAndGet();
+      runningSuite = true;
+    }
+    logger.info("Initialized Embedded Zookeeper and Kafka");
+  }
+
+  public static boolean isRunningSuite() {
+    return runningSuite;
+  }
+
+  /** Releases one use of the cluster; the last release closes the ZooKeeper client and shuts the cluster down. */
+  @AfterClass
+  public static void tearDownCluster() throws Exception {
+    synchronized (TestKafkaSuit.class) {
+      // Ignore unmatched calls so the counter never drops below zero and blocks a later initKafka().
+      if (initCount.get() > 0 && initCount.decrementAndGet() == 0) {
+        if (zkClient != null) {
+          zkClient.close();
+        }
+        if (embeddedKafkaCluster != null && !embeddedKafkaCluster.getBrokers().isEmpty()) {
+          embeddedKafkaCluster.shutDownCluster();
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
new file mode 100644
index 0000000..319c66c
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.cluster;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.drill.exec.ZookeeperHelper;
+import org.apache.drill.exec.store.kafka.KafkaStoragePluginConfig;
+import org.apache.drill.exec.store.kafka.QueryConstants;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+
+/**
+ * Test helper that starts a ZooKeeper instance through {@link ZookeeperHelper} and one or more
+ * Kafka brokers listening on ephemeral local ports.
+ * <p>
+ * Everything is started eagerly by the constructor; call {@link #shutDownCluster()} to stop the
+ * brokers and ZooKeeper again.
+ */
+public class EmbeddedKafkaCluster implements QueryConstants {
+  private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
+  private List<KafkaServerStartable> brokers;
+  private final ZookeeperHelper zkHelper;
+  private final Properties props;
+
+  /**
+   * Starts a cluster with one broker and no additional broker properties.
+   *
+   * @throws IOException if the cluster cannot be started
+   */
+  public EmbeddedKafkaCluster() throws IOException {
+    this(new Properties());
+  }
+
+  /**
+   * Starts a cluster with one broker.
+   *
+   * @param props base properties copied into every broker configuration
+   * @throws IOException if the cluster cannot be started
+   */
+  public EmbeddedKafkaCluster(Properties props) throws IOException {
+    this(props, 1);
+  }
+
+  /**
+   * Starts ZooKeeper and then {@code numberOfBrokers} Kafka brokers, each on its own ephemeral port.
+   *
+   * @param baseProps base properties copied into every broker configuration; the passed object is
+   *                  not modified
+   * @param numberOfBrokers number of brokers to start; broker ids are assigned as 1..numberOfBrokers
+   * @throws IOException if a free port cannot be probed
+   */
+  public EmbeddedKafkaCluster(Properties baseProps, int numberOfBrokers) throws IOException {
+    this.props = new Properties();
+    this.props.putAll(baseProps);
+    this.zkHelper = new ZookeeperHelper();
+    // NOTE(review): the argument is presumably the number of ZooKeeper nodes - confirm against ZookeeperHelper.
+    zkHelper.startZookeeper(1);
+    this.brokers = new ArrayList<>(numberOfBrokers);
+
+    StringBuilder brokerList = new StringBuilder();
+    for (int i = 0; i < numberOfBrokers; ++i) {
+      if (i != 0) {
+        brokerList.append(BROKER_DELIM);
+      }
+      int ephemeralBrokerPort = getEphemeralPort();
+      brokerList.append(LOCAL_HOST).append(":").append(ephemeralBrokerPort);
+      addBroker(this.props, i, ephemeralBrokerPort);
+    }
+
+    // Recorded after the brokers are started, so these two entries are only visible through
+    // getProps() and are not part of the per-broker configuration built in addBroker().
+    this.props.put("metadata.broker.list", brokerList.toString());
+    this.props.put(KafkaConfig.ZkConnectProp(), this.zkHelper.getConnectionString());
+    logger.info("Initialized Kafka Server");
+  }
+
+  /**
+   * Builds the configuration for one broker, starts it and registers it in {@link #brokers}.
+   *
+   * @param props base properties the broker configuration starts from
+   * @param brokerID zero-based index of the broker; the Kafka broker id is {@code brokerID + 1}
+   * @param ephemeralBrokerPort port the broker listens on
+   */
+  private void addBroker(Properties props, int brokerID, int ephemeralBrokerPort) {
+    Properties properties = new Properties();
+    properties.putAll(props);
+    properties.put(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp(), String.valueOf(1));
+    properties.put(KafkaConfig.OffsetsTopicPartitionsProp(), String.valueOf(1));
+    properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(1));
+    properties.put(KafkaConfig.DefaultReplicationFactorProp(), String.valueOf(1));
+    properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp(), String.valueOf(100));
+    // Boolean settings are stored as Strings like every other value, so the Properties object
+    // only ever holds String values.
+    properties.put(KafkaConfig.AutoCreateTopicsEnableProp(), String.valueOf(true));
+    properties.put(KafkaConfig.ZkConnectProp(), zkHelper.getConnectionString());
+    properties.put(KafkaConfig.BrokerIdProp(), String.valueOf(brokerID + 1));
+    properties.put(KafkaConfig.HostNameProp(), String.valueOf(LOCAL_HOST));
+    properties.put(KafkaConfig.AdvertisedHostNameProp(), String.valueOf(LOCAL_HOST));
+    properties.put(KafkaConfig.PortProp(), String.valueOf(ephemeralBrokerPort));
+    properties.put(KafkaConfig.DeleteTopicEnableProp(), String.valueOf(false));
+    properties.put(KafkaConfig.LogDirsProp(), getTemporaryDir().getAbsolutePath());
+    properties.put(KafkaConfig.LogFlushIntervalMessagesProp(), String.valueOf(1));
+    brokers.add(getBroker(properties));
+  }
+
+  /**
+   * Creates a broker from the given configuration and starts it.
+   *
+   * @param properties complete broker configuration
+   * @return the started broker
+   */
+  private static KafkaServerStartable getBroker(Properties properties) {
+    KafkaServerStartable broker = new KafkaServerStartable(new KafkaConfig(properties));
+    broker.startup();
+    return broker;
+  }
+
+  /**
+   * Shuts down every broker and then stops ZooKeeper.
+   * <p>
+   * While the brokers are stopping, the log4j logger named {@link KafkaStoragePluginConfig#NAME}
+   * is raised to {@code ERROR}; its previous level is restored afterwards.
+   *
+   * @throws IOException if stopping the cluster fails
+   */
+  public void shutDownCluster() throws IOException {
+    // set Kafka log level to ERROR
+    Level level = LogManager.getLogger(KafkaStoragePluginConfig.NAME).getLevel();
+    LogManager.getLogger(KafkaStoragePluginConfig.NAME).setLevel(Level.ERROR);
+
+    try {
+      for (KafkaServerStartable broker : brokers) {
+        broker.shutdown();
+      }
+    } finally {
+      // Restore the level and stop ZooKeeper even when a broker fails to shut down, so a
+      // failing broker neither leaves logging muted nor leaks the ZooKeeper instance.
+      LogManager.getLogger(KafkaStoragePluginConfig.NAME).setLevel(level);
+      zkHelper.stopZookeeper();
+    }
+  }
+
+  /**
+   * Shuts down the broker with the given Kafka broker id; does nothing if no broker has that id.
+   * The broker stays in the list returned by {@link #getBrokers()}.
+   *
+   * @param brokerId Kafka broker id (ids are assigned starting at 1)
+   */
+  public void shutDownBroker(int brokerId) {
+    for (KafkaServerStartable broker : brokers) {
+      // broker.id is an integer-typed Kafka config, so it has to be read with getInt();
+      // getString() casts the parsed value to String and fails for it.
+      if (broker.serverConfig().getInt(KafkaConfig.BrokerIdProp()) == brokerId) {
+        broker.shutdown();
+        return;
+      }
+    }
+  }
+
+  /**
+   * Returns a copy of the cluster properties: the base properties given to the constructor plus
+   * the {@code metadata.broker.list} and ZooKeeper connect entries added there.
+   *
+   * @return a new {@link Properties} object; changes to it do not affect this cluster
+   */
+  public Properties getProps() {
+    Properties tmpProps = new Properties();
+    tmpProps.putAll(this.props);
+    return tmpProps;
+  }
+
+  /**
+   * @return the internal, mutable list of brokers (not a copy)
+   */
+  public List<KafkaServerStartable> getBrokers() {
+    return brokers;
+  }
+
+  /**
+   * Replaces the internal broker list; brokers in the previous list are not shut down.
+   *
+   * @param brokers the new broker list
+   */
+  public void setBrokers(List<KafkaServerStartable> brokers) {
+    this.brokers = brokers;
+  }
+
+  /**
+   * @return the helper that manages the ZooKeeper instance of this cluster
+   */
+  public ZookeeperHelper getZkServer() {
+    return zkHelper;
+  }
+
+  /**
+   * Builds the comma-separated {@code host:port} list of all brokers.
+   *
+   * @return the broker list, or an empty string when there are no brokers
+   */
+  public String getKafkaBrokerList() {
+    StringBuilder sb = new StringBuilder();
+    for (KafkaServerStartable broker : brokers) {
+      if (sb.length() > 0) {
+        sb.append(",");
+      }
+      KafkaConfig serverConfig = broker.serverConfig();
+      sb.append(serverConfig.hostName()).append(":").append(serverConfig.port());
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Asks the OS for a free port by briefly binding a server socket to port 0.
+   * <p>
+   * The socket is closed before the port is handed to the broker, so another process can grab
+   * the port in between.
+   *
+   * @return a port number that was free at the time of the call
+   * @throws IOException if the probe socket cannot be opened
+   */
+  private int getEphemeralPort() throws IOException {
+    try (ServerSocket socket = new ServerSocket(0)) {
+      return socket.getLocalPort();
+    }
+  }
+
+  /**
+   * Creates a fresh directory under {@code java.io.tmpdir}, used as a broker log directory.
+   * The directory name is {@code ZK_TMP} followed by {@link System#nanoTime()}.
+   *
+   * @return the newly created directory
+   * @throws RuntimeException if the directory cannot be created
+   */
+  private File getTemporaryDir() {
+    File file = new File(System.getProperty("java.io.tmpdir"), ZK_TMP + System.nanoTime());
+    if (!file.mkdir()) {
+      logger.error("Failed to create temp Dir");
+      throw new RuntimeException("Failed to create temp Dir");
+    }
+    return file;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java
new file mode 100644
index 0000000..a3cfcf7
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MessageReaderFactoryTest {
+
+  @Test
+  public void testShouldThrowExceptionAsMessageReaderIsNull() {
+    try {
+      MessageReaderFactory.getMessageReader(null);
+      Assert.fail("Message reader initialization succeeded even though it is null");
+    } catch (UserException ue) {
+      Assert.assertTrue(ue.getErrorType() == ErrorType.VALIDATION);
+      Assert.assertTrue(ue.getMessage().contains(
+          "VALIDATION ERROR: Please configure message reader implementation using the property 'store.kafka.record.reader'"));
+    }
+  }
+
+  @Test
+  public void testShouldThrowExceptionAsMessageReaderHasNotImplementedMessageReaderIntf() {
+    try {
+      MessageReaderFactory.getMessageReader(MessageReaderFactoryTest.class.getName());
+      Assert.fail("Message reader initialization succeeded even though class does not implement message reader interface");
+    } catch (UserException ue) {
+      Assert.assertTrue(ue.getErrorType() == ErrorType.VALIDATION);
+      Assert.assertTrue(ue.getMessage().contains(
+          "VALIDATION ERROR: Message reader configured 'org.apache.drill.exec.store.kafka.decoders.MessageReaderFactoryTest' does not implement 'org.apache.drill.exec.store.kafka.decoders.MessageReader'"));
+    }
+  }
+
+  @Test
+  public void testShouldThrowExceptionAsNoClassFound() {
+    try {
+      MessageReaderFactory.getMessageReader("a.b.c.d");
+      Assert.fail("Message reader initialization succeeded even though class does not exist");
+    } catch (UserException ue) {
+      Assert.assertTrue(ue.getErrorType() == ErrorType.VALIDATION);
+      Assert.assertTrue(ue.getMessage().contains("VALIDATION ERROR: Failed to initialize message reader : a.b.c.d"));
+    }
+  }
+
+  @Test
+  public void testShouldReturnJsonMessageReaderInstance() {
+    MessageReader messageReader = MessageReaderFactory.getMessageReader(JsonMessageReader.class.getName());
+    Assert.assertTrue(messageReader instanceof JsonMessageReader);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/resources/login.conf
----------------------------------------------------------------------
diff --git a/contrib/storage-kafka/src/test/resources/login.conf b/contrib/storage-kafka/src/test/resources/login.conf
new file mode 100644
index 0000000..0916120
--- /dev/null
+++ b/contrib/storage-kafka/src/test/resources/login.conf
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * simple login, just get OS creds
+ */
+hadoop_simple {
+  org.apache.hadoop.security.login.GenericOSLoginModule required;
+  org.apache.hadoop.security.login.HadoopLoginModule required;
+};
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 9bb21d6..06981e0 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -257,6 +257,11 @@
           <artifactId>drill-gis</artifactId>
           <version>${project.version}</version>
         </dependency>
+	<dependency>
+          <groupId>org.apache.drill.contrib</groupId>
+          <artifactId>drill-storage-kafka</artifactId>
+          <version>${project.version}</version>
+        </dependency>
       </dependencies>
     </profile>
 

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/distribution/src/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml
index faa2e72..b6087eb 100644
--- a/distribution/src/assemble/bin.xml
+++ b/distribution/src/assemble/bin.xml
@@ -100,6 +100,7 @@
         <include>org.apache.drill.contrib:drill-jdbc-storage</include>
         <include>org.apache.drill.contrib:drill-kudu-storage</include>
         <include>org.apache.drill.contrib:drill-gis</include>
+        <include>org.apache.drill.contrib:drill-storage-kafka</include>
       </includes>
       <excludes>
         <exclude>org.apache.drill.contrib.storage-hive:drill-storage-hive-core:jar:tests</exclude>

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 17926af..89b4b48 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -308,6 +308,18 @@ public final class ExecConstants {
 
   public static final BooleanValidator ENABLE_UNION_TYPE = new BooleanValidator("exec.enable_union_type");
 
+  // Kafka plugin related options.
+  // Each option is declared as a name constant plus the validator registered for that name.
+  // The shipped defaults live in drill-module.conf (false, false, JsonMessageReader, 200).
+
+  // Boolean option; NOTE(review): presumably makes the Kafka reader treat every value as text - confirm against the reader.
+  public static final String KAFKA_ALL_TEXT_MODE = "store.kafka.all_text_mode";
+  public static final OptionValidator KAFKA_READER_ALL_TEXT_MODE_VALIDATOR = new BooleanValidator(KAFKA_ALL_TEXT_MODE);
+  // Boolean option; NOTE(review): presumably makes the Kafka reader read all numbers as doubles - confirm against the reader.
+  public static final String KAFKA_READER_READ_NUMBERS_AS_DOUBLE = "store.kafka.read_numbers_as_double";
+  public static final OptionValidator KAFKA_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR = new BooleanValidator(
+      KAFKA_READER_READ_NUMBERS_AS_DOUBLE);
+  // String option; its default value is the class name of the JSON message reader.
+  public static final String KAFKA_RECORD_READER = "store.kafka.record.reader";
+  public static final OptionValidator KAFKA_RECORD_READER_VALIDATOR = new StringValidator(KAFKA_RECORD_READER);
+  // Positive long option, constructed with Long.MAX_VALUE as the validator's second argument.
+  // NOTE(review): unit is presumably milliseconds - confirm against the consumer poll call.
+  public static final String KAFKA_POLL_TIMEOUT = "store.kafka.poll.timeout";
+  public static final PositiveLongValidator KAFKA_POLL_TIMEOUT_VALIDATOR = new PositiveLongValidator(KAFKA_POLL_TIMEOUT,
+      Long.MAX_VALUE);
+
+
   // TODO: We need to add a feature that enables storage plugins to add their own options. Currently we have to declare
   // in core which is not right. Move this option and above two mongo plugin related options once we have the feature.
   public static final String HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS = "store.hive.optimize_scan_with_native_readers";

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 1c45547..a1ddb30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -152,6 +152,10 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.MONGO_READER_ALL_TEXT_MODE_VALIDATOR),
       new OptionDefinition(ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR),
       new OptionDefinition(ExecConstants.MONGO_BSON_RECORD_READER_VALIDATOR),
+      new OptionDefinition(ExecConstants.KAFKA_READER_ALL_TEXT_MODE_VALIDATOR),
+      new OptionDefinition(ExecConstants.KAFKA_RECORD_READER_VALIDATOR),
+      new OptionDefinition(ExecConstants.KAFKA_POLL_TIMEOUT_VALIDATOR),
+      new OptionDefinition(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR),
       new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS_VALIDATOR),
       new OptionDefinition(ExecConstants.SLICE_TARGET_OPTION),
       new OptionDefinition(ExecConstants.AFFINITY_FACTOR),

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index a66fce0..f5e85a3 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -531,6 +531,10 @@ drill.exec.options: {
     store.parquet.writer.use_single_fs_block: false,
     store.partition.hash_distribute: false,
     store.text.estimated_row_size_bytes: 100.0,
+    store.kafka.all_text_mode: false,
+    store.kafka.read_numbers_as_double: false,
+    store.kafka.record.reader: "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader",
+    store.kafka.poll.timeout: 200,
     web.logs.max_lines: 10000,
     window.enable: true,
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index 9a3b1d9..51cdab7 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -517,6 +517,10 @@ public final class UserBitShared {
      * <code>PCAP_SUB_SCAN = 37;</code>
      */
     PCAP_SUB_SCAN(37, 37),
+    /**
+    * <code>KAFKA_SUB_SCAN = 38;</code>
+    */
+    KAFKA_SUB_SCAN(38, 38),
     ;
 
     /**
@@ -671,6 +675,10 @@ public final class UserBitShared {
      * <code>PCAP_SUB_SCAN = 37;</code>
      */
     public static final int PCAP_SUB_SCAN_VALUE = 37;
+    /**
+     * <code>KAFKA_SUB_SCAN = 38;</code>
+     */
+    public static final int KAFKA_SUB_SCAN_VALUE = 38;
 
 
     public final int getNumber() { return value; }
@@ -715,6 +723,7 @@ public final class UserBitShared {
         case 35: return NESTED_LOOP_JOIN;
         case 36: return AVRO_SUB_SCAN;
         case 37: return PCAP_SUB_SCAN;
+        case 38: return KAFKA_SUB_SCAN;
         default: return null;
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index a795f55..8ad38a5 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -59,7 +59,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
     WINDOW(34),
     NESTED_LOOP_JOIN(35),
     AVRO_SUB_SCAN(36),
-    PCAP_SUB_SCAN(37);
+    PCAP_SUB_SCAN(37),
+    KAFKA_SUB_SCAN(38);
     
     public final int number;
     
@@ -115,6 +116,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
             case 35: return NESTED_LOOP_JOIN;
             case 36: return AVRO_SUB_SCAN;
             case 37: return PCAP_SUB_SCAN;
+            case 38: return KAFKA_SUB_SCAN;
             default: return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 52b3c63..086b98a 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -309,6 +309,7 @@ enum CoreOperatorType {
   NESTED_LOOP_JOIN = 35;
   AVRO_SUB_SCAN = 36;
   PCAP_SUB_SCAN = 37;
+  KAFKA_SUB_SCAN = 38;
 }
 
 /* Registry that contains list of jars, each jar contains its name and list of function signatures.


Mime
View raw message