I guess you need to have encoder for the type of result for columns().

https://github.com/apache/spark/blob/2119e518d31331e65415e0f817a6f28ff18d2b42/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L227-L229

implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
DatasetHolder(_sqlContext.createDataset(rdd))
}
You can see lots of Encoder implementations in the scala code. If your type doesn't match anything it may not work and you need to provide custom Encoder.

-Jungtaek Lim (HeartSaVioR)

2018년 9월 5일 (수) 오후 5:24, Mich Talebzadeh <mich.talebzadeh@gmail.com>님이 작성:
Thanks 

I already do that as below

    val sqlContext= new org.apache.spark.sql.SQLContext(sparkContext)
  import sqlContext.implicits._

but still getting the error!

Dr Mich Talebzadeh

 

LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 



On Wed, 5 Sep 2018 at 09:17, Jungtaek Lim <kabhwan@gmail.com> wrote:
You may need to import implicits from your spark session like below:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

2018년 9월 5일 (수) 오후 5:11, Mich Talebzadeh <mich.talebzadeh@gmail.com>님이 작성:
Hi,

I have spark streaming that send data and I need to put that data into MongoDB for test purposes. The easiest way is to create a DF from the individual list of columns as below

I loop over individual rows in RDD and perform the following

    case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

         for(line <- pricesRDD.collect.toArray)
         {
            var key = line._2.split(',').view(0).toString
           var ticker =  line._2.split(',').view(1).toString
           var timeissued = line._2.split(',').view(2).toString
           var price = line._2.split(',').view(3).toFloat
           val priceToString = line._2.split(',').view(3)
           if (price > 90.0)
           {
             println ("price > 90.0, saving to MongoDB collection!")
            // Save prices to mongoDB collection
            var df = Seq(columns(key, ticker, timeissued, price)).toDF
 

but it fails with message

 value toDF is not a member of Seq[columns]. 

What would be the easiest way of resolving this please?

thanks

Dr Mich Talebzadeh

 

LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.