spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Spark streaming issue
Date Fri, 01 Apr 2016 22:26:25 GMT
I adopted this approach

scala> val conf = new SparkConf().
     |              setAppName("StreamTest").
     |              setMaster("local[12]").
     |              set("spark.driver.allowMultipleContexts", "true").
     |              set("spark.hadoop.validateOutputSpecs", "false")
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@321d96f7
scala> val ssc = new StreamingContext(conf, Seconds(60))
ssc: org.apache.spark.streaming.StreamingContext =
org.apache.spark.streaming.StreamingContext@5dbae9eb
scala> val kafkaParams = Map("metadata.broker.list" -> "rhes564:9092")
kafkaParams: scala.collection.immutable.Map[String,String] =
Map(metadata.broker.list -> rhes564:9092)
scala> val topics = Set("newtopic")
topics: scala.collection.immutable.Set[String] = Set(newtopic)
scala> val stream = KafkaUtils.createDirectStream(ssc, kafkaParams, topics)
stream: org.apache.spark.streaming.dstream.InputDStream[(Nothing, Nothing)]
= org.apache.spark.streaming.kafka.DirectKafkaInputDStream@6d2d3b21

So that opens data stream. What next?

Thanks



Dr Mich Talebzadeh



LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com



On 1 April 2016 at 22:37, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:

