-dev +user

In general, you cannot create new RDDs inside closures that run on the executors, which is what calling sql inside a foreach does.
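If a per-row loop is really needed, the sql calls have to be issued from the driver, not from inside the foreach closure. One rough way to do that is to collect() the rows to the driver first. This sketch assumes the Spark 1.2 SQLContext API and reuses the paths and INSERT statement from the question. DriverSideInsert and insertRowByRow are hypothetical names. It only makes sense for small tables, because every row becomes its own INSERT and its own job, so the single-query approach is still the better fit:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Sketch only: assumes the Spark 1.2 SQLContext API and the Parquet paths from the question.
// DriverSideInsert / insertRowByRow are hypothetical names used for illustration.
object DriverSideInsert {
  def insertRowByRow(sqlContext: SQLContext): Unit = {
    // Empty target table with schema (Int, String)
    sqlContext.parquetFile("Data\\Test\\Parquet\\Temp").registerTempTable("temp")
    val testData = sqlContext.parquetFile("Data\\Test\\Parquet\\2")

    // collect() ships the rows back to the driver, so the sqlContext.sql calls below
    // run on the driver instead of inside a closure executing on the executors.
    // Each iteration issues a separate INSERT, so this is only reasonable for small tables.
    testData.collect().foreach { x =>
      sqlContext.sql("INSERT INTO temp SELECT " + x(0) + ", '" + x(1) + "'")
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("driver-side-insert").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    insertRowByRow(sqlContext)
    sqlContext.sql("SELECT * FROM temp").collect().foreach(println)
    sc.stop()
  }
}
```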

I think what you want here is something like:

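// Register the source table, then copy it into "temp" with one query issued from the driver
sqlContext.parquetFile("Data\\Test\\Parquet\\2").registerTempTable("temp2")
sqlContext.sql("SELECT col1, col2 FROM temp2").insertInto("temp")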
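A more complete, self-contained version of that snippet might look like the following. It is a sketch that assumes the Spark 1.2 SchemaRDD API (parquetFile, registerTempTable, insertInto) and the paths from the question. SetBasedInsert and copyInto are hypothetical names. It uses SELECT * because the question describes both tables as having the same (Int, String) schema, while col1 and col2 above stand in for the real column names:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Sketch assuming the Spark 1.2 SchemaRDD API and the Parquet paths from the question.
// SetBasedInsert / copyInto are hypothetical names; both tables have schema (Int, String).
object SetBasedInsert {
  def copyInto(sqlContext: SQLContext): Unit = {
    // Target table (empty) and source table (non-empty)
    sqlContext.parquetFile("Data\\Test\\Parquet\\Temp").registerTempTable("temp")
    sqlContext.parquetFile("Data\\Test\\Parquet\\2").registerTempTable("temp2")

    // One query, issued from the driver: Spark plans the whole copy itself
    // instead of running a nested query per row inside an executor-side closure.
    sqlContext.sql("SELECT * FROM temp2").insertInto("temp")
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("set-based-insert").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    copyInto(sqlContext)
    sqlContext.sql("SELECT * FROM temp").collect().foreach(println)
    sc.stop()
  }
}
```

Because the copy is expressed as a single query planned on the driver, nothing in it tries to build an RDD inside a closure running on the executors.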



On Mon, Dec 29, 2014 at 7:29 PM, evil <qinggangwang7@gmail.com> wrote:
Hi all,
I have a problem when I try to use INSERT INTO in a loop. Here is my code:
  def main(args: Array[String]) {
    // This is an empty table; schema is (Int, String)
    sqlContext.parquetFile("Data\\Test\\Parquet\\Temp").registerTempTable("temp")
    // Not an empty table; schema is (Int, String)
    val testData = sqlContext.parquetFile("Data\\Test\\Parquet\\2")
    testData.foreach { x =>
      sqlContext.sql("INSERT INTO temp SELECT " + x(0) + " ,'" + x(1) + "'")
    }
    sqlContext.sql("select * from temp").collect().map(x => (x(0), x(1))).foreach(println)
  }

When I run the code above in local mode, it never finishes and logs no error. The latest log output is:
14/12/30 11:07:44 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
14/12/30 11:07:44 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 200 records.
14/12/30 11:07:44 INFO InternalParquetRecordReader: at row 0. reading next block
14/12/30 11:07:44 INFO CodecPool: Got brand-new decompressor [.gz]
14/12/30 11:07:44 INFO InternalParquetRecordReader: block read in memory in 20 ms. row count = 200
14/12/30 11:07:45 INFO SparkContext: Starting job: runJob at ParquetTableOperations.scala:325
14/12/30 11:07:45 INFO DAGScheduler: Got job 1 (runJob at ParquetTableOperations.scala:325) with 1 output partitions (allowLocal=false)
14/12/30 11:07:45 INFO DAGScheduler: Final stage: Stage 1(runJob at ParquetTableOperations.scala:325)
14/12/30 11:07:45 INFO DAGScheduler: Parents of final stage: List()
14/12/30 11:07:45 INFO DAGScheduler: Missing parents: List()
14/12/30 11:07:45 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[6] at mapPartitions at basicOperators.scala:43), which has no missing parents
14/12/30 11:07:45 INFO MemoryStore: ensureFreeSpace(53328) called with curMem=239241, maxMem=1013836677
14/12/30 11:07:45 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 52.1 KB, free 966.6 MB)
14/12/30 11:07:45 INFO MemoryStore: ensureFreeSpace(31730) called with curMem=292569, maxMem=1013836677
14/12/30 11:07:45 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 31.0 KB, free 966.6 MB)
14/12/30 11:07:45 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:52533 (size: 31.0 KB, free: 966.8 MB)
14/12/30 11:07:45 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
14/12/30 11:07:45 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:838
14/12/30 11:07:45 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[6] at mapPartitions at basicOperators.scala:43)
14/12/30 11:07:45 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks

Can anyone give me a hand?

Thanks
evil



--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/A-question-about-using-insert-into-in-rdd-foreach-in-spark-1-2-tp9959.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
