spark-user mailing list archives

From "Jahagirdar, Madhu" <madhu.jahagir...@philips.com>
Subject java.lang.ArithmeticException while creating Parquet
Date Tue, 18 Nov 2014 02:05:03 GMT
All,

The code at the end of this message fails with the error below. However, if I replace JavaHiveContext with HiveContext and the Java bean class with a Scala case class, the same code works fine (see the commented alternative in the code, and the sketch after the listing). Any reason why this could be happening?


java.lang.ArithmeticException: / by zero
    at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:99)
    at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:92)
    at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:300)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:318)
    at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:318)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

================================================================

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.api.java.JavaHiveContext
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{Logging, SparkConf}

object SparkStreamingToParquet extends Logging {

  /**
   * Recovers the StreamingContext from the checkpoint directory if one
   * exists, otherwise builds a fresh context via createContext.
   *
   * @param args see the usage message in the commented block below
   */
  def main(args: Array[String]) {
//    if (args.length < 6) {
//      logInfo("Please provide valid parameters: <hdfsFilesLocation: hdfs://ip:8020/user/hdfs/--/> <IMPALAtableloc hdfs://ip:8020/user/hive/--/> "
//        + "<tablename> <ex: com.philips.BeanClassName> <appname> <no of cores> <checkpoint-dir>")
//      logInfo("make sure you give the full folder path with '/' at the end, i.e. /user/hdfs/abc/")
//      System.exit(1)
//    }


    val CHECKPOINT_DIR = "hdfs://127.0.0.1:5555/user/checkpoint/" //args(6)


    val jssc: StreamingContext = StreamingContext.getOrCreate(CHECKPOINT_DIR, () => createContext(args))

    jssc.start()
    jssc.awaitTermination()
  }


  def createContext(args:Array[String]): StreamingContext = {

    val CHECKPOINT_DIR = "hdfs://127.0.0.1:5555/user/checkpoint/" //args(6)

    val sparkConf: SparkConf = new SparkConf()

    val HDFS_URI = "hdfs://127.0.0.1:5555" //sparkConf.get("spark.philips.hdfsuri");

    val HDFS_FILE_LOC = HDFS_URI+"/user/logs/" //args(0); // for streaming
    val IMPALA_TABLE_LOC = HDFS_URI+ "/user/impala/" //args(1); // impala table location
    val TEMP_TABLE_NAME = "temp_json" //args(2); // temp table name for hive

    val BEAN_CLASS_NAME = "Person"  //args(3);
    val SPARK_APP_NAME  = "Monitor" //args(4);

    sparkConf.setAppName(SPARK_APP_NAME).setMaster("local[2]")

    var noOfCores = "3"

//    if (args.length >= 6) {
//      noOfCores = args(5)
//    }

    sparkConf.set("spark.cores.max", noOfCores)

    val jssc: StreamingContext = new StreamingContext(sparkConf, new Duration(30000))

    val stream = jssc.textFileStream(HDFS_FILE_LOC)

    stream.foreachRDD(rdd => {
      if (rdd != null && rdd.count() > 0) {

        // Create the Parquet-backed table from the Java bean class and
        // register it as a temp table for the insert below.
        val hcontext = new JavaHiveContext(rdd.sparkContext)
        hcontext.createParquetFile(
            Class.forName(BEAN_CLASS_NAME), IMPALA_TABLE_LOC, true,
            org.apache.spark.deploy.SparkHadoopUtil.get.conf)
          .registerTempTable(TEMP_TABLE_NAME)

        // Working alternative: HiveContext plus a Scala case class.
        // hcontext.createParquetFile[Person](IMPALA_TABLE_LOC, true,
        //   org.apache.spark.deploy.SparkHadoopUtil.get.conf).registerTempTable(TEMP_TABLE_NAME)

        val schRdd = hcontext.jsonRDD(rdd)
        schRdd.insertInto(TEMP_TABLE_NAME)
      }
    })

    jssc.checkpoint(CHECKPOINT_DIR)
    jssc
  }
}
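For comparison, a minimal sketch of the variant that works, using the Scala HiveContext and a case class in place of the Java bean (Person and its fields here are placeholders for the real class; the constants are the same as in the listing above):

    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.deploy.SparkHadoopUtil

    // Placeholder case class standing in for the Java bean.
    case class Person(name: String, age: Int)

    stream.foreachRDD(rdd => {
      if (rdd != null && rdd.count() > 0) {
        val hcontext = new HiveContext(rdd.sparkContext)
        // Typed overload: the Parquet schema is derived from the case class.
        hcontext.createParquetFile[Person](IMPALA_TABLE_LOC, true, SparkHadoopUtil.get.conf)
          .registerTempTable(TEMP_TABLE_NAME)
        hcontext.jsonRDD(rdd).insertInto(TEMP_TABLE_NAME)
      }
    })

Only the context type and the schema source change; the streaming and insert logic is identical to the listing above.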



