spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: getting error: value toDF is not a member of Seq[columns]
Date Wed, 05 Sep 2018 22:24:13 GMT
Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



I can rebuild the comma separated list as follows:


    case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)
    val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
    import sqlContext.implicits._


         for(line <- pricesRDD.collect.toArray)
         {
           var key = line._2.split(',').view(0).toString
           var ticker =  line._2.split(',').view(1).toString
           var timeissued = line._2.split(',').view(2).toString
           var price = line._2.split(',').view(3).toFloat
           var allInOne = key+","+ticker+","+timeissued+","+price
           println(allInOne)

and the print shows the columns separated by ","


34e07d9f-829a-446a-93ab-8b93aa8eda41,SAP,2018-09-05T23:22:34,56.89
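As a side note, the record can be split once and mapped straight onto a case class instead of rebuilding the comma-separated string. This is only a minimal sketch in plain Scala with no Spark dependency; the names PriceRow, PriceParsing and parseLine are made up for illustration and do not appear in the original job.

```scala
import scala.util.Try

// Hypothetical name; the thread calls this case class `columns`.
final case class PriceRow(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

object PriceParsing {
  // Split the record once and convert it to a PriceRow.
  // Returns None when the field count is not four or the price is not numeric.
  def parseLine(line: String): Option[PriceRow] =
    line.split(',') match {
      case Array(key, ticker, timeIssued, price) =>
        Try(price.trim.toFloat).toOption
          .map(p => PriceRow(key, ticker, timeIssued, p))
      case _ => None
    }
}
```

Returning an Option means a malformed record is skipped rather than throwing inside the loop.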

So I just need to convert that row into a DataFrame.

I tried this conversion to a DataFrame so that I can write it to a MongoDB
document with MongoSpark.save(df, writeConfig):

var df = sparkContext.parallelize(Seq(columns(key, ticker, timeissued, price))).toDF

[error] /data6/hduser/scala/md_streaming_mongoDB/src/main/scala/myPackage/md_streaming_mongoDB.scala:235: value toDF is not a member of org.apache.spark.rdd.RDD[columns]
[error]             var df = sparkContext.parallelize(Seq(columns(key, ticker, timeissued, price))).toDF
[
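One commonly reported cause of this error is a case class declared inside the method that calls toDF. In that situation the compiler cannot supply the TypeTag needed to derive the implicit Encoder, so the conversion brought in by the implicits import is not applied. Nothing in this thread confirms that this is the cause here. The following is a minimal sketch only, assuming Spark 2.x on the classpath and a local master; the names Columns, ToDfSketch and buildDf are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Declared at the top level of the file, NOT inside a method,
// so that an implicit Encoder can be derived for it.
final case class Columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

object ToDfSketch {
  // Builds a one-row DataFrame from a local Seq of case class instances.
  def buildDf(spark: SparkSession): DataFrame = {
    // Brings the Seq/RDD-to-DatasetHolder conversions into scope.
    import spark.implicits._
    Seq(Columns("34e07d9f-829a-446a-93ab-8b93aa8eda41", "SAP",
      "2018-09-05T23:22:34", 56.89f)).toDF()
  }

  def main(args: Array[String]): Unit = {
    // Local master is an assumption for the sketch only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("toDF sketch")
      .getOrCreate()
    val df = buildDf(spark)
    df.printSchema()
    df.show(false)
    spark.stop()
  }
}
```

If the case class in the real job sits inside main or inside the loop body, moving it to the top of the file would be the first thing to try.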


Frustrating!

Has anyone come across this?

Thanks

On Wed, 5 Sep 2018 at 13:30, Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

> Yes, I already tried that and it did not work.
>
> thanks
>
>
> On Wed, 5 Sep 2018 at 10:10, Deepak Sharma <deepakmca05@gmail.com> wrote:
>
>> Try this:
>>
>> import spark.implicits._
>>
>> df.toDF()
>>
>>
>> On Wed, Sep 5, 2018 at 2:31 PM Mich Talebzadeh <mich.talebzadeh@gmail.com>
>> wrote:
>>
>>> With the following
>>>
>>> case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)
>>>
>>>  var key = line._2.split(',').view(0).toString
>>>  var ticker =  line._2.split(',').view(1).toString
>>>  var timeissued = line._2.split(',').view(2).toString
>>>  var price = line._2.split(',').view(3).toFloat
>>>
>>>   var df = Seq(columns(key, ticker, timeissued, price))
>>>  println(df)
>>>
>>> I get
>>>
>>>
>>> List(columns(ac11a78d-82df-4b37-bf58-7e3388aa64cd,MKS,2018-09-05T10:10:15,676.5))
>>>
>>> So I just need to convert that list to a DataFrame.
>>>
>>>
>>> On Wed, 5 Sep 2018 at 09:49, Mich Talebzadeh <mich.talebzadeh@gmail.com>
>>> wrote:
>>>
>>>> Thanks!
>>>>
>>>> The Spark version is 2.3.0.
>>>>
>>>>
>>>> On Wed, 5 Sep 2018 at 09:41, Jungtaek Lim <kabhwan@gmail.com> wrote:
>>>>
>>>>> You may also find the link below useful (though it looks quite old). An
>>>>> Encoder should be available for a case class, so there may be another
>>>>> reason that prevents the implicit conversion.
>>>>>
>>>>>
>>>>> https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Spark-Scala-Error-value-toDF-is-not-a-member-of-org-apache/m-p/29994/highlight/true#M973
>>>>>
>>>>> And which Spark version do you use?
>>>>>
>>>>>
>>>>> On Wed, 5 Sep 2018 at 5:32 PM, Jungtaek Lim <kabhwan@gmail.com> wrote:
>>>>>
>>>>>> Sorry, I guess I pasted a different method. The code is:
>>>>>>
>>>>>> implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
>>>>>>   DatasetHolder(_sqlContext.createDataset(s))
>>>>>> }
>>>>>>
>>>>>>
>>>>>> On Wed, 5 Sep 2018 at 5:30 PM, Jungtaek Lim <kabhwan@gmail.com> wrote:
>>>>>>
>>>>>>> I guess you need an Encoder for the type returned by columns().
>>>>>>>
>>>>>>>
>>>>>>> https://github.com/apache/spark/blob/2119e518d31331e65415e0f817a6f28ff18d2b42/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L227-L229
>>>>>>>
>>>>>>> implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
>>>>>>>   DatasetHolder(_sqlContext.createDataset(rdd))
>>>>>>> }
>>>>>>>
>>>>>>> You can see lots of Encoder implementations in the Scala code. If your
>>>>>>> type doesn't match any of them, it may not work and you will need to
>>>>>>> provide a custom Encoder.
>>>>>>>
>>>>>>> -Jungtaek Lim (HeartSaVioR)
>>>>>>>
>>>>>>> On Wed, 5 Sep 2018 at 5:24 PM, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> I already do that as below
>>>>>>>>
>>>>>>>>     val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
>>>>>>>>     import sqlContext.implicits._
>>>>>>>>
>>>>>>>> but still getting the error!
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, 5 Sep 2018 at 09:17, Jungtaek Lim <kabhwan@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> You may need to import implicits from your Spark session, as shown
>>>>>>>>> below.
>>>>>>>>> (The code below is borrowed from
>>>>>>>>> https://spark.apache.org/docs/latest/sql-programming-guide.html)
>>>>>>>>>
>>>>>>>>> import org.apache.spark.sql.SparkSession
>>>>>>>>> val spark = SparkSession
>>>>>>>>>   .builder()
>>>>>>>>>   .appName("Spark SQL basic example")
>>>>>>>>>   .config("spark.some.config.option", "some-value")
>>>>>>>>>   .getOrCreate()
>>>>>>>>> // For implicit conversions like converting RDDs to DataFrames
>>>>>>>>> import spark.implicits._
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, 5 Sep 2018 at 5:11 PM, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I have a Spark Streaming job that sends data, and I need to put that
>>>>>>>>>> data into MongoDB for test purposes. The easiest way is to create a
>>>>>>>>>> DataFrame from the individual list of columns, as below.
>>>>>>>>>>
>>>>>>>>>> I loop over the individual rows in the RDD and perform the following:
>>>>>>>>>>
>>>>>>>>>>     case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)
>>>>>>>>>>
>>>>>>>>>>          for(line <- pricesRDD.collect.toArray)
>>>>>>>>>>          {
>>>>>>>>>>            var key = line._2.split(',').view(0).toString
>>>>>>>>>>            var ticker =  line._2.split(',').view(1).toString
>>>>>>>>>>            var timeissued = line._2.split(',').view(2).toString
>>>>>>>>>>            var price = line._2.split(',').view(3).toFloat
>>>>>>>>>>            val priceToString = line._2.split(',').view(3)
>>>>>>>>>>            if (price > 90.0)
>>>>>>>>>>            {
>>>>>>>>>>              println ("price > 90.0, saving to MongoDB collection!")
>>>>>>>>>>              // Save prices to mongoDB collection
>>>>>>>>>>              var df = Seq(columns(key, ticker, timeissued, price)).toDF
>>>>>>>>>>
>>>>>>>>>> but it fails with message
>>>>>>>>>>
>>>>>>>>>>  value toDF is not a member of Seq[columns].
>>>>>>>>>>
>>>>>>>>>> What would be the easiest way of resolving this please?
>>>>>>>>>>
>>>>>>>>>> thanks
>>>>>>>>>>
>>>>>>>>>
>>
>> --
>> Thanks
>> Deepak
>> www.bigdatabig.com
>> www.keosha.net
>>
>
