spark-issues mailing list archives

From "Kazuaki Ishizaki (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-24220) java.lang.NullPointerException at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:83)
Date Fri, 11 May 2018 16:18:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-24220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472195#comment-16472195 ]

Kazuaki Ishizaki commented on SPARK-24220:
------------------------------------------

Thank you for reporting this issue. Would it be possible to post a standalone reproducible program?
This program seems to connect to an external database through {{DriverManager.getConnection(adminUrl)}}.
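
For example, something along the following lines would be easy to run. This is only a rough sketch (untested, names are illustrative): it keeps the pattern from the report, where the partition iterator is consumed inside a user-spawned thread starting with the {{rows.length}} call that appears at the bottom of the stack trace, but replaces the Postgres COPY sink with a {{println}}:

{code:scala}
import org.apache.spark.sql.{Row, SparkSession}

// Hypothetical standalone reproduction: no external database involved.
object Spark24220Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SPARK-24220-repro")
      .master("local[2]")
      .getOrCreate()

    // Sorting keeps a SortExec in the physical plan, as in the stack trace.
    val resDf = spark.range(0, 1000000).toDF("id").sort("id")

    resDf.coalesce(2).foreachPartition { rows: Iterator[Row] =>
      val t = new Thread() {
        override def run(): Unit = {
          // As in the report: the partition iterator is consumed on a
          // user-spawned thread, starting with an Iterator.length call.
          println(s"record-----start-----${rows.length}")
        }
      }
      t.start()
      t.join()
    }

    spark.stop()
  }
}
{code}

If the NPE still occurs with something like this, the external database would not be needed to investigate it.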

> java.lang.NullPointerException at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:83)
> --------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24220
>                 URL: https://issues.apache.org/jira/browse/SPARK-24220
>             Project: Spark
>          Issue Type: Bug
>          Components: Java API
>    Affects Versions: 2.2.0
>            Reporter: joy-m
>            Priority: Major
>
> def getInputStream(rows: Iterator[Row]): PipedInputStream = {
>   printMem("before gen string")
>   val pipedOutputStream = new PipedOutputStream()
>   (new Thread() {
>     override def run() {
>       if (rows == null) {
>         logError("rows is null==========>")
>       } else {
>         println(s"record-----start-----${rows.length}")
>         try {
>           while (rows.hasNext) {
>             val row = rows.next()
>             println(row)
>             val str = row.mkString("\001") + "\r\n"
>             println(str)
>             pipedOutputStream.write(str.getBytes(StandardCharsets.UTF_8))
>           }
>           println("record-----end-----")
>           pipedOutputStream.close()
>         } catch {
>           case ex: Exception =>
>             ex.printStackTrace()
>         }
>       }
>     }
>   }).start()
>   println("pipedInPutStream----------")
>   val pipedInPutStream = new PipedInputStream()
>   pipedInPutStream.connect(pipedOutputStream)
>   println("pipedInPutStream--- conn-------")
>   printMem("after gen string")
>   pipedInPutStream
> }
> resDf.coalesce(15).foreachPartition(rows => {
>   if (rows == null) {
>     logError("rows is null=========>")
>   } else {
>     val copyCmd = s"COPY ${tableName} FROM STDIN with DELIMITER as '\001' NULL as 'null string'"
>     var con: Connection = null
>     try {
>       con = DriverManager.getConnection(adminUrl)
>       val copyManager = new CopyManager(con.asInstanceOf[BaseConnection])
>       val start = System.currentTimeMillis()
>       var count: Long = 0
>       var copyCount: Long = 0
>       println("before copyManager=====>")
>       copyCount += copyManager.copyIn(copyCmd, getInputStream(rows))
>       println("after copyManager=====>")
>       val finish = System.currentTimeMillis()
>       println("copyCount:" + copyCount + " count:" + count + " time(s):" + (finish - start) / 1000)
>       con.close()
>     } catch {
>       case ex: Exception =>
>         ex.printStackTrace()
>         println(s"copyIn error!${ex.toString}")
>     } finally {
>       try {
>         if (con != null) {
>           con.close()
>         }
>       } catch {
>         case ex: SQLException =>
>           ex.printStackTrace()
>           println(s"copyIn error!${ex.toString}")
>       }
>     }
>   }
> })
> 
> 18/05/09 13:31:30 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Thread-4,5,main]
> java.lang.NullPointerException
>  at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:83)
>  at org.apache.spark.sql.execution.SortExec.createSorter(SortExec.scala:87)
>  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.init(Unknown Source)
>  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8.apply(WholeStageCodegenExec.scala:392)
>  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8.apply(WholeStageCodegenExec.scala:389)
>  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
>  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:844)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>  at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
>  at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
>  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
>  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
>  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
>  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
>  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
>  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>  at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
>  at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
>  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>  at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:107)
>  at scala.collection.AbstractIterator.size(Iterator.scala:1336)
>  at scala.collection.Iterator$class.length(Iterator.scala:1189)
>  at scala.collection.AbstractIterator.length(Iterator.scala:1336)
>  at org.admaster.jice.timeExpand.VisitsDivid$$anon$1.run(VisitsDivid.scala:548)
> 18/05/09 13:31:30 INFO storage.BlockManager: Found block rdd_15_47 remotely



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

