spark-user mailing list archives

From Chia-Chun Shih <chiachun.s...@gmail.com>
Subject Re: JavaKafkaWordCount not working under Spark Streaming
Date Tue, 11 Nov 2014 08:39:22 GMT
You can run this command in the Kafka directory:

    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
        --zkconnect myedgenode:2181 --group 1

    (in which 1 is your consumer group id)

Then check these two columns:

   - Offset: should be > 0 if your program (or you) has successfully
   produced data, and should increase as your program produces more data
   - Lag: should be 0 if Spark has consumed all the data in Kafka

This command can help you identify whether the problem is on the producer
side or the consumer side.
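
If Offset never advances, the problem is likely on the producer side; you
can confirm this by pushing a few test messages yourself with a standalone
producer. Below is a minimal sketch using the Kafka 0.8 producer API; the
broker address ("myedgenode:9092") and topic name ("test") are just the
values used elsewhere in this thread, so adjust them to your setup:

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class TestProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Assumed broker address from this thread; replace with your own.
            props.put("metadata.broker.list", "myedgenode:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");

            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));
            // Push a handful of test messages to the "test" topic.
            for (int i = 0; i < 10; i++) {
                producer.send(new KeyedMessage<String, String>("test", "hello " + i));
            }
            producer.close();
        }
    }

If Offset then moves but Lag stays above 0, the consumer side is where to
look.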

regards,
Chia-Chun

2014-11-11 15:10 GMT+08:00 Akhil Das <akhil@sigmoidanalytics.com>:

> Here's a simple working version.
>
>
> import com.google.common.collect.Lists;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.function.FlatMapFunction;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.api.java.function.Function2;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.kafka.KafkaUtils;
> import scala.Tuple2;
>
> import java.util.HashMap;
> import java.util.Map;
>
> /**
>  * Created by akhld on 11/11/14.
>  */
>
> public class KafkaWordcount {
>
>     public static void main(String[] args) {
>
>         // Location of the Spark directory
>         String sparkHome = "/home/akhld/mobi/localcluster/spark-1";
>
>         // URL of the Spark cluster
>         String sparkUrl = "spark://akhldz:7077";
>
>         // Location of the required JAR files
>         String jarFiles =
>                 "/home/akhld/mobi/temp/kafkwc.jar,/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.1.0.jar,/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar,/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.0.jar,/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar";
>
>         SparkConf sparkConf = new SparkConf();
>         sparkConf.setAppName("JavaKafkaWordCount");
>         sparkConf.setJars(new String[]{jarFiles});
>         sparkConf.setMaster(sparkUrl);
>         sparkConf.setSparkHome(sparkHome);
>
>         //These are the minimal things that are required
>
>         Map<String, Integer> topicMap = new HashMap<String, Integer>();
>         topicMap.put("test", 1);
>         String kafkaGroup = "groups";
>         String zkQuorum = "localhost:2181";
>
>         // Create the context with a 2 second batch interval
>         JavaStreamingContext jssc =
>                 new JavaStreamingContext(sparkConf, new Duration(2000));
>
>         JavaPairDStream<String, String> messages =
>                 KafkaUtils.createStream(jssc, zkQuorum, kafkaGroup, topicMap);
>
>
>         JavaDStream<String> lines = messages.map(
>                 new Function<Tuple2<String, String>, String>() {
>             @Override
>             public String call(Tuple2<String, String> tuple2) {
>                 return tuple2._2();
>             }
>         });
>
>         JavaDStream<String> words = lines.flatMap(
>                 new FlatMapFunction<String, String>() {
>             @Override
>             public Iterable<String> call(String x) {
>                 return Lists.newArrayList(x.split(" "));
>             }
>         });
>
>         JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>                 new PairFunction<String, String, Integer>() {
>                     @Override
>                     public Tuple2<String, Integer> call(String s) {
>                         return new Tuple2<String, Integer>(s, 1);
>                     }
>                 }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>             @Override
>             public Integer call(Integer i1, Integer i2) {
>                 return i1 + i2;
>             }
>         });
>
>         wordCounts.print();
>         jssc.start();
>         jssc.awaitTermination();
>
>
>     }
>
> }
>
> Thanks
> Best Regards
>
> On Tue, Nov 11, 2014 at 5:37 AM, Something Something <
> mailinglists19@gmail.com> wrote:
>
>> I am not running locally.  The Spark master is:
>>
>> "spark://<machine name>:7077"
>>
>>
>>
>> On Mon, Nov 10, 2014 at 3:47 PM, Tathagata Das <
>> tathagata.das1565@gmail.com> wrote:
>>
>>> What is the Spark master that you are using? Use local[4], not local,
>>> if you are running locally.
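>>>
>>> With plain "local" there is only one thread, the Kafka receiver occupies
>>> it, and no cores are left to process the received batches. A minimal
>>> sketch (the app name is just a placeholder):
>>>
>>>     SparkConf conf = new SparkConf()
>>>             .setAppName("JavaKafkaWordCount") // placeholder
>>>             .setMaster("local[4]");           // >= 2 threads: receiver + processing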
>>>
>>> On Mon, Nov 10, 2014 at 3:01 PM, Something Something
>>> <mailinglists19@gmail.com> wrote:
>>> > I am embarrassed to admit it, but I can't get a basic 'word count' to
>>> > work under Kafka/Spark Streaming.  My code looks like this.  I don't
>>> > see any word counts in the console output, and I don't see any output
>>> > in the UI either.  Needless to say, I am a newbie in both 'Spark' and
>>> > 'Kafka'.
>>> >
>>> > Please help.  Thanks.
>>> >
>>> > Here's the code:
>>> >
>>> >     public static void main(String[] args) {
>>> >         if (args.length < 4) {
>>> >             System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
>>> >             System.exit(1);
>>> >         }
>>> >
>>> > //        StreamingExamples.setStreamingLogLevels();
>>> > //        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
>>> >
>>> >         // Location of the Spark directory
>>> >         String sparkHome = "/opt/mapr/spark/spark-1.0.2/";
>>> >
>>> >         // URL of the Spark cluster
>>> >         String sparkUrl = "spark://mymachine:7077";
>>> >
>>> >         // Location of the required JAR files
>>> >         String jarFiles =
>>> >                 "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar";
>>> >
>>> >         SparkConf sparkConf = new SparkConf();
>>> >         sparkConf.setAppName("JavaKafkaWordCount");
>>> >         sparkConf.setJars(new String[]{jarFiles});
>>> >         sparkConf.setMaster(sparkUrl);
>>> >         sparkConf.set("spark.ui.port", "2348");
>>> >         sparkConf.setSparkHome(sparkHome);
>>> >
>>> >         Map<String, String> kafkaParams = new HashMap<String, String>();
>>> >         kafkaParams.put("zookeeper.connect", "myedgenode:2181");
>>> >         kafkaParams.put("group.id", "1");
>>> >         kafkaParams.put("metadata.broker.list", "myedgenode:9092");
>>> >         kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder");
>>> >         kafkaParams.put("request.required.acks", "1");
>>> >
>>> >         // Create the context with a 2 second batch interval
>>> >         JavaStreamingContext jssc =
>>> >                 new JavaStreamingContext(sparkConf, new Duration(2000));
>>> >
>>> >         int numThreads = Integer.parseInt(args[3]);
>>> >         Map<String, Integer> topicMap = new HashMap<String, Integer>();
>>> >         String[] topics = args[2].split(",");
>>> >         for (String topic: topics) {
>>> >             topicMap.put(topic, numThreads);
>>> >         }
>>> >
>>> > //        JavaPairReceiverInputDStream<String, String> messages =
>>> > //                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
>>> >         JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc,
>>> >                 String.class,
>>> >                 String.class,
>>> >                 StringDecoder.class,
>>> >                 StringDecoder.class,
>>> >                 kafkaParams,
>>> >                 topicMap,
>>> >                 StorageLevel.MEMORY_ONLY_SER());
>>> >
>>> >
>>> >         JavaDStream<String> lines = messages.map(
>>> >                 new Function<Tuple2<String, String>, String>() {
>>> >             @Override
>>> >             public String call(Tuple2<String, String> tuple2) {
>>> >                 return tuple2._2();
>>> >             }
>>> >         });
>>> >
>>> >         JavaDStream<String> words = lines.flatMap(
>>> >                 new FlatMapFunction<String, String>() {
>>> >             @Override
>>> >             public Iterable<String> call(String x) {
>>> >                 return Lists.newArrayList(SPACE.split(x));
>>> >             }
>>> >         });
>>> >
>>> >         JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>>> >                 new PairFunction<String, String, Integer>() {
>>> >                     @Override
>>> >                     public Tuple2<String, Integer> call(String s) {
>>> >                         return new Tuple2<String, Integer>(s, 1);
>>> >                     }
>>> >                 }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>>> >             @Override
>>> >             public Integer call(Integer i1, Integer i2) {
>>> >                 return i1 + i2;
>>> >             }
>>> >         });
>>> >
>>> >         wordCounts.print();
>>> >         jssc.start();
>>> >         jssc.awaitTermination();
>>> >
>>>
>>
>>
>
