spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarek_abouzeid <tarek.abouzei...@yahoo.com>
Subject Store Spark data into hive table
Date Sun, 01 Mar 2015 14:51:25 GMT
I am trying to store my word count output into hive data warehouse my
pipeline is:

Flume streaming => spark do word count => store result in hive table for
visualization later

my code is :

*import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext

object WordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        "Usage: WordCount <host> <port>")
      System.exit(1)
    }

    val Array(host, port) = args

    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch size
    val sparkConf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, batchInterval)

    // Create a flume stream
    val stream = FlumeUtils.createStream(ssc, host, port.toInt)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received !!!:::::" + cnt + " flume events."
).print()
      
    // it holds the string stream (converted event body array into string)
    val body = stream.map(e => new String(e.event.getBody.array))


val counts = body.flatMap(line =>
line.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+"))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)

// TESTING storing variable counts into hive ::

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 
import hiveContext._ 
val good = createSchemaRDD(counts)
good.saveAsTable("meta_test")  

    ssc.start()
    ssc.awaitTermination()
  }
}*

this gives me error :  *value createschemardd is not a member of
org.apache.spark.sql.sqlcontext*

so is there any way to fix this or other method to store data into hive data
warehouse ?




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Store-Spark-data-into-hive-table-tp21865.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Mime
View raw message