spark-user mailing list archives

From: <luohui20...@sina.com>
Subject: Re: [SparkSQL+SparkStreaming] SparkStreaming APP can not load data into SparkSQL table
Date: Mon, 05 Sep 2016 11:00:31 GMT

The data can be written into HDFS as parquet, but the data-loading step does not work as expected.
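
If it helps to narrow things down, one way to check that a given batch's parquet output is intact is to read it back from a spark-shell on the cluster (in Spark 1.6 the shell predefines sqlContext). This is only a sketch; the path follows the per-batch layout used in the quoted code below, with placeholders for the real topic number and batch id:

    // Sketch only: verify one batch's parquet output from a spark-shell.
    // Substitute the actual topic number and batch id for the placeholders.
    val batchPath = "/etl/tables/nginxlog/<topicNO>/<batchId>"
    val checkDF = sqlContext.read.parquet(batchPath)
    checkDF.printSchema()
    println(s"rows in this batch's output: ${checkDF.count()}")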

--------------------------------

 

Thanks & Best regards!
San.Luo

----- Original Message -----
From: <luohui20001@sina.com>
To: "user" <user@spark.apache.org>
Subject: [SparkSQL+SparkStreaming] SparkStreaming APP can not load data into SparkSQL table
Date: 2016-09-05 18:55

Hi guys, I have a question: my SparkStreaming APP can not load data into a SparkSQL table. Here is my code:
    val conf = new SparkConf().setAppName("KafkaStreaming for " + topics).setMaster("spark://master60:7077")
    val storageLevel = StorageLevel.DISK_ONLY
    val ssc = new StreamingContext(conf, Seconds(batchInterval.toInt))
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    //Receiver-based 
    val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, storageLevel)

    kafkaStream.foreachRDD { rdd =>
      val x = rdd.count()
      println(s"================processing $x records=================")
      rdd.collect().foreach(println)
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._
      val logRDD = sqlContext.read.json(rdd.values).select("payload").map(_.mkString)
      val logRDD2 = logRDD.map(_.split(',')).map { x =>
        NginxLog(x(0).trim().toFloat.toInt,
          x(1).trim(),
          x(2).trim(),
          x(3).trim(),
          x(4).trim(),
          x(5).trim(),
          x(6).trim(),
          x(7).trim(),
          x(8).trim(),
          x(9).trim(),
          x(10).trim())
      }
      val recDF = logRDD2.toDF
      recDF.printSchema()

      val hc = new org.apache.spark.sql.hive.HiveContext(rdd.sparkContext)
      val index = rdd.id
      recDF.write.parquet(s"/etl/tables/nginxlog/${topicNO}/${index}")
      hc.sql("CREATE TABLE IF NOT EXISTS nginxlog(msec Int,remote_addr String,u_domain String,u_url
String,u_title String,u_referrer String,u_sh String,u_sw String,u_cd String,u_lang String,u_utrace
String) STORED AS PARQUET")      hc.sql(s"LOAD DATA INPATH '/etl/tables/nginxlog/${topicNO}/${index}'
INTO TABLE nginxlog")    }
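
As a side note, the Spark Streaming programming guide recommends reusing one lazily-created SQLContext across batches (which the code above already does via SQLContext.getOrCreate); the same idea can be applied to the HiveContext instead of constructing a new one inside every foreachRDD. A minimal sketch, with an illustrative object name:

    // Sketch: a lazily-created singleton HiveContext, reused across micro-batches.
    // The object name is made up here; define it outside the streaming closure.
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.hive.HiveContext

    object HiveContextSingleton {
      private var instance: HiveContext = _
      def getInstance(sc: SparkContext): HiveContext = synchronized {
        if (instance == null) instance = new HiveContext(sc)
        instance
      }
    }

    // inside foreachRDD:
    // val hc = HiveContextSingleton.getInstance(rdd.sparkContext)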

There isn't any exception while running my APP. However, only the data from the first batch gets loaded into table nginxlog; none of the later batches are loaded successfully. I can not understand this behavior. Is it an issue with my HiveContext (hc)?
PS: my Spark cluster version is 1.6.1.
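
For what it's worth, one alternative some people use instead of a per-batch LOAD DATA is an external parquet table over a fixed HDFS directory, with each batch appending files directly under that location. A rough sketch under that assumption (the directory name is made up here, and the case-class field names behind recDF would have to match the Hive column names):

    // Sketch: create the external table once, e.g. at application start.
    hc.sql("""CREATE EXTERNAL TABLE IF NOT EXISTS nginxlog(
             |  msec Int, remote_addr String, u_domain String, u_url String,
             |  u_title String, u_referrer String, u_sh String, u_sw String,
             |  u_cd String, u_lang String, u_utrace String)
             |STORED AS PARQUET
             |LOCATION '/etl/tables/nginxlog_external'""".stripMargin)

    // Then, inside foreachRDD, append each batch's DataFrame (recDF above)
    // under the table's location instead of writing a per-batch path and
    // loading it afterwards.
    recDF.write.mode(org.apache.spark.sql.SaveMode.Append)
      .parquet("/etl/tables/nginxlog_external")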




--------------------------------

 

Thanks & Best regards!
San.Luo