spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Spark streaming issue
Date Fri, 01 Apr 2016 23:34:30 GMT
OK, I managed to make this work.

All I am interested in is receiving messages from the topic every minute. No
filtering yet, just the full text.

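import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils

// Local mode with 12 threads; allowMultipleContexts is set because
// spark-shell already owns a SparkContext (sc)
val sparkConf = new SparkConf().
             setAppName("StreamTest").
             setMaster("local[12]").
             set("spark.driver.allowMultipleContexts", "true").
             set("spark.hadoop.validateOutputSpecs", "false")
// One batch every 60 seconds
val ssc = new StreamingContext(sparkConf, Seconds(60))
// bootstrap.servers (or metadata.broker.list) gives the direct stream its brokers
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "rhes564:9092",
  "schema.registry.url" -> "http://rhes564:8081",
  "zookeeper.connect" -> "rhes564:2181",
  "group.id" -> "StreamTest")
val topic = Set("newtopic")
// Explicit type parameters: key type, value type, key decoder, value decoder
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
  StringDecoder](ssc, kafkaParams, topic)
// Print the first ten (key, value) pairs of each batch
messages.print()
ssc.start()
// In a compiled application, follow this with ssc.awaitTermination()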


-------------------------------------------
Time: 1459554540000 ms
-------------------------------------------
(null,Sat Apr 2 00:33:01 BST 2016  ======= Sending messages from rhes5)
(null,1,'a7UkW5ZRaI_V8oRiPUNx0on6E06Ikr8_ILOxhVpgt6IoXXq2fF9ssYuJYcr49Cj4yp3nY9k8sHtIi_7XjltTVzqJ33beV2hIaqAj',101)
(null,2,'dnFxOkOibbKLR5m3CIeS3rhwn8hCiaZAfEaD7yXi6M7jXcvaFYBjClLDoNMEVgfLZVgJ9tXchqlGX44FmvhnarLFrtJNbTb1C6j4',102)
(null,3,'M9pvIOKMhaI_mSE3ExlovZWIxBE66KNEWGIGtCJF1qr_dGJX5sFKqLLa3Qv8aN2lCLi3lnGnMtqeZYBqE5YD586Vw50WWjL7ncZA',103)
(null,4,'9EROPf_dJZpdAHmBubTRxEUkvC9S_Xnll5bWmX0xcOPk7l4TGXPgEqxpUP52QG6pUIn74mvwWqF9vzZ2ZhsmV6WPOmUAw4Ub_nFU',104)
(null,5,'BLIi9a_n7Pfyc7r3nfzKfaNRa4Hmd9NlHEVDPkQS4xbgUWqU2bJeI6b8b1IMoStnmjMHhYLtFf4TQyJcpn85PSwFksggNVnQl1oL',105)
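Each printed element is a (key, value) pair. The key is null, which is what I would expect from kafka-console-producer when no key is supplied. As a possible next step after "full text", here is a minimal plain-Scala sketch that parses the value part, assuming every data line has the shape id,'text',code as in the output above. Record and RecordParser are hypothetical names, not part of Spark.

```scala
// Hypothetical record type for lines such as: 1,'a7UkW5ZRaI_V8...',101
final case class Record(id: Int, payload: String, code: Int)

object RecordParser {
  // Assumed line shape: digits, a single-quoted string, digits.
  // Regex extractors in a match must cover the whole trimmed line.
  private val Line = """(\d+),'([^']*)',(\d+)""".r

  // Returns None for lines that do not match, such as the
  // "======= Sending messages from rhes5" banner line.
  def parseRecord(line: String): Option[Record] = line.trim match {
    case Line(id, payload, code) => Some(Record(id.toInt, payload, code.toInt))
    case _                       => None
  }
}
```

In the streaming job it could then be applied to the values only, for example messages.map(_._2).flatMap(line => RecordParser.parseRecord(line).toList) (untested against a live stream).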



Dr Mich Talebzadeh



LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 1 April 2016 at 23:26, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:

> I adopted this approach
>
> scala> val conf = new SparkConf().
>      |              setAppName("StreamTest").
>      |              setMaster("local[12]").
>      |              set("spark.driver.allowMultipleContexts", "true").
>      |              set("spark.hadoop.validateOutputSpecs", "false")
> conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@321d96f7
> scala> val ssc = new StreamingContext(conf, Seconds(60))
> ssc: org.apache.spark.streaming.StreamingContext =
> org.apache.spark.streaming.StreamingContext@5dbae9eb
> scala> val kafkaParams = Map("metadata.broker.list" -> "rhes564:9092")
> kafkaParams: scala.collection.immutable.Map[String,String] =
> Map(metadata.broker.list -> rhes564:9092)
> scala> val topics = Set("newtopic")
> topics: scala.collection.immutable.Set[String] = Set(newtopic)
> scala> val stream = KafkaUtils.createDirectStream(ssc, kafkaParams, topics)
> stream: org.apache.spark.streaming.dstream.InputDStream[(Nothing,
> Nothing)] =
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@6d2d3b21
>
> So that opens a data stream. What next?
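The InputDStream[(Nothing, Nothing)] type above is a type-inference effect. createDirectStream takes the key type, value type and their decoders as type parameters that do not appear in any of its argument types, so when they are omitted the Scala compiler infers Nothing. The working version at the top of this thread passes them explicitly as [String, String, StringDecoder, StringDecoder]. A plain-Scala sketch of the same effect, using a hypothetical openPairs method (not a Spark API):

```scala
object InferenceDemo {
  // Hypothetical method shaped like createDirectStream: K and V appear only
  // in the result type, so nothing in the arguments pins them down.
  def openPairs[K, V](topics: Set[String]): List[(K, V)] = Nil

  // No type arguments: the compiler infers K = Nothing and V = Nothing,
  // just as the REPL reported InputDStream[(Nothing, Nothing)].
  val untyped = openPairs(Set("newtopic"))

  // Explicit type arguments, as in the working createDirectStream call.
  val typed = openPairs[String, String](Set("newtopic"))
}
```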
>
> Thanks
>
>
>
>
>
>
> On 1 April 2016 at 22:37, Mich Talebzadeh <mich.talebzadeh@gmail.com>
> wrote:
>
>> Yes, I noticed that.
>>
>> scala> val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
>> "rhes564:9092", "newtopic", 1)
>>
>> <console>:52: error: overloaded method value createStream with
>> alternatives:
>>   (jssc:
>> org.apache.spark.streaming.api.java.JavaStreamingContext,zkQuorum:
>> String,groupId: String,topics: java.util.Map[String,Integer],storageLevel:
>> org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream[String,String]
>> <and>
>>   (ssc: org.apache.spark.streaming.StreamingContext,zkQuorum:
>> String,groupId: String,topics:
>> scala.collection.immutable.Map[String,Int],storageLevel:
>> org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.dstream.ReceiverInputDStream[(String,
>> String)]
>>  cannot be applied to (org.apache.spark.streaming.StreamingContext,
>> String, String, String, Int)
>>          val kafkaStream = KafkaUtils.createStream(ssc, "rhes564:2181",
>> "rhes564:9092", "newtopic", 1)
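The error lists what the Scala overload expects: a ZooKeeper quorum, a consumer group id, topics as a Map[String, Int] from topic name to number of consumer threads, and a storage level. The call above passes the broker address rhes564:9092 where the group id goes, the String "newtopic" where the map goes, and 1 where the storage level goes. The storage level should have a default in the Scala API (Spark's KafkaWordCount example calls createStream with four arguments). A small sketch of building the topic map from a comma-separated topic list, similar to that example; TopicMap and topicThreads are hypothetical names:

```scala
object TopicMap {
  // Builds the Map[String, Int] (topic -> number of consumer threads) that
  // the receiver-based KafkaUtils.createStream expects for its topics argument.
  def topicThreads(topics: String, numThreads: Int): Map[String, Int] =
    topics.split(",").map(_.trim).filter(_.nonEmpty).map(t => t -> numThreads).toMap

  // Assumed (untested) receiver-based call, reusing the group id "StreamTest"
  // from the working example at the top of the thread:
  //   KafkaUtils.createStream(ssc, "rhes564:2181", "StreamTest",
  //     topicThreads("newtopic", 1))
}
```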
>>
>>
>>
>>
>>
>> On 1 April 2016 at 22:25, Cody Koeninger <cody@koeninger.org> wrote:
>>
>>> You're not passing valid Scala values. rhes564:2181 without quotes
>>> isn't a valid literal, newtopic isn't a list of strings, etc.
>>>
>>> On Fri, Apr 1, 2016 at 4:04 PM, Mich Talebzadeh
>>> <mich.talebzadeh@gmail.com> wrote:
>>> > Thanks Cody.
>>> >
>>> > Can I use the receiver-based approach here?
>>> >
>>> > I have created the topic newtopic as below:
>>> >
>>> > ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes564:2181 \
>>> > --replication-factor 1 --partitions 1 --topic newtopic
>>> >
>>> >
>>> > This is basically what I am doing in Spark:
>>> >
>>> > val lines = ssc.socketTextStream("rhes564", 2181)
>>> >
>>> > which is obviously not working.
>>> >
>>> > This is what the documentation suggests:
>>> >
>>> > import org.apache.spark.streaming.kafka._
>>> >
>>> > val kafkaStream = KafkaUtils.createStream(streamingContext,
>>> >      [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
>>> >
>>> > *   <zkQuorum> is a list of one or more ZooKeeper servers that make up the quorum
>>> > *   <group> is the name of the Kafka consumer group
>>> > *   <topics> is a list of one or more Kafka topics to consume from
>>> > *   <numThreads> is the number of threads the Kafka consumer should use
>>> >
>>> > Now this comes back with an error. Obviously I am not passing the
>>> > parameters correctly!
>>> >
>>> > scala> val kafkaStream = KafkaUtils.createStream(streamingContext,
>>> > rhes564:2181, rhes564:9092, newtopic 1)
>>> > <console>:1: error: identifier expected but integer literal found.
>>> >        val kafkaStream = KafkaUtils.createStream(streamingContext,
>>> > rhes564:2181, rhes564:9092, newtopic 1)
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > On 1 April 2016 at 21:13, Cody Koeninger <cody@koeninger.org> wrote:
>>> >>
>>> >> It looks like you're using a plain socket stream to connect to a
>>> >> zookeeper port, which won't work.
>>> >>
>>> >> Look at spark.apache.org/docs/latest/streaming-kafka-integration.html
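socketTextStream expects a server that writes newline-delimited text to the connecting client; the Spark Streaming programming guide uses nc -lk 9999 for that. ZooKeeper on 2181 and the Kafka broker on 9092 speak their own binary protocols and do not push text lines to a client that merely connects, which is consistent with the empty batches shown further down. A minimal sketch of the kind of source socketTextStream can read, in plain Scala with java.net; LineServerDemo, serveOnce and readAll are hypothetical names, and readAll only stands in for the Spark receiver:

```scala
import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter, PrintWriter}
import java.net.{ServerSocket, Socket}
import java.nio.charset.StandardCharsets

object LineServerDemo {
  // Starts a server on a free port that writes `lines` as newline-delimited
  // UTF-8 text to the first client that connects, then closes the connection.
  def serveOnce(lines: Seq[String]): (ServerSocket, Thread) = {
    val server = new ServerSocket(0) // port 0: the OS picks a free port
    val worker = new Thread(new Runnable {
      def run(): Unit = {
        val client = server.accept()
        // autoFlush = true: each println is flushed to the socket
        val out = new PrintWriter(
          new OutputStreamWriter(client.getOutputStream, StandardCharsets.UTF_8), true)
        try lines.foreach(line => out.println(line))
        finally client.close()
      }
    })
    worker.start()
    (server, worker)
  }

  // Reads lines until the server closes the connection.
  def readAll(port: Int): List[String] = {
    val socket = new Socket("localhost", port)
    try {
      val in = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      Iterator.continually(in.readLine()).takeWhile(_ != null).toList
    } finally socket.close()
  }
}
```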
>>> >>
>>> >> On Fri, Apr 1, 2016 at 3:03 PM, Mich Talebzadeh
>>> >> <mich.talebzadeh@gmail.com> wrote:
>>> >> >
>>> >> > Hi,
>>> >> >
>>> >> > I am just testing Spark streaming with Kafka.
>>> >> >
>>> >> > Basically, I am broadcasting to the topic every minute to host:port ->
>>> >> > rhes564:2181.
>>> >> > This sends a few lines through a shell script as follows:
>>> >> >
>>> >> > cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh \
>>> >> >     --broker-list rhes564:9092 --topic newtopic
>>> >> >
>>> >> > That works fine, and I can see the messages with
>>> >> >
>>> >> > ${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181 \
>>> >> >     --topic newtopic
>>> >> >
>>> >> > Fri Apr 1 21:00:01 BST 2016  ======= Sending messages from rhes5
>>> >> > 1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
>>> >> > 2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
>>> >> > 3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
>>> >> > 4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
>>> >> > 5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105
>>> >> >
>>> >> > Now I try to read the topic in Spark Streaming as follows:
>>> >> >
>>> >> > import org.apache.spark.{SparkConf, SparkContext}
>>> >> > import org.apache.spark.sql.hive.HiveContext
>>> >> > import org.apache.spark.streaming.{Minutes, StreamingContext}
>>> >> >
>>> >> > val conf = new SparkConf().
>>> >> >              setAppName("StreamTest").
>>> >> >              setMaster("local[12]").
>>> >> >              set("spark.driver.allowMultipleContexts", "true").
>>> >> >              set("spark.hadoop.validateOutputSpecs", "false")
>>> >> > val sc = new SparkContext(conf)
>>> >> > // Create sqlContext based on HiveContext
>>> >> > val sqlContext = new HiveContext(sc)
>>> >> > val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> >> > //
>>> >> > // Create a StreamingContext with a batch interval of 1 minute.
>>> >> > // A local master needs at least 2 threads (one for the receiver,
>>> >> > // one for processing); local[12] satisfies that.
>>> >> > val ssc = new StreamingContext(conf, Minutes(1))
>>> >> > // Create a DStream that will connect to hostname:port, like localhost:9999
>>> >> > //val lines = ssc.socketTextStream("rhes564", 9092)
>>> >> > val lines = ssc.socketTextStream("rhes564", 2181)
>>> >> > // Split each line into words
>>> >> > val words = lines.flatMap(_.split(" "))
>>> >> > // Pair each word with 1, then sum the counts per word in each batch
>>> >> > val pairs = words.map(word => (word, 1))
>>> >> > val wordCounts = pairs.reduceByKey(_ + _)
>>> >> > // Print the first ten elements of each RDD generated in this DStream to the console
>>> >> > wordCounts.print()
>>> >> > ssc.start()
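For comparison, the flatMap / map / reduceByKey steps above compute a per-word count for each one-minute batch. A plain-Scala sketch of the same computation on one batch held in memory; BatchWordCount and wordCounts are hypothetical names, and a Seq[String] stands in for the lines of a single batch:

```scala
object BatchWordCount {
  // Same steps as the DStream code, applied to one in-memory batch:
  // split each line on spaces, pair each word with 1, then sum per word
  // (groupBy plus sum plays the role of reduceByKey(_ + _)).
  def wordCounts(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }
}
```

Once the lines come from Kafka rather than the socket, the same steps should apply to the message values, e.g. messages.map(_._2).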
>>> >> >
>>> >> > This is what I am getting:
>>> >> >
>>> >> >
>>> >> > scala> -------------------------------------------
>>> >> > Time: 1459541760000 ms
>>> >> > -------------------------------------------
>>> >> >
>>> >> > But no values.
>>> >> >
>>> >> > Have I got the port wrong in this case, or is the setup incorrect?
>>> >> >
>>> >> >
>>> >> > Thanks
>>> >> >
>>> >> >
>>> >> >
>>> >
>>> >
>>>
>>
>>
>
