kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Akhil Das <ak...@sigmoidanalytics.com>
Subject Re: JavaKafkaWordCount not working under Spark Streaming
Date Tue, 11 Nov 2014 07:10:31 GMT
Here's a simple working version.


import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by akhld on 11/11/14.
 */

public class KafkaWordcount {

    public static void main(String[] args) {

        // Location of the Spark directory
        String sparkHome = "/home/akhld/mobi/localcluster/spark-1";

        // URL of the Spark cluster
        String sparkUrl = "spark://akhldz:7077";

        // Location of the required JAR files
        String jarFiles =
"/home/akhld/mobi/temp/kafkwc.jar,/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.1.0.jar,/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar,/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.0.jar,/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar";

        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("JavaKafkaWordCount");
        sparkConf.setJars(new String[]{jarFiles});
        sparkConf.setMaster(sparkUrl);
        sparkConf.setSparkHome(sparkHome);

        //These are the minimal things that are required

        *Map<String, Integer> topicMap = new HashMap<String, Integer>();*
*        topicMap.put("test", 1);*
*        String kafkaGroup = "groups";*
*        String zkQuorum = "localhost:2181";*

        // Create the context with a 1 second batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new
Duration(2000));

        JavaPairDStream<String, String> messages =
KafkaUtils.createStream(jssc, zkQuorum,
                kafkaGroup, topicMap);


        JavaDStream<String> lines = messages.map(new
Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        JavaDStream<String> words = lines.flatMap(new
FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Lists.newArrayList(x.split(" "));
            }
        });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();


    }

}


[image: Inline image 1]

Thanks
Best Regards

On Tue, Nov 11, 2014 at 5:37 AM, Something Something <
mailinglists19@gmail.com> wrote:

> I am not running locally.  The Spark master is:
>
> "spark://<machine name>:7077"
>
>
>
> On Mon, Nov 10, 2014 at 3:47 PM, Tathagata Das <
> tathagata.das1565@gmail.com> wrote:
>
>> What is the Spark master that you are using. Use local[4], not local
>> if you are running locally.
>>
>> On Mon, Nov 10, 2014 at 3:01 PM, Something Something
>> <mailinglists19@gmail.com> wrote:
>> > I am embarrassed to admit but I can't get a basic 'word count' to work
>> under
>> > Kafka/Spark streaming.  My code looks like this.  I  don't see any word
>> > counts in console output.  Also, don't see any output in UI.  Needless
>> to
>> > say, I am newbie in both 'Spark' as well as 'Kafka'.
>> >
>> > Please help.  Thanks.
>> >
>> > Here's the code:
>> >
>> >     public static void main(String[] args) {
>> >         if (args.length < 4) {
>> >             System.err.println("Usage: JavaKafkaWordCount <zkQuorum>
>> <group>
>> > <topics> <numThreads>");
>> >             System.exit(1);
>> >         }
>> >
>> > //        StreamingExamples.setStreamingLogLevels();
>> > //        SparkConf sparkConf = new
>> > SparkConf().setAppName("JavaKafkaWordCount");
>> >
>> >         // Location of the Spark directory
>> >         String sparkHome = "/opt/mapr/spark/spark-1.0.2/";
>> >
>> >         // URL of the Spark cluster
>> >         String sparkUrl = "spark://mymachine:7077";
>> >
>> >         // Location of the required JAR files
>> >         String jarFiles =
>> >
>> "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar";
>> >
>> >         SparkConf sparkConf = new SparkConf();
>> >         sparkConf.setAppName("JavaKafkaWordCount");
>> >         sparkConf.setJars(new String[]{jarFiles});
>> >         sparkConf.setMaster(sparkUrl);
>> >         sparkConf.set("spark.ui.port", "2348");
>> >         sparkConf.setSparkHome(sparkHome);
>> >
>> >         Map<String, String> kafkaParams = new HashMap<String, String>();
>> >         kafkaParams.put("zookeeper.connect", "myedgenode:2181");
>> >         kafkaParams.put("group.id", "1");
>> >         kafkaParams.put("metadata.broker.list", "myedgenode:9092");
>> >         kafkaParams.put("serializer.class",
>> > "kafka.serializer.StringEncoder");
>> >         kafkaParams.put("request.required.acks", "1");
>> >
>> >         // Create the context with a 1 second batch size
>> >         JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>> new
>> > Duration(2000));
>> >
>> >         int numThreads = Integer.parseInt(args[3]);
>> >         Map<String, Integer> topicMap = new HashMap<String, Integer>();
>> >         String[] topics = args[2].split(",");
>> >         for (String topic: topics) {
>> >             topicMap.put(topic, numThreads);
>> >         }
>> >
>> > //        JavaPairReceiverInputDStream<String, String> messages =
>> > //                KafkaUtils.createStream(jssc, args[0], args[1],
>> topicMap);
>> >         JavaPairDStream<String, String> messages =
>> > KafkaUtils.createStream(jssc,
>> >                 String.class,
>> >                 String.class,
>> >                 StringDecoder.class,
>> >                 StringDecoder.class,
>> >                 kafkaParams,
>> >                 topicMap,
>> >                 StorageLevel.MEMORY_ONLY_SER());
>> >
>> >
>> >         JavaDStream<String> lines = messages.map(new
>> Function<Tuple2<String,
>> > String>, String>() {
>> >             @Override
>> >             public String call(Tuple2<String, String> tuple2) {
>> >                 return tuple2._2();
>> >             }
>> >         });
>> >
>> >         JavaDStream<String> words = lines.flatMap(new
>> > FlatMapFunction<String, String>() {
>> >             @Override
>> >             public Iterable<String> call(String x) {
>> >                 return Lists.newArrayList(SPACE.split(x));
>> >             }
>> >         });
>> >
>> >         JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>> >                 new PairFunction<String, String, Integer>() {
>> >                     @Override
>> >                     public Tuple2<String, Integer> call(String s) {
>> >                         return new Tuple2<String, Integer>(s, 1);
>> >                     }
>> >                 }).reduceByKey(new Function2<Integer, Integer,
>> Integer>() {
>> >             @Override
>> >             public Integer call(Integer i1, Integer i2) {
>> >                 return i1 + i2;
>> >             }
>> >         });
>> >
>> >         wordCounts.print();
>> >         jssc.start();
>> >         jssc.awaitTermination();
>> >
>>
>
>

Mime
  • Unnamed multipart/related (inline, None, 0 bytes)
View raw message