spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Spark streaming issue
Date Fri, 01 Apr 2016 20:03:07 GMT
Hi,

I am just testing Spark streaming with Kafka.

Basically, I am publishing to a topic every minute, with host:port
rhes564:2181. A shell script sends a few lines as follows:

cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh \
  --broker-list rhes564:9092 --topic newtopic

That works fine, and I can see the messages with:

${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181 \
  --topic newtopic

Fri Apr 1 21:00:01 BST 2016  ======= Sending messages from rhes5
1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105

Now I try to read the topic in Spark Streaming as follows:

val conf = new SparkConf().
             setAppName("StreamTest").
             setMaster("local[12]").
             set("spark.driver.allowMultipleContexts", "true").
             set("spark.hadoop.validateOutputSpecs", "false")
val sc = new SparkContext(conf)
// Create sqlContext based on HiveContext
val sqlContext = new HiveContext(sc)
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
//
// Create a local StreamingContext with a batch interval of 1 minute.
// The master requires at least 2 cores to prevent a starvation scenario.
val ssc = new StreamingContext(conf, Minutes(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
//val lines = ssc.socketTextStream("rhes564", 9092)
val lines = ssc.socketTextStream("rhes564", 2181)
// Split each line into words
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()

This is what I am getting:


scala> -------------------------------------------
Time: 1459541760000 ms
-------------------------------------------

But no values are printed.

Have I got the port wrong in this case, or is the setup incorrect?
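
For reference, socketTextStream opens a plain TCP connection and reads
newline-delimited text. Port 2181 is the ZooKeeper port and 9092 speaks the
Kafka broker protocol, so neither would hand lines of text to that call. A
minimal sketch of reading the same topic through the Kafka direct stream
instead, assuming Spark 1.6 with the matching spark-streaming-kafka artifact
on the classpath. The object name KafkaStreamSketch and the local[2] master
are illustrative assumptions, and this is untested against the setup above:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical standalone application; in spark-shell, reuse the existing sc.
object KafkaStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamTest").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Build the StreamingContext on the existing SparkContext, 1 minute batches.
    val ssc = new StreamingContext(sc, Minutes(1))

    // The direct stream talks to the Kafka broker (9092), not ZooKeeper (2181).
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "rhes564:9092")
    val topics = Set("newtopic")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Each record is a (key, value) pair; the value holds the message text.
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The sketch passes the existing SparkContext to StreamingContext, so only one
context is created and spark.driver.allowMultipleContexts is not needed.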


Thanks

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com
