spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: Spark streaming issue
Date Fri, 01 Apr 2016 20:13:06 GMT
It looks like you're using a plain socket stream to connect to a
ZooKeeper port, which won't work.
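
Port 2181 is only useful when it is handed to a Kafka-aware API as the ZooKeeper quorum, not to socketTextStream. Below is a minimal sketch of the receiver-based approach. It assumes Spark 1.6 with the spark-streaming-kafka (Kafka 0.8) artifact on the classpath. The consumer group id "spark-stream-test" is a hypothetical name chosen for illustration.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaReceiverTest {
  // ZooKeeper quorum: this is where port 2181 belongs.
  val zkQuorum = "rhes564:2181"
  // Hypothetical consumer group id, chosen only for this sketch.
  val groupId = "spark-stream-test"
  // Topic -> number of consumer threads inside the receiver.
  val topicMap = Map("newtopic" -> 1)

  def main(args: Array[String]): Unit = {
    // The receiver occupies one core, so local mode needs at least 2 threads.
    val conf = new SparkConf().setAppName("StreamTest").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Minutes(1))
    // createStream yields (key, message) pairs; keep only the message text.
    val lines = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap).map(_._2)
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```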

  Look at spark.apache.org/docs/latest/streaming-kafka-integration.html
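
The guide also describes a "direct" approach that reads from the Kafka broker port (9092 here) instead of ZooKeeper. Below is a minimal sketch, assuming Spark 1.6 with the spark-streaming-kafka (Kafka 0.8) artifact. The host, port and topic are taken from the message below. It keeps the original word count logic but feeds it message values from Kafka.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaStreamTest {
  // The direct stream talks to the broker list, not to ZooKeeper.
  val kafkaParams = Map[String, String]("metadata.broker.list" -> "rhes564:9092")
  val topics = Set("newtopic")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamTest").setMaster("local[2]")
    // Build the StreamingContext from the conf; no second SparkContext is created.
    val ssc = new StreamingContext(conf, Minutes(1))

    // Each record is a (key, value) pair; the console producer sends null keys.
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Same word count as in the original code, applied to the message values.
    val wordCounts = messages.map(_._2)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

By default the direct stream starts at the latest offsets, so it only sees messages produced after ssc.start(). Adding "auto.offset.reset" -> "smallest" to kafkaParams makes it read the topic from the beginning instead.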

On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
<mich.talebzadeh@gmail.com> wrote:
>
> Hi,
>
> I am just testing Spark streaming with Kafka.
>
> Basically, every minute I send messages to a topic (Host:port -> rhes564:2181).
> A shell script sends a few lines as follows:
>
> cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh \
>   --broker-list rhes564:9092 --topic newtopic
>
> That works fine, and I can see the messages with
>
> ${KAFKA_HOME}/bin/kafka-console-consumer.sh \
>   --zookeeper rhes564:2181 --topic newtopic
>
> Fri Apr 1 21:00:01 BST 2016  ======= Sending messages from rhes5
> 1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
> 2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
> 3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
> 4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
> 5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105
>
> Now I try to read the topic in Spark Streaming as follows:
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.streaming.{Minutes, StreamingContext}
>
> val conf = new SparkConf().
>              setAppName("StreamTest").
>              setMaster("local[12]").
>              set("spark.driver.allowMultipleContexts", "true").
>              set("spark.hadoop.validateOutputSpecs", "false")
> val sc = new SparkContext(conf)
> // Create sqlContext based on HiveContext
> val sqlContext = new HiveContext(sc)
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> //
> // Create a StreamingContext with a batch interval of 1 minute, reusing sc
> // rather than starting a second SparkContext.
> // A receiver needs at least 2 cores (local[12] here) to avoid starvation.
> val ssc = new StreamingContext(sc, Minutes(1))
> // Create a DStream that will connect to hostname:port, like localhost:9999
> //val lines = ssc.socketTextStream("rhes564", 9092)
> val lines = ssc.socketTextStream("rhes564", 2181)
> // Split each line into words
> val words = lines.flatMap(_.split(" "))
> val pairs = words.map(word => (word, 1))
> val wordCounts = pairs.reduceByKey(_ + _)
> // Print the first ten elements of each RDD generated in this DStream to the console
> wordCounts.print()
> ssc.start()
>
> This is what I am getting:
>
>
> scala> -------------------------------------------
> Time: 1459541760000 ms
> -------------------------------------------
>
> but no values are printed.
>
> Have I got the port wrong in this case, or is the setup incorrect?
>
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>


