spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Spark streaming issue
Date Fri, 01 Apr 2016 21:04:09 GMT
Thanks Cody.

Can I use Receiver-based Approach here?

I have created the topic newtopic as below

${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes564:2181
--replication-factor 1 --partitions 1 --topic newtopic
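
As a sanity check, the stock Kafka tooling can confirm the topic was actually registered. A minimal sketch, assuming the same ${KAFKA_HOME} and ZooKeeper address as above:

```shell
# Confirm the topic exists and show its partition/replica layout
${KAFKA_HOME}/bin/kafka-topics.sh --describe --zookeeper rhes564:2181 --topic newtopic

# List all topics registered in this ZooKeeper ensemble
${KAFKA_HOME}/bin/kafka-topics.sh --list --zookeeper rhes564:2181
```

If `newtopic` does not appear in the `--list` output, the streaming side has nothing to consume regardless of how the Spark code is written.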

This is basically what I am doing in Spark:

val lines = ssc.socketTextStream("rhes564", 2181)

which obviously does not work.

This is what is suggested in the docs:

import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka
partitions to consume])

*   <zkQuorum> is a list of one or more ZooKeeper servers that make up the quorum
*   <group> is the name of the Kafka consumer group
*   <topics> is a list of one or more Kafka topics to consume from
*   <numThreads> is the number of threads the Kafka consumer should use

Now this comes back with an error. Obviously I am not passing the parameters correctly!

scala> val kafkaStream = KafkaUtils.createStream(streamingContext,
rhes564:2181, rhes564:9092, newtopic 1)
<console>:1: error: identifier expected but integer literal found.
       val kafkaStream = KafkaUtils.createStream(streamingContext,
rhes564:2181, rhes564:9092, newtopic 1)
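
For what it's worth, the compile error above comes from the arguments being passed as bare identifiers: `createStream` expects the ZooKeeper quorum and group id as quoted Strings and the topics as a `Map[String, Int]`, and the broker address (9092) is not passed to this API at all. A minimal sketch of what the call should look like, assuming the spark-streaming-kafka artifact is on the classpath and `ssc` is the existing StreamingContext (the group id below is a made-up name):

```scala
import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-based approach: createStream(ssc, zkQuorum, groupId, topics).
// Each argument must be quoted; topics maps topic name -> receiver threads.
val kafkaStream = KafkaUtils.createStream(
  ssc,                      // the existing StreamingContext
  "rhes564:2181",           // ZooKeeper quorum, NOT the broker port 9092
  "spark-consumer-group",   // hypothetical consumer group id
  Map("newtopic" -> 1)      // one consumer thread for topic newtopic
)

// The stream yields (key, message) pairs; keys from the console
// producer are null, so take the message values only.
val lines = kafkaStream.map(_._2)
lines.print()
```

This replaces the `socketTextStream` call entirely; there is no need to open a raw socket to either 2181 or 9092 yourself.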






Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 1 April 2016 at 21:13, Cody Koeninger <cody@koeninger.org> wrote:

> It looks like you're using a plain socket stream to connect to a
> zookeeper port, which won't work.
>
>   Look at spark.apache.org/docs/latest/streaming-kafka-integration.html
>
> On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
> <mich.talebzadeh@gmail.com> wrote:
> >
> > Hi,
> >
> > I am just testing Spark streaming with Kafka.
> >
> > Basically I am publishing to the topic every minute at host:port ->
> > rhes564:2181. This sends a few lines through a shell script as follows:
> >
> > cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh
> --broker-list
> > rhes564:9092 --topic newtopic
> >
> > That works fine and I can see the messages in
> >
> > ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
> --topic
> > newtopic
> >
> > Fri Apr 1 21:00:01 BST 2016  ======= Sending messages from rhes5
> >
> 1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
> >
> 2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
> >
> 3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
> >
> 4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
> >
> 5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105
> >
> > Now I try to see the topic in spark streaming as follows:
> >
> > val conf = new SparkConf().
> >              setAppName("StreamTest").
> >              setMaster("local[12]").
> >              set("spark.driver.allowMultipleContexts", "true").
> >              set("spark.hadoop.validateOutputSpecs", "false")
> > val sc = new SparkContext(conf)
> > // Create sqlContext based on HiveContext
> > val sqlContext = new HiveContext(sc)
> > val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> > //
> > // Create a local StreamingContext with a batch interval of one minute.
> > // The master requires at least 2 cores to prevent a starvation scenario.
> > val ssc = new StreamingContext(conf, Minutes(1))
> > // Create a DStream that will connect to hostname:port, like
> localhost:9999
> > //val lines = ssc.socketTextStream("rhes564", 9092)
> > val lines = ssc.socketTextStream("rhes564", 2181)
> > // Split each line into words
> > val words = lines.flatMap(_.split(" "))
> > val pairs = words.map(word => (word, 1))
> > val wordCounts = pairs.reduceByKey(_ + _)
> > // Print the first ten elements of each RDD generated in this DStream to
> the
> > console
> > wordCounts.print()
> > ssc.start()
> >
> > This is what I am getting:
> >
> >
> > scala> -------------------------------------------
> > Time: 1459541760000 ms
> > -------------------------------------------
> >
> > But no values are printed.
> >
> > Have I got the port wrong in this case, or is the setup incorrect?
> >
> >
> > Thanks
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> >
> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
>
