Package : Loader : RDBM Class - waidyanatha/dongcha GitHub Wiki

from dongcha.etl.loader import sparkRDBM

Introduction

This page describes how to use sparkDBwls.py in the Loader package of the Utils app's ETL module.

Current working RDBMS

  • PostgreSQL - all CRUD functions are working and tested
  • BigQuery - waiting for Spark 3.4 connectors (jars) to be released

The package builds on pyspark's database read/write features and is designed for spark partitions and multi-node implementations. It is intended to work with any RDBMS, such as PostgreSQL, MySQL, or MS SQL Server, but so far it has been tested only with PostgreSQL. An implementation requires:

  1. defining the database connection parameters in the utils config file; namely, the app.cfg [DATABASE] section
  2. defining the pyspark session parameters in the same utils config file; i.e., the app.cfg [SPARK] section
  3. making the necessary JAR files for the respective RDBMS available in the $SPARK_HOME/jars/ folder. See below for more details.

The package provides functions to perform CRUD operations on a database in two ways:

  1. dbTable - defining the schema and table name
  2. query - defining an SQL query, such as one with multi-table joins and where clauses

Prerequisites

  1. APP.CFG settings must be defined with the necessary DATABASE section.
    1. If using a cloud service, then additional cloud-service-specific sections must be configured
  2. Postgresql instance
    1. version 11.0 or later must be installed
    2. In addition to the default pg_admin user, a separate user account for the application must be created with full CRUD privileges on all schemas and databases, and it must be registered in the APP.CFG file.
  3. BigQuery - TBD

Utils APP.CFG settings

The necessary parameters for the two config sections are discussed below.

Database settings

    [DATABASE]
    #--database types: mysql, postgresql (default: postgres)
    dbtype = PostgreSQL
    #--hostIP default 127.0.0.1
    dbhostip = 127.0.0.1
    #--port default 5432
    dbport = 5432
    #--database driver
    #  postgresql for apache spark: 'org.postgresql.Driver'
    dbdriver = org.postgresql.Driver
    #--database name
    dbname = mydbname
    #--schema name
    dbschema = mydbschema
    #--username and password to connect
    #  default db_user=postgres, db_pswd = postgres
    dbuser = mydbuser
    dbpswd = mydbpswd
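
As a minimal sketch of how these settings might be consumed, the snippet below parses a [DATABASE] section with Python's standard configparser and assembles a JDBC connection URL of the form jdbc:postgresql://host:port/dbname. The helper name build_jdbc_url and the inline sample text are assumptions made for illustration; the package's own parsing logic may differ.

```python
import configparser

# Sample text mirroring the [DATABASE] section above; a real
# application would call parser.read() on its app.cfg path instead.
SAMPLE_CFG = """
[DATABASE]
dbtype = PostgreSQL
dbhostip = 127.0.0.1
dbport = 5432
dbdriver = org.postgresql.Driver
dbname = mydbname
dbschema = mydbschema
dbuser = mydbuser
dbpswd = mydbpswd
"""


def build_jdbc_url(db: dict) -> str:
    """Hypothetical helper: assemble a JDBC URL from [DATABASE] values."""
    return (
        f"jdbc:{db['dbtype'].lower()}://"
        f"{db['dbhostip']}:{db['dbport']}/{db['dbname']}"
    )


parser = configparser.ConfigParser()
parser.read_string(SAMPLE_CFG)
db_cfg = dict(parser["DATABASE"])  # option names are lower-cased by configparser
jdbc_url = build_jdbc_url(db_cfg)
print(jdbc_url)  # jdbc:postgresql://127.0.0.1:5432/mydbname
```

The resulting string is the kind of value the dbConnURL class property would be expected to hold.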

Spark settings

Prerequisites

    [SPARK]
    #--settings to connect to the database to perform workloads
    # $SPARK_HOME directory
    homedir = /opt/spark/
    # $SPARK_HOME/bin directory
    bindir = /opt/spark/bin
    # spark config type; default spark.jars
    config = spark.jars
    #  to download Postgres JDBC drivers: https://jdbc.postgresql.org/
    jardir = /opt/spark/jars/postgresql-42.5.0.jar
    # master can be local[*], mesos, or yarn
    master = local[1]
    # partitions can be any number based on the number of clusters and cores
    partitions = 2
    # default format is jdbc
    format = jdbc
    # savemode can be append or overwrite
    savemode = Append
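
The sketch below shows one way the [SPARK] values could be turned into a Spark session. The function names load_spark_settings and create_session and the application name rdbm-loader are hypothetical, and the session builder is imported lazily so that the parsing part runs without pyspark installed. It is an illustration under those assumptions, not the package's actual session logic.

```python
import configparser

# Sample text mirroring the [SPARK] section above.
SAMPLE_CFG = """
[SPARK]
homedir = /opt/spark/
bindir = /opt/spark/bin
config = spark.jars
jardir = /opt/spark/jars/postgresql-42.5.0.jar
master = local[1]
partitions = 2
format = jdbc
savemode = Append
"""


def load_spark_settings(cfg_text: str) -> dict:
    """Hypothetical helper: read [SPARK] values, applying the documented defaults."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    spark = parser["SPARK"]
    return {
        "master": spark.get("master", fallback="local[1]"),
        "config_key": spark.get("config", fallback="spark.jars"),
        "jar_path": spark.get("jardir"),
        "partitions": spark.getint("partitions", fallback=2),
        "format": spark.get("format", fallback="jdbc"),
        "savemode": spark.get("savemode", fallback="append").lower(),
    }


def create_session(settings: dict, app_name: str = "rdbm-loader"):
    """Build a SparkSession from the parsed settings (requires pyspark)."""
    from pyspark.sql import SparkSession  # lazy import; only needed here

    return (
        SparkSession.builder
        .master(settings["master"])
        .appName(app_name)
        .config(settings["config_key"], settings["jar_path"])
        .getOrCreate()
    )


settings = load_spark_settings(SAMPLE_CFG)
print(settings["master"], settings["partitions"], settings["savemode"])
```

Calling create_session(settings) on a machine with pyspark and the JDBC jar in place would return a session configured with the spark.jars entry from the config.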

Setting Class Properties

There are three ways to set the class properties. They are listed below in order of precedence: a value passed through the function call overrides a value set with the property setter, and if neither is given, the class attempts to use the value defined in the config file.

  1. Passing the property values to the CRUD function as key-value pairs; e.g. kwargs['DBTYPE']='PostgreSQL'
  2. Using the setter function of the class property; e.g. clsDB.dbType = 'PostgreSQL'
  3. Falling back to the app.cfg; if the property is defined there, it is set to the value given in the config
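
The precedence described above can be sketched as a small resolver. The function resolve_property is hypothetical and only illustrates the lookup order (kwargs, then setter value, then app.cfg); it assumes upper-case kwargs keys, as in the DBTYPE example, and lower-case config keys, as in the [DATABASE] section.

```python
def resolve_property(name, kwargs, setter_value, config):
    """Hypothetical resolver for the three-level precedence.

    name         -- property name, e.g. "dbType"
    kwargs       -- key-value pairs passed to the CRUD function
    setter_value -- value previously set through the property setter, or None
    config       -- dict of values read from app.cfg
    """
    key = name.upper()
    if kwargs.get(key) is not None:      # 1. value passed to the function
        return kwargs[key]
    if setter_value is not None:         # 2. value set with the property setter
        return setter_value
    if name.lower() in config:           # 3. value defined in app.cfg
        return config[name.lower()]
    raise ValueError(f"{name} is not set in kwargs, the property, or app.cfg")


app_cfg = {"dbtype": "PostgreSQL"}
from_kwargs = resolve_property("dbType", {"DBTYPE": "MySQL"}, "MSSQL", app_cfg)
from_setter = resolve_property("dbType", {}, "MSSQL", app_cfg)
from_config = resolve_property("dbType", {}, None, app_cfg)
print(from_kwargs, from_setter, from_config)  # MySQL MSSQL PostgreSQL
```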

Class properties

DB parameters

    self._dbType     # set/get db type; e.g. PostgreSQL, MySQL
    self._dbDriver   # set/get db driver; default 'org.postgresql.Driver'
    self._dbHostIP   # set/get db IP address; default localhost = 127.0.0.1
    self._dbPort     # set/get db TCP/IP port; default 5432
    self._dbName     # set/get db name
    self._dbSchema   # set/get db schema name
    self._partitions # set/get number of partitions; default = 2
    self._dbUser     # set/get db schema username
    self._dbPswd     # set/get db schema password
    self._dbConnURL  # set/get db connection url

Session parameters

    self._homeDir   # set/get the $SPARK_HOME path
    self._binDir    # set/get the $SPARK_HOME/bin path
    self._config    # set/get the config type; default 'spark.jars'
    self._jarDir    # set/get the $SPARK_HOME/jars file; default postgresql-42.5.0.jar
    self._appName   # set/get spark appName; default is constructed from package identifiers 
    self._master    # set/get master = local[*], mesos, or yarn
    self._rwFormat  # set/get read/write format; default 'jdbc'
    self._session   # set/get spark session
    self._saveMode  # set/get savemode = append or overwrite

Functions (@ClassMethods)

There are four key functions that support CRUD operations on the database tables, reading views, and calling DB stored procedures.

Read data from the table(s)

    @classmethod
    def read_data_from_table(
        self,
        select:str="",
        db_table:str="",
        db_column:str="",
        lower_bound=None,
        upper_bound=None,
        **kwargs) -> DataFrame:
  • It is mandatory that either the select or the db_table attribute is specified
    • the select attribute takes an SQL query statement as a single string
    • the db_table attribute takes the table name, with or without the schema as a prefix
      • if the schema is not specified, the function defaults to the schema name set in the class property
  • The db_column attribute specifies the column to use when applying the partitioning; it is optional
  • If lower_bound and upper_bound are specified, they are applied to the defined partitioning column; they are also optional
  • The **kwargs can be used to
    • set the self.session and self.dbConnURL parameters; if unspecified, the default values are used
    • set the self.partitions parameter; if unspecified, the default value is used
Insert data into a table

    @classmethod
    def insert_sdf_into_table(
        self,
        save_sdf,
        db_table:str,
        **kwargs):
  • The save_sdf attribute defines the data set; if it is not a pyspark dataframe, the self.data property will attempt to convert it to one. Hence, you can provide a pandas dataframe as well
  • The db_table attribute takes the table name, with or without the schema as a prefix
    • if the schema is not specified, the function defaults to the schema name set in the class property
  • The **kwargs can be used to
    • set the self.session and self.dbConnURL parameters; if unspecified, the default values are used
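
The insert behaviour described above can be sketched as follows. The names qualify_table, build_write_options, and write_frame are hypothetical and only approximate what the class might do internally: the table name is prefixed with the default schema when none is given, and a non-Spark data set (for example a pandas dataframe) is converted with createDataFrame before being written through the JDBC writer. pyspark is imported lazily, so the option-building part runs without it.

```python
def qualify_table(db_table: str, default_schema: str) -> str:
    """Prefix the default schema when the table name carries none."""
    table = db_table.strip()
    if not table:
        raise ValueError("db_table must be a non-empty table name")
    return table if "." in table else f"{default_schema}.{table}"


def build_write_options(url, db_table, default_schema, user, password,
                        driver="org.postgresql.Driver"):
    """Hypothetical helper: collect the JDBC writer options."""
    return {
        "url": url,
        "dbtable": qualify_table(db_table, default_schema),
        "user": user,
        "password": password,
        "driver": driver,
    }


def write_frame(session, data, options, save_mode="append"):
    """Write data through JDBC, converting to a Spark dataframe if needed."""
    from pyspark.sql import DataFrame  # lazy import; requires pyspark

    sdf = data if isinstance(data, DataFrame) else session.createDataFrame(data)
    sdf.write.format("jdbc").options(**options).mode(save_mode).save()


write_opts = build_write_options(
    "jdbc:postgresql://127.0.0.1:5432/mydbname",
    "orders", "mydbschema", "mydbuser", "mydbpswd",
)
print(write_opts["dbtable"])  # mydbschema.orders
```

The save_mode argument corresponds to the savemode setting (append or overwrite) in the [SPARK] section.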