PySpark functions for various sources and sinks

Mich Talebzadeh

I have an idea to turn a set of PySpark utility functions I have written into a Spark package.

Over time, I have developed various Python functions to set the Spark connection parameters and to read from and write to various sources and sinks. They let us use the existing Spark packages from Python quickly, without worrying about the details. Example contents are as follows:

  1. Create or get Spark session local
  2. Create or replace Spark session for a distributed environment
  3. Create Spark context
  4. Create Hive context
  5. Load Spark configuration parameters for Structured Streaming, including settings for back pressure, kafka.maxRatePerPartition, etc.
  6. Load Spark configuration parameters for Hive
  7. Load Spark configuration parameters for Google BigQuery
  8. Load Spark configuration parameters for Redis
  9. Load data from Google BigQuery into DF
  10. Write data from DF to Google BigQuery
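  11. Load a table through JDBC into DF (loadTableFromJDBC)
  12. Write a DF to a table through JDBC (writeTableWithJDBC)
  13. Load a table from Redis into DF (loadTableFromRedis)
  14. Write a DF to Redis (writeTableToRedis)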
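As a rough illustration of items 1, 5 and 8, the sketch below collects Spark settings in a plain dictionary and applies them to a local SparkSession. The helper names (to_spark_value, merge_conf, redis_conf, spark_session_local) and the default values are hypothetical; the property names spark.streaming.backpressure.enabled, spark.streaming.kafka.maxRatePerPartition, spark.redis.host and spark.redis.port are real Spark and spark-redis settings. PySpark is imported inside the session function so the pure-Python helpers can be tried without it.

```python
# Hypothetical sketch of items 1, 5 and 8: build Spark settings as a dict,
# then apply them to a local SparkSession. Names and default values are
# illustrative assumptions, not the package's actual API.

# Illustrative defaults for the streaming settings mentioned in item 5.
DEFAULT_STREAMING_CONF = {
    "spark.streaming.backpressure.enabled": True,
    "spark.streaming.kafka.maxRatePerPartition": 1000,
}


def to_spark_value(value):
    """Spark config values are strings; YAML booleans become 'true'/'false'."""
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)


def merge_conf(defaults, overrides=None):
    """Return a new dict of string values; user overrides win over defaults."""
    merged = dict(defaults)
    merged.update(overrides or {})
    return {key: to_spark_value(value) for key, value in merged.items()}


def redis_conf(host, port):
    """Connection settings read by the spark-redis connector (item 8)."""
    return {"spark.redis.host": host, "spark.redis.port": str(port)}


def spark_session_local(app_name, conf=None):
    """Create or get a local SparkSession with the given settings (item 1)."""
    from pyspark.sql import SparkSession  # imported lazily; needs pyspark

    builder = SparkSession.builder.master("local[*]").appName(app_name)
    for key, value in (conf or {}).items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

A call such as spark_session_local("demo", {**merge_conf(DEFAULT_STREAMING_CONF), **redis_conf("localhost", 6379)}) would then produce one session carrying both groups of settings.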

All parameter settings are user-driven and can be read from a YAML file into a Python dictionary, which makes the functions quite flexible.
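A minimal sketch of the YAML step, assuming PyYAML is installed (its yaml.safe_load returns nested dictionaries and lists). load_config and get_section are hypothetical helper names; get_section simply fails early when a section or required key is missing, rather than letting the error surface later inside Spark.

```python
# Hypothetical helpers for reading the user-driven settings from YAML.


def load_config(path):
    """Read a YAML file into a nested Python dictionary (requires PyYAML)."""
    import yaml  # PyYAML, imported lazily so get_section works without it

    with open(path) as f:
        return yaml.safe_load(f)


def get_section(config, section, required=()):
    """Return config[section], raising KeyError if it or a required key is missing."""
    values = config.get(section)
    if values is None:
        raise KeyError(f"missing section {section!r}")
    missing = [key for key in required if key not in values]
    if missing:
        raise KeyError(f"section {section!r} is missing keys: {missing}")
    return values
```

For example, get_section(load_config("config.yml"), "OracleVariables", ["sourceTable", "dbschema"]) would return the Oracle settings, where config.yml is a hypothetical file name.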

For example, the code to read from a JDBC source is as follows:

import sys

def loadTableFromJDBC(spark, url, tableName, user, password, driver, fetchsize):
    # Read a table into a DataFrame through the Spark JDBC data source
    try:
        read_df = spark.read. \
            format("jdbc"). \
            option("url", url). \
            option("dbtable", tableName). \
            option("user", user). \
            option("password", password). \
            option("driver", driver). \
            option("fetchsize", fetchsize). \
            load()
        return read_df
    except Exception as e:
        # Report the error and stop, as the message says
        print(f"{e}, quitting")
        sys.exit(1)
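The write direction (item 12) could follow the same pattern. The sketch below is an assumption about how writeTableWithJDBC might look, not its actual signature; it relies only on the standard DataFrameWriter calls format, options, mode and save, and keeps the option building in a separate pure function (the hypothetical jdbc_options) so it can be checked without a Spark cluster.

```python
# Hypothetical counterpart to loadTableFromJDBC for item 12.


def jdbc_options(url, table_name, user, password, driver, **extra):
    """Collect Spark JDBC data source options into one dict."""
    options = {
        "url": url,
        "dbtable": table_name,
        "user": user,
        "password": password,
        "driver": driver,
    }
    # Extra options such as batchsize or fetchsize are passed through as strings.
    options.update({key: str(value) for key, value in extra.items()})
    return options


def writeTableWithJDBC(df, mode, options):
    """Write a DataFrame through JDBC; mode is e.g. "append" or "overwrite"."""
    df.write.format("jdbc").options(**options).mode(mode).save()
```

Separating the options from the write keeps the connection details in one place, so the same dictionary could also feed spark.read.format("jdbc").options(**options).load() on the read side.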

This is how it is called to read from Oracle through JDBC:

from sparkutils import sparkstuff as s

# Read data through JDBC from Oracle.
# This runs inside a class method: self.config holds the settings read from YAML,
# and oracle_url (the Oracle JDBC URL) is assumed to have been built earlier.
tableName = self.config['OracleVariables']['sourceTable']
fullyQualifiedTableName = self.config['OracleVariables']['dbschema'] + '.' + tableName
user = self.config['OracleVariables']['oracle_user']
password = self.config['OracleVariables']['oracle_password']
driver = self.config['OracleVariables']['oracle_driver']
fetchsize = self.config['OracleVariables']['fetchsize']
read_df = s.loadTableFromJDBC(self.spark, oracle_url, fullyQualifiedTableName, user, password, driver, fetchsize)
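The calling code above expects oracle_url to exist already. One way to build it, sketched here as an assumption, is the Oracle thin-driver service-name form jdbc:oracle:thin:@//host:port/service_name, with the whole argument set then passed to loadTableFromJDBC as keyword arguments. The keys oracleHost, oraclePort and serviceName are hypothetical additions to the OracleVariables section, and build_oracle_url and oracle_jdbc_args are hypothetical helper names.

```python
# Hypothetical helpers that turn the OracleVariables section into the
# arguments loadTableFromJDBC expects.


def build_oracle_url(host, port, service_name):
    """Oracle thin-driver URL in the //host:port/service_name form."""
    return f"jdbc:oracle:thin:@//{host}:{port}/{service_name}"


def oracle_jdbc_args(config):
    """Map the OracleVariables section onto loadTableFromJDBC's keyword arguments."""
    v = config["OracleVariables"]
    return {
        # oracleHost, oraclePort and serviceName are hypothetical key names.
        "url": build_oracle_url(v["oracleHost"], v["oraclePort"], v["serviceName"]),
        "tableName": v["dbschema"] + "." + v["sourceTable"],
        "user": v["oracle_user"],
        "password": v["oracle_password"],
        "driver": v["oracle_driver"],
        "fetchsize": v["fetchsize"],
    }


# Usage (needs a live SparkSession):
# read_df = s.loadTableFromJDBC(spark, **oracle_jdbc_args(config))
```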

Let me know whether you think this is worth considering and whether you would be interested in collaborating.

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.