[ https://issues.apache.org/jira/browse/SAMOA-40?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14634959#comment-14634959 ]
ASF GitHub Bot commented on SAMOA-40:
-------------------------------------
Github user gdfm commented on a diff in the pull request:
https://github.com/apache/incubator-samoa/pull/32#discussion_r35090938
--- Diff: samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaReader.java ---
@@ -0,0 +1,279 @@
+package org.apache.samoa.streams.kafka;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.ErrorMapping;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.*;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndOffset;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class KafkaReader {
+
+  protected long readOffset;
+
+  private List<String> m_replicaBrokers = new ArrayList<String>();
+
+  public KafkaReader() {
+    m_replicaBrokers = new ArrayList<String>();
+    readOffset = 0L;
+  }
+
+  public ArrayList<String> run(long a_maxReads, String a_topic, int a_partition,
+      List<String> a_seedBrokers, int a_port) {
+
+    // find the metadata about the topic and partition we are interested in
+    String answer = "";
+    ArrayList<String> returnInstances = new ArrayList<String>();
+    PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
+    if (metadata == null) {
+      System.out.println("Can't find metadata for Topic and Partition. Exiting");
+      return null;
+    }
+    if (metadata.leader() == null) {
+      System.out.println("Can't find Leader for Topic and Partition. Exiting");
+      return null;
+    }
+    String leadBroker = metadata.leader().host();
+    String clientName = "Client_" + a_topic + "_" + a_partition;
+
+    SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
+    //long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
+    //readOffset = 0L;
+    int numErrors = 0;
+    while (a_maxReads > 0) {
+      if (consumer == null) {
+        consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
+      }
+      /**
+       * Reading data
+       */
+      FetchRequest req = new FetchRequestBuilder()
+          .clientId(clientName)
+          .addFetch(a_topic, a_partition, readOffset, 100000)
+          .build();
+      FetchResponse fetchResponse = null;
+
+      try {
+        fetchResponse = consumer.fetch(req);
+      } catch (Exception e) {
+        // A failed fetch (e.g. a lost broker connection) would otherwise leave
+        // fetchResponse null and cause an NPE below; log, drop the consumer,
+        // count the error, and retry on the next iteration.
+        e.printStackTrace();
+        consumer.close();
+        consumer = null;
+        if (++numErrors > 5) break;
+        continue;
+      }
+
+      /**
+       * SimpleConsumer does not handle lead broker failures; you have to handle
+       * them yourself. Once the fetch returns an error, we log the reason, close
+       * the consumer, and then try to figure out who the new leader is.
+       */
+      if (fetchResponse.hasError()) {
+        numErrors++;
+        // Something went wrong!
+        short code = fetchResponse.errorCode(a_topic, a_partition);
+        System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
+        if (numErrors > 5) break;
+        if (code == ErrorMapping.OffsetOutOfRangeCode()) {
+          // We asked for an invalid offset. For the simple case, ask for the last element to reset.
+          //readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
+          continue;
+        }
+        consumer.close();
+        consumer = null;
+        try {
+          leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
+        } catch (Exception e) {
+          // Could not determine a new leader; log and retry on the next iteration.
+          e.printStackTrace();
+        }
+        continue;
+      }
+      //End Error handling
+
+      // Reading data cont.
+      numErrors = 0;
+
+      long numRead = 0;
+      Iterator<MessageAndOffset> it = fetchResponse.messageSet(a_topic, a_partition).iterator();
+      if (!it.hasNext()) {
+        // Nothing was returned for this fetch; bail out rather than fail on an empty set.
+        return null;
+      }
+      MessageAndOffset messageAndOffset = it.next();
+
+      //for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
+      long currentOffset = messageAndOffset.offset();
+      if (currentOffset < readOffset) {
+        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
+        continue;
+      }
+      readOffset = messageAndOffset.nextOffset();
+      ByteBuffer payload = messageAndOffset.message().payload();
+
+      byte[] bytes = new byte[payload.limit()];
+      payload.get(bytes);
+      try {
+        System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
--- End diff --
Is this just for debug?
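If that output is only meant for debugging, a logger call would be the more conventional choice. A minimal sketch, assuming SLF4J is available on the classpath; the wrapper class and method here are purely illustrative:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.UnsupportedEncodingException;

    // Illustrative only: the println above rewritten as a DEBUG-level log call.
    public class KafkaReaderLoggingSketch {
      private static final Logger LOG = LoggerFactory.getLogger(KafkaReaderLoggingSketch.class);

      void logMessage(long offset, byte[] bytes) throws UnsupportedEncodingException {
        // The parameterized form defers string construction until DEBUG is enabled.
        LOG.debug("{}: {}", offset, new String(bytes, "UTF-8"));
      }
    }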
> Add Kafka stream reader modules to consume data from Kafka framework
> --------------------------------------------------------------------
>
> Key: SAMOA-40
> URL: https://issues.apache.org/jira/browse/SAMOA-40
> Project: SAMOA
> Issue Type: Task
> Components: Infrastructure, SAMOA-API
> Environment: OS X Version 10.10.3
> Reporter: Vishal Karande
> Priority: Minor
> Labels: features
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> Apache SAMOA is designed to process streaming data and to develop streaming machine
> learning algorithms. Currently, the SAMOA framework supports reading stream data from
> ARFF files only. Thus, when using SAMOA as a streaming machine learning component in
> real-time use cases, writing and reading data through files is slow and inefficient.
> A single Kafka broker can handle hundreds of megabytes of reads and writes per second
> from thousands of clients. The ability to read data directly from Apache Kafka into
> SAMOA will not only improve performance but also make SAMOA pluggable into many
> real-time machine learning use cases such as the Internet of Things (IoT).
> GOAL:
> Add code that enables SAMOA to read data from Apache Kafka as a data stream.
> The Kafka stream reader supports the following options for streaming, illustrated
> in the sketch below:
> a) Topic selection - the Kafka topic to read data from
> b) Partition selection - the Kafka partition to read data from
> c) Batching - the number of data instances read from Kafka in one read request
> d) Configuration options - Kafka port number, seed broker information, and the time
> delay between two read requests
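> For illustration, here is a minimal sketch of how these options might map onto the
> KafkaReader API from the pull request; the broker host, topic name, partition, and
> port values are hypothetical placeholders:
>
>   import java.util.ArrayList;
>   import java.util.List;
>   import org.apache.samoa.streams.kafka.KafkaReader;
>
>   public class KafkaReaderExample {
>     public static void main(String[] args) {
>       List<String> seedBrokers = new ArrayList<String>();
>       seedBrokers.add("localhost");        // (d) seed broker information
>       KafkaReader reader = new KafkaReader();
>       ArrayList<String> instances = reader.run(
>           100,             // (c) batching: max instances per read request
>           "samoa-input",   // (a) topic selection
>           0,               // (b) partition selection
>           seedBrokers,
>           9092);           // (d) Kafka port number
>       if (instances != null) {
>         System.out.println("Read " + instances.size() + " instances");
>       }
>     }
>   }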
> Components:
> KafkaReader - consists of APIs to read data from Kafka
> KafkaStream - a stream source for SAMOA providing the data read from Kafka
> The Kafka dependencies are added to the pom.xml of the samoa-api component.
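> For reference, the kind of dependency entry this implies is sketched below; the
> artifact's Scala version suffix and the Kafka version shown are assumptions, not
> values taken from the pull request:
>
>   <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka_2.10</artifactId>
>     <version>0.8.2.1</version>
>   </dependency>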
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)