spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Insert into table with one the value is derived from DB function using spark
Date Fri, 18 Jun 2021 20:28:59 GMT
I gather you mean using JDBC to write to the Oracle table?

Spark provides a unified framework to write to any JDBC compliant database.

import sys

def writeTableWithJDBC(dataFrame, url, tableName, user, password, driver, mode):
    # Write the DataFrame to tableName over JDBC; mode is a Spark save mode such as "append" or "overwrite"
    try:
        dataFrame. \
            write. \
            format("jdbc"). \
            option("url", url). \
            option("dbtable", tableName). \
            option("user", user). \
            option("password", password). \
            option("driver", driver). \
            mode(mode). \
            save()
    except Exception as e:
        print(f"{e}, quitting")
        sys.exit(1)

and this is how to call it to write to the Oracle table:

    def loadIntoOracleTable(self, df2):
        # write to Oracle table, all uppercase not mixed case and column names <= 30 characters in version 12.1
        # s (the module holding the JDBC helpers) and oracle_url are assumed to be defined elsewhere
        tableName = self.config['OracleVariables']['yearlyAveragePricesAllTable']
        fullyQualifiedTableName = self.config['OracleVariables']['dbschema'] + '.' + tableName
        user = self.config['OracleVariables']['oracle_user']
        password = self.config['OracleVariables']['oracle_password']
        driver = self.config['OracleVariables']['oracle_driver']
        mode = self.config['OracleVariables']['mode']
        s.writeTableWithJDBC(df2, oracle_url, fullyQualifiedTableName, user, password, driver, mode)
        print(f"created {tableName}")
        # read data to ensure all loaded OK
        fetchsize = self.config['OracleVariables']['fetchsize']
        read_df = s.loadTableFromJDBC(self.spark, oracle_url, fullyQualifiedTableName, user, password, driver, fetchsize)
        # check that all rows are there
        if df2.subtract(read_df).count() == 0:
            print("Data has been loaded OK to Oracle table")
        else:
            print("Data could not be loaded to Oracle table, quitting")
            sys.exit(1)
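
For reference, here is a rough sketch of what the configuration and the JDBC URL behind the method above might look like. The key names are the ones the method reads. Every value, the helper build_oracle_url, and the host, port and service name are hypothetical placeholders, not the actual settings.

```python
def build_oracle_url(host, port, service_name):
    # Oracle thin-driver JDBC URL in service-name form
    return f"jdbc:oracle:thin:@//{host}:{port}/{service_name}"


# Hypothetical values; only the key names come from the method above
config = {
    "OracleVariables": {
        "dbschema": "SCRATCHPAD",
        "yearlyAveragePricesAllTable": "YEARLYAVERAGEPRICESALLTABLE",
        "oracle_user": "scratchpad",
        "oracle_password": "change_me",
        "oracle_driver": "oracle.jdbc.OracleDriver",
        "mode": "overwrite",
        "fetchsize": 1000,
    }
}

oracle_url = build_oracle_url("dbhost", 1521, "orcl")
ora = config["OracleVariables"]
fullyQualifiedTableName = ora["dbschema"] + "." + ora["yearlyAveragePricesAllTable"]
print(oracle_url)
print(fullyQualifiedTableName)
```

Keeping the table name in upper case and within 30 characters matches the comment in the method about Oracle 12.1.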

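In the statement where it says

             option("dbtable", tableName). \

*tableName* has to be the target table when writing, because Spark builds the INSERT statement itself from the DataFrame columns. As far as I know you cannot put an SQL insert statement or a function call there. Replacing *tableName* with an SQL subquery only works when reading.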
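
One way to get the database function applied, sketched under assumptions: write the DataFrame as-is to a staging table with writeTableWithJDBC, then let Oracle run an INSERT ... SELECT that calls the function on the relevant column. The helper below only builds that SQL text. The function build_insert_select, the table names and the column names are all hypothetical; getfunctionvalue is the function name from the original question. Executing the statement would need a separate Oracle connection (for example through a Python DB-API driver), which is not shown here.

```python
def build_insert_select(target_table, staging_table, columns, derived):
    """Build an INSERT ... SELECT that wraps chosen columns in a DB function.

    columns: ordered column names shared by the staging and target tables
    derived: maps a column name to the database function applied to it
    """
    select_exprs = []
    for col in columns:
        func = derived.get(col)
        # Wrap the column in its function if one is given, else copy it as-is
        select_exprs.append(f"{func}({col})" if func else col)
    return (
        f"INSERT INTO {target_table} ({', '.join(columns)}) "
        f"SELECT {', '.join(select_exprs)} FROM {staging_table}"
    )


# Hypothetical table and column names, for illustration only
sql = build_insert_select(
    "SCRATCHPAD.TARGET",
    "SCRATCHPAD.STAGING",
    ["ID", "NAME", "CODE"],
    {"CODE": "getfunctionvalue"},
)
print(sql)
```

With this approach Spark only does the bulk load, and the function is evaluated inside the database where it lives.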

You will need the Oracle JDBC driver, say ojdbc6.jar, on the driver classpath. Set it in
$SPARK_HOME/conf/spark-defaults.conf:

spark.driver.extraClassPath /home/hduser/jars/jconn4.jar:/home/hduser/jars/ojdbc6.jar

HTH






*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 18 Jun 2021 at 20:49, Anshul Kala <anshul.kala@gmail.com> wrote:

> Hi All,
>
> I am using Spark to ingest data from a file into an Oracle database table. For
> one of the fields, the value to be populated is generated by a function
> that is written in the database.
>
> The input to the function is one of the fields of the data frame.
>
> I wanted to use spark.dbc.write to perform the operation, which generates
> the insert query at the back end.
>
> For example, it can generate the insert query as:
>
> Insert into table values (?,?, getfunctionvalue(?) )
>
> Please advise if this is possible in Spark and, if yes, how it can be done.
>
> This is a little urgent for me, so any help is appreciated.
>
> Thanks
> Anshul
>
