spark-user mailing list archives

From "hsy541@gmail.com" <hsy...@gmail.com>
Subject Re: How to do an interactive Spark SQL
Date Wed, 23 Jul 2014 02:02:09 GMT
But how do they do the interactive sql in the demo?
https://www.youtube.com/watch?v=dJQ5lV5Tldw

And if it works in local mode, I think it should be able to work in
cluster mode, correct?
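(Editor's note on why this likely breaks in cluster mode: the `filter` closure that assigns `sqlS` runs on the executors, so it mutates a serialized copy of the variable, while `sql(sqlS)` inside `foreachRDD` runs on the driver and never sees the update. A common workaround is to update the query only on the driver, e.g. in a separate `foreachRDD` over the control stream, since `foreachRDD` bodies execute on the driver. Spark aside, the core of that pattern is just a `@volatile var` shared between driver-side callbacks; a minimal pure-Scala sketch, all names hypothetical:)

```scala
// Sketch (names hypothetical): models the driver-side pattern where one
// callback (the "control stream" handler) updates a query string and another
// (the "data stream" handler) reads it. Both run in the driver JVM, so a
// @volatile var is enough for visibility across the threads involved.
object DriverSideQuery {
  @volatile var currentSql: String = "select count(*) from records"

  def update(newSql: String): Unit = {
    // In Spark this would happen inside foreachRDD on the sql/control
    // stream, which executes on the driver, not on the executors.
    currentSql = newSql
  }

  def runBatch(): String = {
    // In Spark this would be the sql(currentSql) call inside foreachRDD
    // on the data stream, which also executes on the driver.
    currentSql
  }
}
```

The point is only that both the read and the write happen in the driver JVM; assigning to the var inside a `filter` or `map` closure mutates an executor-local copy instead.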


On Tue, Jul 22, 2014 at 5:58 PM, Tobias Pfeiffer <tgp@preferred.jp> wrote:

> Hi,
>
> as far as I know, after the Streaming Context has started, the processing
> pipeline (e.g., filter.map.join.filter) cannot be changed. As your SQL
> statement is transformed into RDD operations when the Streaming Context
> starts, I think there is no way to change the statement that is executed on
> the current stream after the StreamingContext has started.
>
> Tobias
>
>
> On Wed, Jul 23, 2014 at 9:55 AM, hsy541@gmail.com <hsy541@gmail.com>
> wrote:
>
>> For example, this is what I tested and it works in local mode. It reads
>> both the data and the sql query from kafka, runs the sql on each RDD, and
>> writes the result back to kafka.
>> I defined a var called *sqlS*. In the streaming part, as you can see, I
>> change the sql statement whenever a sql message is consumed from kafka, so
>> the next time *sql(sqlS)* runs it executes the updated sql query.
>>
>> But this code doesn't work in cluster mode because, from what I
>> understand, sqlS is not updated on all the workers.
>>
>> So my question is: how do I change the sqlS value at runtime and make all
>> the workers pick up the latest value?
>>
>>
>>     var sqlS = "select count(*) from records"
>>     val Array(zkQuorum, group, topic, sqltopic, outputTopic, numParts) = args
>>     val sparkConf = new SparkConf().setAppName("KafkaSpark")
>>     val sc = new SparkContext(sparkConf)
>>     val ssc = new StreamingContext(sc, Seconds(2))
>>     val sqlContext = new SQLContext(sc)
>>
>>     // Importing the SQL context gives access to all the SQL functions
>>     // and implicit conversions.
>>     import sqlContext._
>>     import sqlContext.createSchemaRDD
>>
>>     val topicpMap = collection.immutable.HashMap(topic -> numParts.toInt, sqltopic -> 2)
>>     val recordsStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
>>       .window(Seconds(4))
>>       .filter(t => if (t._1 == "sql") { sqlS = t._2; false } else true)
>>       .map(t => getRecord(t._2.split("#")))
>>
>>     val zkClient = new ZkClient(zkQuorum, 30000, 30000, ZKStringSerializer)
>>     val brokerString = ZkUtils.getAllBrokersInCluster(zkClient).map(_.getConnectionString).mkString(",")
>>
>>     KafkaSpark.props.put("metadata.broker.list", brokerString)
>>     val config = new ProducerConfig(KafkaSpark.props)
>>     val producer = new Producer[String, String](config)
>>
>>     recordsStream.foreachRDD((recRDD) => {
>>       val schemaRDD = sqlContext.createSchemaRDD(recRDD)
>>       schemaRDD.registerAsTable(tName)
>>       val result = sql(sqlS).collect.foldLeft("Result:\n")((s, r) => s + r.mkString(",") + "\n")
>>       producer.send(new KeyedMessage[String, String](outputTopic, s"SQL: $sqlS \n $result"))
>>     })
>>     ssc.start()
>>     ssc.awaitTermination()
>>
>>
>> On Tue, Jul 22, 2014 at 5:10 PM, Zongheng Yang <zongheng.y@gmail.com>
>> wrote:
>>
>>> Can you paste a small code example to illustrate your questions?
>>>
>>> On Tue, Jul 22, 2014 at 5:05 PM, hsy541@gmail.com <hsy541@gmail.com>
>>> wrote:
>>> > Sorry, typo. I meant "sharing". If the sql changes at runtime, how do
>>> > I broadcast the sql to all the workers that are doing the sql analysis?
>>> >
>>> > Best,
>>> > Siyuan
>>> >
>>> >
>>> > On Tue, Jul 22, 2014 at 4:15 PM, Zongheng Yang <zongheng.y@gmail.com>
>>> wrote:
>>> >>
>>> >> Do you mean that the texts of the SQL queries are hardcoded in the
>>> >> code? What do you mean by "cannot share the sql to all workers"?
>>> >>
>>> >> On Tue, Jul 22, 2014 at 4:03 PM, hsy541@gmail.com <hsy541@gmail.com>
>>> >> wrote:
>>> >> > Hi guys,
>>> >> >
>>> >> > I'm able to run some Spark SQL examples, but the sql is static in
>>> >> > the code. I would like to know whether there is a way to read the
>>> >> > sql from somewhere else (a shell, for example).
>>> >> >
>>> >> > I can read a sql statement from kafka/zookeeper, but I cannot share
>>> >> > the sql with all the workers. broadcast seems not to work for
>>> >> > updating values.
>>> >> >
>>> >> > Moreover, if I use a non-serializable class (DataInputStream etc.)
>>> >> > to read the sql from another source, I always get "Task not
>>> >> > serializable: java.io.NotSerializableException".
>>> >> >
>>> >> >
>>> >> > Best,
>>> >> > Siyuan
>>> >
>>> >
>>>
>>
>>
>
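(Editor's note on the NotSerializableException mentioned at the bottom of the thread: the usual workaround is to keep the non-serializable reader out of the serialized closure, e.g. by marking it `@transient lazy` so each JVM re-creates it on first use after deserialization. A minimal sketch using plain Java serialization, with a `ByteArrayInputStream` standing in for the real source and all names hypothetical:)

```scala
import java.io._

// Sketch: SqlSource survives Java serialization even though InputStream is
// not serializable, because the stream field is @transient (skipped during
// serialization) and lazy (re-created on first access after deserialization).
class SqlSource extends Serializable {
  @transient lazy val stream: InputStream =
    new ByteArrayInputStream("select count(*) from records".getBytes("UTF-8"))

  def readSql(): String =
    scala.io.Source.fromInputStream(stream, "UTF-8").mkString
}
```

The same idiom applies to a closure shipped to Spark workers: any field touched on the worker side that cannot be serialized should be `@transient lazy`, so each worker builds its own instance instead of receiving one over the wire.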
