spark-user mailing list archives

From Manohar753 <manohar.re...@happiestminds.com>
Subject DataFrame InsertIntoJdbc() Runtime Exception on cluster
Date Wed, 15 Jul 2015 09:13:15 GMT
Hi All,

I am trying to add a few new rows to an existing MySQL table using a
DataFrame. The rows are added correctly in my local environment, but on the
Spark cluster I get the runtime exception below.


Exception in thread "main" java.lang.RuntimeException: Table msusers_1 already exists.
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:240)
        at org.apache.spark.sql.DataFrame.insertIntoJDBC(DataFrame.scala:1481)
        at com.sparkexpert.UserMigration.main(UserMigration.java:59)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/07/15 08:13:42 INFO spark.SparkContext: Invoking stop() from shutdown hook
15/07/15 08:13:42 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
15/07/15 08:13:

The code snippet is below:

        System.out.println(Query);
        Map<String, String> options = new HashMap<>();
        options.put("driver", PropertyLoader.getProperty(Constants.msSqlDriver));
        options.put("url", PropertyLoader.getProperty(Constants.msSqlURL));
        options.put("dbtable", Query);
        options.put("numPartitions", "1");
        DataFrame delatUsers = sqlContext.load("jdbc", options);

        delatUsers.show();

        // Load latest users DataFrame
        String mysQuery = "(SELECT * FROM msusers_1) as employees_name";
        Map<String, String> msoptions = new HashMap<>();
        msoptions.put("driver", PropertyLoader.getProperty(Constants.mysqlDriver));
        msoptions.put("url", PropertyLoader.getProperty(Constants.mysqlUrl));
        msoptions.put("dbtable", mysQuery);
        msoptions.put("numPartitions", "1");
        DataFrame latestUsers = sqlContext.load("jdbc", msoptions);

        // Get updated users data
        DataFrame updatedUsers = delatUsers.as("ms")
                .join(latestUsers.as("lat"), col("lat.uid").equalTo(col("ms.uid")), "inner")
                .select("ms.revision", "ms.uid", "ms.UserType", "ms.FirstName",
                        "ms.LastName", "ms.Email", "ms.smsuser_id", "ms.dev_acct",
                        "ms.lastlogin", "ms.username", "ms.schoolAffiliation",
                        "ms.authsystem_id", "ms.AdminStatus");
        // Insert new users into MySQL DB
        delatUsers.except(updatedUsers).insertIntoJDBC(
                PropertyLoader.getProperty(Constants.mysqlUrl), "msusers_1", false);

The insertIntoJDBC call above is the line where the exception occurs.
Team, please share any inputs if anyone has come across this.
Note that overwriting the same table works fine on the cluster as well.

Thanks,
Manohar
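
The message "Table msusers_1 already exists." comes from DataFrameWriter.jdbc in the stack trace. That suggests the write ran with a save mode that refuses existing tables rather than one that appends. Below is a toy, in-memory sketch of the four save-mode behaviours (Append, Overwrite, ErrorIfExists, Ignore) as Spark documents them. The names SaveModeSketch, FakeDb, Mode and write are hypothetical and exist only for illustration; none of them is a Spark API, and no database is involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SaveModeSketch {
    // Local stand-in mirroring the four values of org.apache.spark.sql.SaveMode.
    public enum Mode { APPEND, OVERWRITE, ERROR_IF_EXISTS, IGNORE }

    // Hypothetical in-memory "database": table name -> rows.
    public static final class FakeDb {
        public final Map<String, List<String>> tables = new HashMap<>();
    }

    // Writes rows to a table, deciding what to do when the table already exists.
    public static void write(FakeDb db, String table, List<String> rows, Mode mode) {
        boolean exists = db.tables.containsKey(table);
        if (exists && mode == Mode.IGNORE) {
            return; // leave the existing table untouched
        }
        if (exists && mode == Mode.ERROR_IF_EXISTS) {
            throw new RuntimeException("Table " + table + " already exists.");
        }
        if (exists && mode == Mode.OVERWRITE) {
            db.tables.remove(table); // drop, then recreate below
        }
        // Create the table if it is missing, then add the rows (append semantics).
        db.tables.computeIfAbsent(table, t -> new ArrayList<>()).addAll(rows);
    }

    public static void main(String[] args) {
        FakeDb db = new FakeDb();
        // Table does not exist yet, so even ERROR_IF_EXISTS creates it.
        write(db, "msusers_1", Arrays.asList("u1", "u2"), Mode.ERROR_IF_EXISTS);
        // Appending to the existing table keeps the old rows.
        write(db, "msusers_1", Arrays.asList("u3"), Mode.APPEND);
        System.out.println(db.tables.get("msusers_1")); // [u1, u2, u3]
        try {
            // Same call as the first one, but now the table exists.
            write(db, "msusers_1", Arrays.asList("u4"), Mode.ERROR_IF_EXISTS);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // Table msusers_1 already exists.
        }
    }
}
```

In Spark 1.4 the DataFrameWriter exposes the mode explicitly, so an append would presumably be written as df.write().mode(SaveMode.Append).jdbc(url, "msusers_1", connectionProperties). This is untested against the setup described here, and whether it resolves the cluster-only failure is an assumption.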



