spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Something Something <mailinglist...@gmail.com>
Subject Re: JavaKafkaWordCount not working under Spark Streaming
Date Tue, 11 Nov 2014 00:07:32 GMT
I am not running locally.  The Spark master is:

"spark://<machine name>:7077"



On Mon, Nov 10, 2014 at 3:47 PM, Tathagata Das <tathagata.das1565@gmail.com>
wrote:

> What is the Spark master that you are using. Use local[4], not local
> if you are running locally.
>
> On Mon, Nov 10, 2014 at 3:01 PM, Something Something
> <mailinglists19@gmail.com> wrote:
> > I am embarrassed to admit but I can't get a basic 'word count' to work
> under
> > Kafka/Spark streaming.  My code looks like this.  I  don't see any word
> > counts in console output.  Also, don't see any output in UI.  Needless to
> > say, I am newbie in both 'Spark' as well as 'Kafka'.
> >
> > Please help.  Thanks.
> >
> > Here's the code:
> >
> >     public static void main(String[] args) {
> >         if (args.length < 4) {
> >             System.err.println("Usage: JavaKafkaWordCount <zkQuorum>
> <group>
> > <topics> <numThreads>");
> >             System.exit(1);
> >         }
> >
> > //        StreamingExamples.setStreamingLogLevels();
> > //        SparkConf sparkConf = new
> > SparkConf().setAppName("JavaKafkaWordCount");
> >
> >         // Location of the Spark directory
> >         String sparkHome = "/opt/mapr/spark/spark-1.0.2/";
> >
> >         // URL of the Spark cluster
> >         String sparkUrl = "spark://mymachine:7077";
> >
> >         // Location of the required JAR files
> >         String jarFiles =
> >
> "./spark-streaming-kafka_2.10-1.1.0.jar,./DlSpark-1.0-SNAPSHOT.jar,./zkclient-0.3.jar,./kafka_2.10-0.8.1.1.jar,./metrics-core-2.2.0.jar";
> >
> >         SparkConf sparkConf = new SparkConf();
> >         sparkConf.setAppName("JavaKafkaWordCount");
> >         sparkConf.setJars(new String[]{jarFiles});
> >         sparkConf.setMaster(sparkUrl);
> >         sparkConf.set("spark.ui.port", "2348");
> >         sparkConf.setSparkHome(sparkHome);
> >
> >         Map<String, String> kafkaParams = new HashMap<String, String>();
> >         kafkaParams.put("zookeeper.connect", "myedgenode:2181");
> >         kafkaParams.put("group.id", "1");
> >         kafkaParams.put("metadata.broker.list", "myedgenode:9092");
> >         kafkaParams.put("serializer.class",
> > "kafka.serializer.StringEncoder");
> >         kafkaParams.put("request.required.acks", "1");
> >
> >         // Create the context with a 1 second batch size
> >         JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> new
> > Duration(2000));
> >
> >         int numThreads = Integer.parseInt(args[3]);
> >         Map<String, Integer> topicMap = new HashMap<String, Integer>();
> >         String[] topics = args[2].split(",");
> >         for (String topic: topics) {
> >             topicMap.put(topic, numThreads);
> >         }
> >
> > //        JavaPairReceiverInputDStream<String, String> messages =
> > //                KafkaUtils.createStream(jssc, args[0], args[1],
> topicMap);
> >         JavaPairDStream<String, String> messages =
> > KafkaUtils.createStream(jssc,
> >                 String.class,
> >                 String.class,
> >                 StringDecoder.class,
> >                 StringDecoder.class,
> >                 kafkaParams,
> >                 topicMap,
> >                 StorageLevel.MEMORY_ONLY_SER());
> >
> >
> >         JavaDStream<String> lines = messages.map(new
> Function<Tuple2<String,
> > String>, String>() {
> >             @Override
> >             public String call(Tuple2<String, String> tuple2) {
> >                 return tuple2._2();
> >             }
> >         });
> >
> >         JavaDStream<String> words = lines.flatMap(new
> > FlatMapFunction<String, String>() {
> >             @Override
> >             public Iterable<String> call(String x) {
> >                 return Lists.newArrayList(SPACE.split(x));
> >             }
> >         });
> >
> >         JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
> >                 new PairFunction<String, String, Integer>() {
> >                     @Override
> >                     public Tuple2<String, Integer> call(String s) {
> >                         return new Tuple2<String, Integer>(s, 1);
> >                     }
> >                 }).reduceByKey(new Function2<Integer, Integer,
> Integer>() {
> >             @Override
> >             public Integer call(Integer i1, Integer i2) {
> >                 return i1 + i2;
> >             }
> >         });
> >
> >         wordCounts.print();
> >         jssc.start();
> >         jssc.awaitTermination();
> >
>

Mime
View raw message