> yes I noticed that
>
> scala> val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
> "rhes564:9092", "newtopic", 1)
>
> <console>:52: error: overloaded method value createStream with
> alternatives:
>   (jssc:
> org.apache.spark.streaming.api.java.JavaStreamingContext,zkQuorum:
> String,groupId: String,topics: java.util.Map[String,Integer],storageLevel:
> org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream[String,String]
> <and>
>   (ssc: org.apache.spark.streaming.StreamingContext,zkQuorum:
> String,groupId: String,topics:
> scala.collection.immutable.Map[String,Int],storageLevel:
> org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.dstream.ReceiverInputDStream[(String,
> String)]
>  cannot be applied to (org.apache.spark.streaming.StreamingContext,
> String, String, String, Int)
>          val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
> "rhes564:9092", "newtopic", 1)
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 1 April 2016 at 22:25, Cody Koeninger <cody@koeninger.org> wrote:
>
>> You're not passing valid Scala values.  rhes564:2181  without quotes
>> isn't a valid literal, newtopic isn't a list of strings, etc.
>>
>> On Fri, Apr 1, 2016 at 4:04 PM, Mich Talebzadeh
>> <mich.talebzadeh@gmail.com> wrote:
>> > Thanks Cody.
>> >
>> > Can I use Receiver-based Approach here?
>> >
>> > I have created the topic newtopic as below
>> >
>> > ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes564:2181
>> > --replication-factor 1 --partitions 1 --topic newtopic
>> >
>> >
>> > This is basically what I am doing the Spark
>> >
>> > val lines = ssc.socketTextStream("rhes564", 2181)
>> >
>> > Which obviously not working
>> >
>> > This is what is suggested in the doc
>> >
>> > import org.apache.spark.streaming.kafka._
>> >
>> > val kafkaStream = KafkaUtils.createStream(streamingContext,
>> >      [ZK quorum], [consumer group id], [per-topic number of Kafka
>> partitions
>> > to consume])
>> >
>> > *   <zkQuorum> is a list of one or more zookeeper servers that make
>> quorum
>> > *   <group> is the name of kafka consumer group
>> > *   <topics> is a list of one or more kafka topics to consume from
>> > *   <numThreads> is the number of threads the kafka consumer should use
>> >
>> > Now this comes back with error. onviously not passing parameters
>> correctly!
>> >
>> > scala> val kafkaStream = KafkaUtils.createStream(streamingContext,
>> > rhes564:2181, rhes564:9092, newtopic 1)
>> > <console>:1: error: identifier expected but integer literal found.
>> >        val kafkaStream = KafkaUtils.createStream(streamingContext,
>> > rhes564:2181, rhes564:9092, newtopic 1)
>> >
>> >
>> >
>> >
>> >
>> >
>> > Dr Mich Talebzadeh
>> >
>> >
>> >
>> > LinkedIn
>> >
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> >
>> >
>> >
>> > http://talebzadehmich.wordpress.com
>> >
>> >
>> >
>> >
>> > On 1 April 2016 at 21:13, Cody Koeninger <cody@koeninger.org> wrote:
>> >>
>> >> It looks like you're using a plain socket stream to connect to a
>> >> zookeeper port, which won't work.
>> >>
>> >>   Look at
>> spark.apache.org/docs/latest/streaming-kafka-integration.html
>> >>
>> >> On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
>> >> <mich.talebzadeh@gmail.com> wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> > I am just testing Spark streaming with Kafka.
>> >> >
>> >> > Basically I am broadcasting topic every minute to Host:port ->
>> >> > rhes564:2181.
>> >> > This is sending few lines through a shell script as follows:
>> >> >
>> >> > cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh
>> >> > --broker-list
>> >> > rhes564:9092 --topic newtopic
>> >> >
>> >> > That works fine and I can see the messages in
>> >> >
>> >> > ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
>> >> > --topic
>> >> > newtopic
>> >> >
>> >> > Fri Apr 1 21:00:01 BST 2016  ======= Sending messages from rhes5
>> >> >
>> >> >
>> 1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
>> >> >
>> >> >
>> 2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
>> >> >
>> >> >
>> 3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
>> >> >
>> >> >
>> 4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
>> >> >
>> >> >
>> 5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105
>> >> >
>> >> > Now I try to see the topic in spark streaming as follows:
>> >> >
>> >> > val conf = new SparkConf().
>> >> >              setAppName("StreamTest").
>> >> >              setMaster("local[12]").
>> >> >              set("spark.driver.allowMultipleContexts", "true").
>> >> >              set("spark.hadoop.validateOutputSpecs", "false")
>> >> > val sc = new SparkContext(conf)
>> >> > // Create sqlContext based on HiveContext
>> >> > val sqlContext = new HiveContext(sc)
>> >> > val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>> >> > //
>> >> > // Create a local StreamingContext with two working thread and batch
>> >> > interval of 1 second.
>> >> > // The master requires 2 cores to prevent from a starvation scenario.
>> >> > val ssc = new StreamingContext(conf, Minutes(1))
>> >> > // Create a DStream that will connect to hostname:port, like
>> >> > localhost:9999
>> >> > //val lines = ssc.socketTextStream("rhes564", 9092)
>> >> > val lines = ssc.socketTextStream("rhes564", 2181)
>> >> > // Split each line into words
>> >> > val words = lines.flatMap(_.split(" "))
>> >> > val pairs = words.map(word => (word, 1))
>> >> > val wordCounts = pairs.reduceByKey(_ + _)
>> >> > // Print the first ten elements of each RDD generated in this
>> DStream to
>> >> > the
>> >> > console
>> >> > wordCounts.print()
>> >> > ssc.start()
>> >> >
>> >> > This is what I am getting:
>> >> >
>> >> >
>> >> > scala> -------------------------------------------
>> >> > Time: 1459541760000 ms
>> >> > -------------------------------------------
>> >> >
>> >> > But no values
>> >> >
>> >> > Have I got the port wrong in this case or the set up is incorrect?
>> >> >
>> >> >
>> >> > Thanks
>> >> >
>> >> > Dr Mich Talebzadeh
>> >> >
>> >> >
>> >> >
>> >> > LinkedIn
>> >> >
>> >> >
>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> >> >
>> >> >
>> >> >
>> >> > http://talebzadehmich.wordpress.com
>> >> >
>> >> >
>> >
>> >
>>
>
>

Mime
View raw message