spark-user mailing list archives

From Mich Talebzadeh <>
Subject PySpark functions for various sources and sinks
Date Fri, 02 Apr 2021 11:29:41 GMT

I have an idea to turn the functions described below into a Spark package.

Over time, I have had to develop various Python functions to set the Spark
connection parameters and to read from and write to various sources and sinks.
These let us use the existing Spark utility packages from Python quickly,
without worrying about the details. Example contents are as follows:

   1. Create or get Spark session local
   2. Create or replace Spark session for a distributed environment
   3. Create Spark context
   4. Create Hive context
   5. Load Spark configuration parameters for Structured Streaming,
   including settings for backpressure, kafka.maxRatePerPartition, etc.
   6. Load Spark configuration parameters for Hive
   7. Load Spark configuration parameters for Google BigQuery
   8. Load Spark configuration parameters for Redis
   9. Load data from Google BigQuery into DF
   10. Write data from DF to Google BigQuery
   11. loadTableFromJDBC
   12. writeTableWithJDBC
   13. loadTableFromRedis
   14. writeTableToRedis
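As an illustration of items 1 and 5, here is a minimal sketch of how configuration parameters might be applied while building a local session. The function names `apply_conf` and `create_local_session` and the setting values are hypothetical and are not taken from the package described in this message; only `SparkSession.builder`, `.master()`, `.appName()`, `.config()` and `.getOrCreate()` are standard PySpark calls.

```python
def apply_conf(builder, settings):
    """Chain builder.config(key, value) for every entry in settings."""
    for key, value in settings.items():
        builder = builder.config(key, str(value))
    return builder


# Hypothetical values; in practice these would come from the user's settings.
streaming_settings = {
    "spark.streaming.backpressure.enabled": "true",
    "spark.streaming.kafka.maxRatePerPartition": 600,
}


def create_local_session(app_name, settings):
    """Create or get a local Spark session with the given settings applied."""
    # Imported lazily so apply_conf can be used without PySpark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.master("local[*]").appName(app_name)
    return apply_conf(builder, settings).getOrCreate()
```

Applying the settings on the builder, before the session exists, avoids having to know which of them can still be changed at runtime.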

All parameter settings are user driven and can be read from a YAML file
into a Python dictionary, so this is pretty flexible.
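A rough sketch of what reading such a file could look like, assuming PyYAML is used. The section and key names are the ones that appear in the calling code later in this message; the file layout, the placeholder values and the helper names `load_config` and `oracle_settings` are assumptions made for illustration.

```python
# Assumed layout of the YAML file (all values are placeholders):
#
# OracleVariables:
#   sourceTable: some_table
#   oracle_user: some_user
#   oracle_password: some_password
#   oracle_driver: oracle.jdbc.OracleDriver
#   fetchsize: 1000


def load_config(path):
    """Read a YAML file into a plain Python dictionary."""
    # PyYAML; imported here so oracle_settings works without it installed.
    import yaml

    with open(path) as f:
        return yaml.safe_load(f)


def oracle_settings(config):
    """Pick the Oracle JDBC values out of the configuration dictionary."""
    section = config["OracleVariables"]
    return {
        "tableName": section["sourceTable"],
        "user": section["oracle_user"],
        "password": section["oracle_password"],
        "driver": section["oracle_driver"],
        "fetchsize": section["fetchsize"],
    }
```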

For example, the code to read from JDBC is as below:

def loadTableFromJDBC(spark, url, tableName, user, password, driver, fetchsize):
    try:
        # Build a JDBC reader from the supplied connection settings
        read_df = spark.read. \
            format("jdbc"). \
            option("url", url). \
            option("dbtable", tableName). \
            option("user", user). \
            option("password", password). \
            option("driver", driver). \
            option("fetchsize", fetchsize). \
            load()
        return read_df
    except Exception as e:
        print(f"""{e}, quitting""")

and this is how it is called to read from Oracle through JDBC:

from sparkutils import sparkstuff as s
# read data through jdbc from Oracle
tableName = self.config['OracleVariables']['sourceTable']
fullyQualifiedTableName =
user = self.config['OracleVariables']['oracle_user']
password = self.config['OracleVariables']['oracle_password']
driver = self.config['OracleVariables']['oracle_driver']
fetchsize = self.config['OracleVariables']['fetchsize']
read_df =
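The right-hand sides of two assignments above did not survive in the archived copy. The following is only a sketch of how such a call might be put together, not the author's code. The key names `oracle_url` and `dbschema` and the helper `read_oracle_table` are hypothetical, and the loader is assumed to take the arguments (spark, url, tableName, user, password, driver, fetchsize).

```python
def read_oracle_table(spark, config, load_table):
    """Collect the Oracle settings and pass them to a JDBC loader.

    load_table is expected to accept the same arguments as
    loadTableFromJDBC: spark, url, tableName, user, password,
    driver, fetchsize.
    """
    ov = config["OracleVariables"]
    # 'oracle_url' and 'dbschema' are assumed key names, not shown in the post.
    url = ov["oracle_url"]
    fully_qualified_table_name = ov["dbschema"] + "." + ov["sourceTable"]
    return load_table(
        spark,
        url,
        fully_qualified_table_name,
        ov["oracle_user"],
        ov["oracle_password"],
        ov["oracle_driver"],
        ov["fetchsize"],
    )
```

With the module imported as `s`, as above, this could be called as `read_oracle_table(spark, config, s.loadTableFromJDBC)`. Passing the loader in as an argument also lets the helper be tried out with a stand-in function when no database is available.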

Let me know if this is something worth considering and collaborating on.



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
