samoa-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SAMOA-40) Add Kafka stream reader modules to consume data from Kafka framework
Date Fri, 31 Jul 2015 18:37:04 GMT

    [ https://issues.apache.org/jira/browse/SAMOA-40?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649622#comment-14649622 ]

ASF GitHub Bot commented on SAMOA-40:
-------------------------------------

Github user karande commented on a diff in the pull request:

    https://github.com/apache/incubator-samoa/pull/32#discussion_r36002310
  
    --- Diff: samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaReader.java ---
    @@ -0,0 +1,279 @@
    +package org.apache.samoa.streams.kafka;
    +
    +/*
    + * #%L
    + * SAMOA
    + * %%
    + * Copyright (C) 2014 - 2015 Apache Software Foundation
    + * %%
    + * Licensed under the Apache License, Version 2.0 (the "License");
    + * you may not use this file except in compliance with the License.
    + * You may obtain a copy of the License at
    + * 
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + * 
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + * #L%
    + */
    +
    +import kafka.api.FetchRequest;
    +import kafka.api.FetchRequestBuilder;
    +import kafka.api.PartitionOffsetRequestInfo;
    +import kafka.common.ErrorMapping;
    +import kafka.common.TopicAndPartition;
    +import kafka.javaapi.*;
    +import kafka.javaapi.consumer.SimpleConsumer;
    +import kafka.message.MessageAndOffset;
    +
    +import java.io.UnsupportedEncodingException;
    +import java.nio.ByteBuffer;
    +import java.util.*;
    +
    +public class KafkaReader {
    +
    +    protected long readOffset;
    +
    +    private List<String> m_replicaBrokers = new ArrayList<String>();
    +
    +    public KafkaReader() {
    +        m_replicaBrokers = new ArrayList<String>();
    +        readOffset = 0L;
    +    }
    +
    +    public ArrayList<String> run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) {
    +
    +        // find the meta data about the topic and partition we are interested in
    +        String answer = "";
    +        ArrayList<String> returnInstances = new ArrayList<String>();
    +        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
    +        if (metadata == null) {
    +            System.out.println("Can't find metadata for Topic and Partition. Exiting");
    +            return null;
    +        }
    +        if (metadata.leader() == null) {
    +            System.out.println("Can't find Leader for Topic and Partition. Exiting");
    +            return null;
    +        }
    +        String leadBroker = metadata.leader().host();
    +        String clientName = "Client_" + a_topic + "_" + a_partition;
    +
    +        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
    +        //long readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
    +        //readOffset = 0L;
    +        int numErrors = 0;
    +        while (a_maxReads > 0) {
    +            if (consumer == null) {
    +                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
    +            }
    +            /**
    +             * Reading data
    +             */
    +            FetchRequest req = new FetchRequestBuilder()
    +                    .clientId(clientName)
    +                    .addFetch(a_topic, a_partition, readOffset, 100000)
    +                    .build();
    +            FetchResponse fetchResponse = null;
    +
    +            try {
    +                fetchResponse = consumer.fetch(req);
    +            } catch (Exception e) {
    +
    +            }
    +
    +            /**
    +             * SimpleConsumer does not handle lead broker failures, you have to handle it
    +             *  once the fetch returns an error, we log the reason, close the consumer then try to figure
    +             *  out who the new leader is
    +             */
    +            if (fetchResponse.hasError()) {
    +                numErrors++;
    +                // Something went wrong!
    +                short code = fetchResponse.errorCode(a_topic, a_partition);
    +                System.out.println("Error fetching data from the Broker:" + leadBroker
+ " Reason: " + code);
    +                if (numErrors > 5) break;
    +                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
    +                    // We asked for an invalid offset. For simple case ask for the last element to reset
    +                    //readOffset = getLastOffset(consumer,a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
    +                    continue;
    +                }
    +                consumer.close();
    +                consumer = null;
    +                try {
    +                    leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
    +                } catch (Exception e) {
    +                    // TODO Auto-generated catch block
    +                    e.printStackTrace();
    +                }
    +                continue;
    +            }
    +            //End Error handling
    +
    +            // Reading data cont.
    +            numErrors = 0;
    +
    +            long numRead = 0;
    +            Iterator it = (Iterator) fetchResponse.messageSet(a_topic, a_partition).iterator();
    +            MessageAndOffset messageAndOffset = null;
    +
    +            try {
    +                messageAndOffset = (MessageAndOffset) it.next();
    +            } catch (Exception e) {
    +                return null;
    +            }
    +
    +            //for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    +            long currentOffset = messageAndOffset.offset();
    +            if (currentOffset < readOffset) {
    +                System.out.println("Found an old offset: " + currentOffset + " Expecting:
" + readOffset);
    +                continue;
    +            }
    +            readOffset = messageAndOffset.nextOffset();
    +            ByteBuffer payload = messageAndOffset.message().payload();
    +
    +            byte[] bytes = new byte[payload.limit()];
    +            payload.get(bytes);
    +            try {
    +                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    +                answer = String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8");
    +                returnInstances.add(answer);
    +            } catch (UnsupportedEncodingException e) {
    +                e.printStackTrace();
    +            }
    +
    +            numRead++;
    +            a_maxReads--;
    +            //  break;
    +            //}
    +
    +            if (numRead == 0) {
    +                try {
    +                    Thread.sleep(1000);
    +                } catch (InterruptedException ie) {
    +                }
    +            }
    +        }
    +        if (consumer != null) consumer.close();
    +        return returnInstances;
    +    }
    +
    +    /**
    +     * Defines where to start reading data from
    +     * Helpers Available:
    +     * kafka.api.OffsetRequest.EarliestTime() => finds the beginning of the data in the logs and starts streaming
    +     * from there
    +     * kafka.api.OffsetRequest.LatestTime()   => will only stream new messages
    +     *
    +     * @param consumer
    +     * @param topic
    +     * @param partition
    +     * @param whichTime
    +     * @param clientName
    +     * @return
    +     */
    +    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
    +        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    +        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition,
PartitionOffsetRequestInfo>();
    +        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime,
1));
    +        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
    +                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    +        OffsetResponse response = consumer.getOffsetsBefore(request);
    +
    +        if (response.hasError()) {
    +            System.out.println("Error fetching data Offset Data the Broker. Reason: "
+ response.errorCode(topic, partition));
    +            return 0;
    +        }
    +        long[] offsets = response.offsets(topic, partition);
    +        return offsets[0];
    +    }
    +
    +    /**
    +     * Uses the findLeader() logic we defined to find the new leader, except here we only try to connect to one of the
    +     * replicas for the topic/partition. This way if we can’t reach any of the Brokers with the data we are interested
    +     * in we give up and exit hard.
    +     *
    +     * @param a_oldLeader
    +     * @param a_topic
    +     * @param a_partition
    +     * @param a_port
    +     * @return
    +     * @throws Exception
    +     */
    +    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
    +        for (int i = 0; i < 3; i++) {
    +            boolean goToSleep = false;
    +            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
    +            if (metadata == null) {
    +                goToSleep = true;
    +            } else if (metadata.leader() == null) {
    +                goToSleep = true;
    +            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
    +                // first time through if the leader hasn't changed give ZooKeeper a second to recover
    +                // second time, assume the broker did recover before failover, or it was a non-Broker issue
    +                //
    +                goToSleep = true;
    +            } else {
    +                return metadata.leader().host();
    +            }
    +            if (goToSleep) {
    +                try {
    +                    Thread.sleep(1000);
    +                } catch (InterruptedException ie) {
    +                }
    +            }
    +        }
    +        System.out.println("Unable to find new leader after Broker failure. Exiting");
    +        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    +    }
    +
    +    /**
    +     * Query a live broker to find out leader information and replica information for a given topic and partition
    +     *
    +     * @param a_seedBrokers
    +     * @param a_port
    +     * @param a_topic
    +     * @param a_partition
    +     * @return
    +     */
    +    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
    +        PartitionMetadata returnMetaData = null;
    +        for (String seed : a_seedBrokers) {
    +            SimpleConsumer consumer = null;
    +            try {
    +                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup"); //broker_host, broker_port, timeout, buffer_size, client_id
    --- End diff --
    
    Kafka provides two types of consumer APIs:
    1) SimpleConsumer API
    2) High Level Consumer API
    I have implemented the read operation using the SimpleConsumer API because it allows control over the read offset.
    The High Level Consumer API is useful for implementing parallel reads from Kafka.
    http://kafka.apache.org/documentation.html#highlevelconsumerapi
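
    For comparison, below is a minimal sketch of how the High Level Consumer API is typically used
    (Kafka 0.8.x style, the same generation of API as the SimpleConsumer in this patch); the topic
    name, group id and ZooKeeper address are placeholders, not values from the patch. The connector
    manages offsets itself, so it does not give the explicit readOffset control used above, but
    running several consumers with the same group.id spreads partitions across them for parallel reads:

        import kafka.consumer.Consumer;
        import kafka.consumer.ConsumerConfig;
        import kafka.consumer.ConsumerIterator;
        import kafka.consumer.KafkaStream;
        import kafka.javaapi.consumer.ConsumerConnector;

        import java.util.Collections;
        import java.util.List;
        import java.util.Map;
        import java.util.Properties;

        public class HighLevelConsumerSketch {
            public static void main(String[] args) {
                // Placeholder connection settings for illustration only.
                Properties props = new Properties();
                props.put("zookeeper.connect", "localhost:2181");
                props.put("group.id", "samoa-reader");
                props.put("auto.offset.reset", "smallest"); // start from the earliest available offset

                ConsumerConnector connector =
                        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

                // Ask for one stream of the topic; the connector assigns partitions to the members
                // of the consumer group, so parallel reads need no offset bookkeeping here.
                Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                        connector.createMessageStreams(Collections.singletonMap("samoa-topic", 1));

                ConsumerIterator<byte[], byte[]> it = streams.get("samoa-topic").get(0).iterator();
                while (it.hasNext()) {
                    System.out.println(new String(it.next().message()));
                }
                connector.shutdown();
            }
        }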


> Add Kafka stream reader modules to consume data from Kafka framework
> --------------------------------------------------------------------
>
>                 Key: SAMOA-40
>                 URL: https://issues.apache.org/jira/browse/SAMOA-40
>             Project: SAMOA
>          Issue Type: Task
>          Components: Infrastructure, SAMOA-API
>         Environment: OS X Version 10.10.3
>            Reporter: Vishal Karande
>            Priority: Minor
>              Labels: features
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Apache SAMOA is designed to process streaming data and to develop streaming machine learning
> algorithms. Currently, the SAMOA framework supports reading stream data from Arff files only.
> Thus, when SAMOA is used as a streaming machine learning component in real-time use cases,
> writing and reading data through files is slow and inefficient.
> A single Kafka broker can handle hundreds of megabytes of reads and writes per second
> from thousands of clients. The ability to read data directly from Apache Kafka into SAMOA will
> not only improve performance but also make SAMOA pluggable into many real-time machine
> learning use cases such as the Internet of Things (IoT).
> GOAL:
> Add code that enables SAMOA to read data from Apache Kafka as a data stream.
> The Kafka stream reader supports the following options for streaming:
> a) Topic selection - the Kafka topic to read data from
> b) Partition selection - the Kafka partition to read data from
> c) Batching - the number of data instances read from Kafka in one read request
> d) Configuration options - Kafka port number, seed broker information, and the time delay between two read requests
> Components:
> KafkaReader - consists of APIs to read data from Kafka (a usage sketch is given after this description)
> KafkaStream - stream source for SAMOA providing the data read from Kafka
> Dependencies for Kafka are added in pom.xml of the samoa-api component.
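
Based on the KafkaReader API shown in the diff above, the following is a rough usage sketch of the options listed in the description (topic, partition, batch size, seed brokers, port); the broker address, port number and topic name are placeholders, not values taken from the patch:

    import org.apache.samoa.streams.kafka.KafkaReader;

    import java.util.Arrays;
    import java.util.List;

    public class KafkaReaderUsageSketch {
        public static void main(String[] args) {
            // Placeholder broker/topic settings for illustration only.
            List<String> seedBrokers = Arrays.asList("localhost");

            KafkaReader reader = new KafkaReader();

            // Read a batch of up to 10 instances from partition 0 of topic "samoa-input",
            // contacting seed brokers on port 9092. Each returned string has the form
            // "<offset>: <payload>", as built in KafkaReader.run().
            List<String> instances = reader.run(10L, "samoa-input", 0, seedBrokers, 9092);
            if (instances != null) {
                for (String instance : instances) {
                    System.out.println(instance);
                }
            }
        }
    }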



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
