spark-user mailing list archives

From Ajay Chander <itsche...@gmail.com>
Subject Re: HiveContext is Serialized?
Date Wed, 26 Oct 2016 04:34:17 GMT
Sunita, thanks for your time. In my scenario, for each attribute in deDF
(one column, just 66 rows), I have to query a Hive table and insert the
result into another table.

Thanks,
Ajay
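
The per-attribute scenario described above can be sketched without a cluster by separating the query construction from the Spark calls. This is only a rough illustration, not the poster's actual code: the object name `DataElementQueries`, the helper `queryFor`, and the sample values `col_a` and `col_b` are hypothetical, and the identifier check is an added assumption (the attribute is spliced into the SQL text as a column name, so it seems prudent to accept only plain identifiers).

```scala
object DataElementQueries {
  // Assumption: a data element is a plain column name. Anything else is
  // rejected, because the value is concatenated into the SQL text.
  private val Identifier = "[A-Za-z_][A-Za-z0-9_]*".r

  // Build the per-element SELECT from the thread, or None for an invalid name.
  def queryFor(dataElement: String): Option[String] = {
    val name = dataElement.trim
    if (Identifier.pattern.matcher(name).matches())
      Some(s"SELECT cyc_dt, supplier_proc_i, '$name' AS data_elm, " +
        s"$name AS data_elm_val FROM TEST_DB.TEST_TABLE1")
    else None
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for deDF.collect(): a small local collection on the driver.
    val elements = Seq("col_a", " col_b ", "bad name; DROP")

    // A local foreach on the driver. With a real HiveContext, each query
    // would be passed to hiveContext.sql(...) and written with insertInto,
    // as in the code quoted in the thread.
    elements.flatMap(queryFor).foreach(println)
  }
}
```

With only 66 attributes, a driver-side loop like this issues 66 separate Spark jobs one after another, and each job is still executed in parallel by the cluster.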

On Wed, Oct 26, 2016 at 12:21 AM, Sunita Arvind <sunitarvind@gmail.com>
wrote:

> Ajay,
>
> AFAIK, these contexts generally cannot be used inside distributed
> operations such as foreach on an RDD or DataFrame. The SQL query itself
> already runs in parallel on distributed datasets, so calling it from
> inside foreach would nest one distributed job inside another, and the
> context would have to be serialized to the executors, which does not
> work. I am not sure I have explained it well.
>
> If you can create the DataFrame in main, you can register it as a table
> and run the queries in the main method itself. You don't need to coalesce
> or run the method within foreach.
>
> Regards
> Sunita
>
> On Tuesday, October 25, 2016, Ajay Chander <itschevva@gmail.com> wrote:
>
>>
>> Jeff, thanks for your response. I see the error below in the logs. Do you
>> think it has anything to do with hiveContext? Do I have to serialize it
>> before using it inside foreach?
>>
>> 16/10/19 15:16:23 ERROR scheduler.LiveListenerBus: Listener SQLListener threw an exception
>> java.lang.NullPointerException
>>         at org.apache.spark.sql.execution.ui.SQLListener.onTaskEnd(SQLListener.scala:167)
>>         at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:42)
>>         at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
>>         at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
>>         at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
>>         at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
>>         at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
>>         at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
>>         at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
>>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>         at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
>>         at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181)
>>         at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
>>
>> Thanks,
>> Ajay
>>
>> On Tue, Oct 25, 2016 at 11:45 PM, Jeff Zhang <zjffdu@gmail.com> wrote:
>>
>>>
>>> In your sample code, you can use hiveContext in the foreach because it is
>>> a plain Scala collection foreach (collect() returns a local Array), which
>>> runs on the driver side. But you cannot use hiveContext inside
>>> RDD.foreach, which runs on the executors.
>>>
>>>
>>>
>>> On Wed, Oct 26, 2016 at 11:28 AM, Ajay Chander <itschevva@gmail.com> wrote:
>>>
>>>> Hi Everyone,
>>>>
>>>> I was wondering whether I can use hiveContext inside foreach, as below:
>>>>
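>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>> import org.apache.spark.sql.Row
>>>> import org.apache.spark.sql.hive.HiveContext
>>>>
>>>> object Test {
>>>>   def main(args: Array[String]): Unit = {
>>>>
>>>>     val conf = new SparkConf()
>>>>     val sc = new SparkContext(conf)
>>>>     val hiveContext = new HiveContext(sc)
>>>>
>>>>     // One data element (a column name) per line of the input file.
>>>>     val dataElementsFile = args(0)
>>>>     val deDF = hiveContext.read.text(dataElementsFile).toDF("DataElement").coalesce(1).distinct().cache()
>>>>
>>>>     // Runs on the driver: one query and one insert per data element.
>>>>     def calculate(de: Row): Unit = {
>>>>       val dataElement = de.getAs[String]("DataElement").trim
>>>>       val df1 = hiveContext.sql(
>>>>         "SELECT cyc_dt, supplier_proc_i, '" + dataElement + "' as data_elm, " +
>>>>           dataElement + " as data_elm_val FROM TEST_DB.TEST_TABLE1 ")
>>>>       df1.write.insertInto("TEST_DB.TEST_TABLE1")
>>>>     }
>>>>
>>>>     // collect() brings the rows to the driver, so this is a local foreach.
>>>>     deDF.collect().foreach(calculate)
>>>>   }
>>>> }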
>>>>
>>>>
>>>> I looked at https://spark.apache.org/docs/1.6.0/api/scala/index.html#org.apache.spark.sql.hive.HiveContext and I see that it extends SQLContext, which extends Logging with Serializable.
>>>>
>>>> Can anyone tell me if this is the right way to use it? Thanks for your time.
>>>>
>>>> Regards,
>>>>
>>>> Ajay
>>>>
>>>>
>>
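
On the original question of whether a context that is Serializable can therefore be used inside RDD.foreach: being Serializable only means the object survives being shipped, not that everything inside it does. The following is a minimal plain-Scala analogy with no Spark dependency. `DriverOnlyResource` and `FakeContext` are hypothetical stand-ins, and the assumption (worth checking against the Spark 1.6 source) is that SQLContext holds its SparkContext in a @transient field in the same way.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a driver-only handle such as a SparkContext.
// It is deliberately NOT Serializable.
class DriverOnlyResource {
  def run(query: String): String = s"ran: $query"
}

// Hypothetical stand-in for a context that is Serializable but keeps its
// driver-only handle in a @transient field, which is skipped on serialization.
class FakeContext(@transient val resource: DriverOnlyResource) extends Serializable {
  def usable: Boolean = resource != null
}

object SerializationDemo {
  // Round-trip a value through Java serialization, roughly what happens to
  // the objects a closure captures when it is shipped to another JVM.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val onDriver = new FakeContext(new DriverOnlyResource)
    val shipped = roundTrip(onDriver)
    println(s"usable on driver: ${onDriver.usable}")
    println(s"usable after round trip: ${shipped.usable}")
  }
}
```

The round trip succeeds, but the transient field comes back as null, so any call through it on the receiving side would throw a NullPointerException. That is consistent with the advice in the thread: keep hiveContext calls on the driver, for example inside a foreach over the collected Array.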
