spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akshayhazari <akshayhaz...@gmail.com>
Subject Mysql retrieval and storage using JdbcRDD
Date Mon, 10 Nov 2014 11:40:52 GMT
So far I have tried this, and I am able to compile it successfully. There
isn't enough documentation on Spark for its usage with databases. I am using
AbstractFunction0 and AbstractFunction1 here. I am unable to access the
database: the jar just runs without doing anything when submitted. I want to
know how this is supposed to be done and what I have done wrong here. Any
help is appreciated.

import java.io.Serializable;
import scala.*;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;
import scala.runtime.*;
import scala.collection.mutable.LinkedHashMap;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.*;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.api.java.DataType;
import org.apache.spark.sql.api.java.StructType;
import org.apache.spark.sql.api.java.StructField;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import java.sql.*;
import java.util.*;
import java.io.*;
public class Spark_Mysql {
    @SuppressWarnings("serial")
    static class Z extends AbstractFunction0<Connection> 
    {
    	Connection con;
    	public Connection apply()
    	{
	    
	    try {
		Class.forName("com.mysql.jdbc.Driver");
	
con=DriverManager.getConnection("jdbc:mysql://localhost:3306/?user=azkaban&password=password");
	    }
	    catch(Exception e)
		{
		}
	    return con;
    	}
	
    }
    static public class Z1 extends AbstractFunction1<ResultSet,Integer> 
    {
	int ret;
	public Integer apply(ResultSet i) {
	    
	    try{
		ret=i.getInt(1);
	    }
	    catch(Exception e)
		{}
	    return ret;
	}
    }

    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {

	String arr[]=new String[1];

arr[0]="/home/hduser/Documents/Credentials/Newest_Credentials_AX/spark-1.1.0-bin-hadoop1/lib/mysql-connector-java-5.1.33-bin.jar";

	JavaSparkContext ctx = new JavaSparkContext(new
SparkConf().setAppName("JavaSparkSQL").setJars(arr));
	SparkContext sctx = new SparkContext(new
SparkConf().setAppName("JavaSparkSQL").setJars(arr));
	JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

	  try 
	    {
		Class.forName("com.mysql.jdbc.Driver");
	    }
	catch(Exception ex) 
	    {
		System.exit(1);
	    }
	  Connection
zconn=DriverManager.getConnection("jdbc:mysql://localhost:3306/?user=azkaban&password=password");

	  JdbcRDD rdd=new JdbcRDD(sctx,new Z(),"SELECT * FROM spark WHERE ? <= id
AND id <= ?",0L, 1000L, 10,new
Z1(),scala.reflect.ClassTag$.MODULE$.AnyRef());
		  
	  rdd.saveAsTextFile("hdfs://127.0.0.1:9000/user/hduser/mysqlrdd"); 
	  rdd.saveAsTextFile("/home/hduser/mysqlrdd"); 
    }
}



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Mysql-retrieval-and-storage-using-JdbcRDD-tp18479.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Mime
View raw message