[ https://issues.apache.org/jira/browse/SAMOA-40?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14634947#comment-14634947 ]

ASF GitHub Bot commented on SAMOA-40:
-------------------------------------

Github user gdfm commented on a diff in the pull request:

    https://github.com/apache/incubator-samoa/pull/32#discussion_r35090697

    --- Diff: samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaReader.java ---
    @@ -0,0 +1,279 @@
    +package org.apache.samoa.streams.kafka;
    +
    +/*
    + * #%L
    + * SAMOA
    + * %%
    + * Copyright (C) 2014 - 2015 Apache Software Foundation
    + * %%
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + * #L%
    + */
    +
    +import kafka.api.FetchRequest;
    +import kafka.api.FetchRequestBuilder;
    +import kafka.api.PartitionOffsetRequestInfo;
    +import kafka.common.ErrorMapping;
    +import kafka.common.TopicAndPartition;
    +import kafka.javaapi.*;
    +import kafka.javaapi.consumer.SimpleConsumer;
    +import kafka.message.MessageAndOffset;
    +
    +import java.io.UnsupportedEncodingException;
    +import java.nio.ByteBuffer;
    +import java.util.*;
    +
    +public class KafkaReader {
    +
    +  protected long readOffset;
    +
    +  private List m_replicaBrokers = new ArrayList();
    +
    +  public KafkaReader() {
    +    m_replicaBrokers = new ArrayList();
    +    readOffset = 0L;
    +  }
    +
    +  public ArrayList run(long a_maxReads, String a_topic, int a_partition, List a_seedBrokers, int a_port) {
    +
    +    // find the meta data about the topic and partition we are interested in
    +    String answer = "";
    +    ArrayList returnInstances = new ArrayList();
    +    PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
    +    if (metadata == null) {
    +      System.out.println("Can't find metadata for Topic and Partition. Exiting");
    --- End diff --

    Rather than printing to stdout, we should use logging here.


> Add Kafka stream reader modules to consume data from Kafka framework
> --------------------------------------------------------------------
>
>                 Key: SAMOA-40
>                 URL: https://issues.apache.org/jira/browse/SAMOA-40
>             Project: SAMOA
>          Issue Type: Task
>          Components: Infrastructure, SAMOA-API
>         Environment: OS X Version 10.10.3
>            Reporter: Vishal Karande
>            Priority: Minor
>              Labels: features
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Apache SAMOA is designed to process streaming data and develop streaming machine learning
> algorithms. Currently, the SAMOA framework supports stream data read from Arff files only.
> Thus, while using SAMOA as a streaming machine learning component in real time use-cases,
> writing and reading data from files is slow and inefficient.
> A single Kafka broker can handle hundreds of megabytes of reads and writes per second
> from thousands of clients. The ability to read data directly from Apache Kafka into SAMOA will
> not only improve performance but also make SAMOA pluggable into many real time machine
> learning use cases such as the Internet of Things (IoT).
>
> GOAL:
> Add code that enables SAMOA to read data from Apache Kafka as stream data.
> The Kafka stream reader supports the following options for streaming:
> a) Topic selection - Kafka topic to read data from
> b) Partition selection - Kafka partition to read data from
> c) Batching - Number of data instances read from Kafka in one read request
> d) Configuration options - Kafka port number, seed information, time delay between two read requests
>
> Components:
> KafkaReader - Consists of APIs to read data from Kafka
> KafkaStream - Stream source for SAMOA providing data read from Kafka
> Dependencies for Kafka are added in pom.xml for the samoa-api component.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
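
For reference, the review comment on KafkaReader.run (replace the System.out.println with logging) could look roughly like the sketch below. It is only an illustration under assumptions: the class KafkaReaderLoggingSketch and the helper findLeaderOrNull are hypothetical stand-ins, not code from the pull request, and java.util.logging is used only so that the example has no external dependencies. The project may well prefer a different logging facade.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for KafkaReader, reduced to the metadata check under review.
class KafkaReaderLoggingSketch {

  // One logger per class, named after the class.
  private static final Logger logger =
      Logger.getLogger(KafkaReaderLoggingSketch.class.getName());

  // Hypothetical helper: always returns null here to simulate the case where
  // no leader metadata is found (the null check on findLeader(...) in the diff).
  static String findLeaderOrNull(String topic, int partition) {
    return null;
  }

  // Logs a warning and returns an empty list instead of printing to stdout.
  static List<String> run(String topic, int partition) {
    List<String> instances = new ArrayList<>();
    String metadata = findLeaderOrNull(topic, partition);
    if (metadata == null) {
      // Parameterized message: {0} and {1} are filled in by the logging framework.
      logger.log(Level.WARNING,
          "Cannot find metadata for topic {0} and partition {1}. Exiting",
          new Object[] {topic, partition});
      return instances;
    }
    instances.add(metadata);
    return instances;
  }

  public static void main(String[] args) {
    // "samoa-topic" is an illustrative topic name, not one from the issue.
    List<String> result = run("samoa-topic", 0);
    System.out.println("instances read: " + result.size());
  }
}
```

Compared with the printout, the warning carries the topic and partition, goes through whatever handlers the deployment configures, and can be filtered by level.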