VDK: Support multiple databases in the same job - vmware/versatile-data-kit GitHub Wiki

Overview

VDK currently supports only one database for executing SQL files and for queries sent through job_input. It is selected with VDK_DB_DEFAULT_TYPE. Ingestion has a separate setting, VDK_INGEST_TARGET_DEFAULT. In the best case this gives us two databases: one that we query and another that we ingest into. Querying and ingesting into the same database also works. However, if we want to query and ingest into one database and also query a second one, we're out of options.

Example config.ini

[vdk]
db_default_type=SQLITE
sqlite_file=./vdk-sqlite.db
ingest_target_default=DUCKDB
duck_db_database=./duck_temp.db
duck_db_ingest_auto_create_table_enabled=True

01_run.py

from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):

    # query SQLITE
    job_input.execute_query("CREATE TABLE stocks (date text, symbol text, price real)")
    job_input.execute_query("INSERT INTO stocks VALUES ('2020-01-01', 'GOOG', 122.0)")
    result = job_input.execute_query("SELECT * FROM stocks")
    payload = {
        "date": result[0][0],
        "symbol": result[0][1],
        "price": 500,
    }
    
    # ingest into DUCKDB
    job_input.send_object_for_ingestion(
        payload=payload, destination_table="test_duckdb_table", method="duckdb"
    )

    # how do I query DUCKDB, though? execute_query queries SQLITE ;(
    # I can probably import the python duckdb driver, but what's the point of having VDK in this case?

The above scenario is also confusing from a user standpoint. Nothing in the code suggests that execute_query will query SQLITE while send_object_for_ingestion will insert into DUCKDB.

Requirements

Users should be able to:

  1. Specify multiple database configurations.
  2. Have each database configuration produce a managed connection registered under a user-defined name.
  3. Have each registered database also register an ingester for the same database under the same name.
  4. Fetch database connections and ingesters by name through the job_input API, then run queries or ingest data.
  5. Configure a default database that VDK falls back to. If no default is configured and the user calls the managed connection or ingestion API without naming a database, the call should fail with an error.
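The lookup rules in the list above can be sketched as a small name-to-factory registry. This is only an illustration: NamedRegistry, its methods and the "default" key are hypothetical names, not part of the existing VDK API.

```python
from typing import Callable, Dict, Optional


class NamedRegistry:
    """Hypothetical registry mapping user-defined names to factories."""

    DEFAULT = "default"  # assumed name of the fallback entry

    def __init__(self) -> None:
        self._factories: Dict[str, Callable[[], object]] = {}

    def register(self, name: str, factory: Callable[[], object]) -> None:
        # Names are stored lowercased so lookups are case-insensitive.
        self._factories[name.lower()] = factory

    def get(self, name: Optional[str] = None) -> object:
        # Fall back to the default entry when no name is given.
        key = (name or self.DEFAULT).lower()
        if key not in self._factories:
            raise LookupError(f"No database registered under the name '{key}'")
        return self._factories[key]()


registry = NamedRegistry()
registry.register("default", lambda: "default-connection")
registry.register("Oracle1", lambda: "oracle-connection")
```

Calling registry.get() returns whatever the default factory builds, and calling it before a default is registered raises LookupError, which matches requirement 5.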

Additional requirements

  1. We should have a system for configuring which database connection executes which SQL file
  2. Ingestion plugins that are not tied to a database should also be supported

Implementation

Configuration

Option 1: Nest database configurations

[vdk]
; general vdk config

[vdk.database]
; general database config

[vdk.database.default]
type=oracle
use_secrets=True
user=defaultuser
host=defaulthost.com
port=1522
service_name=default_service
thick_mode=True

[vdk.database.oracle1]
type=oracle
use_secrets=True
user=myuser
host=localhost
port=1521
sid=free
thick_mode=True

[vdk.database.impala1]
type=impala
use_secrets=True
user=impala_user

[vdk.database.duckdb1]
; some other database config

[vdk.database.sqlite1]

PRO: It complies with our standard config flow
CON: Python's configparser does not support section nesting out of the box. It treats [vdk], [vdk.database] and [vdk.database.oracle1] as unrelated sections, and it has no built-in way to fetch all sections that start with vdk. We can work around this by implementing the grouping ourselves, but it requires more effort.
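A minimal sketch of that workaround, assuming the section naming from Option 1: filter the flat section list by prefix and group the options by the name that follows it. The helper collect_named_databases is a hypothetical name, and the sample config is a shortened version of the one above.

```python
import configparser
from typing import Dict

SAMPLE = """
[vdk]
; general vdk config

[vdk.database.default]
type=oracle
host=defaulthost.com

[vdk.database.oracle1]
type=oracle
host=localhost
port=1521
"""


def collect_named_databases(
    parser: configparser.ConfigParser, prefix: str = "vdk.database."
) -> Dict[str, Dict[str, str]]:
    """Group every section that starts with `prefix` by the name after it."""
    databases: Dict[str, Dict[str, str]] = {}
    for section in parser.sections():
        if section.startswith(prefix):
            name = section[len(prefix):]
            # configparser returns all values as strings.
            databases[name] = dict(parser[section])
    return databases


parser = configparser.ConfigParser()
parser.read_string(SAMPLE)
named = collect_named_databases(parser)
```

Here named holds one dictionary per database name ("default" and "oracle1"). Type conversion (for example port to int) is left to whoever consumes the dictionaries.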

Option 2: Configure databases in separate step

00_configure_databases.py

def run(job_input):
    job_input.register_db(key="default", type="oracle", host="localhost", port="1521", sid="free")
    job_input.register_db(key="oracle_1", type="oracle", host="http://some_host_whatever", port="1522", sid="not_free")
    job_input.register_db(key="impala_1", type="impala", host="http://impala_host", database="mydatabase") # <- pass other db-specific properties here

PRO: Doesn't require convoluted config.ini files.
CON: It fundamentally changes the way we've configured VDK jobs until now, so further discussion is required.
CON: SQL support gets complicated, e.g. you can't have a SQL-only job.

Ingestion API

def run(job_input):
    obj = {"id": 1, "name": "Bob"}
    job_input.send_object_for_ingestion(
        payload=obj, destination_table="name_table"
    )
    job_input.send_object_for_ingestion(
        payload=obj, destination_table="another_table", destination_db="oracle_1"
    )
    job_input.send_object_for_ingestion(
        payload=obj, destination_table="storage_table", destination_db="impala_1"
    )

Detailed implementation

To pull this off, we should support the following:

  1. Register managed connection and ingester classes for every plugin
  2. Pass different configs to the same interface, e.g. ManagedConnectionBase. We must be able to instantiate managed connection classes from a dict of configurations.
  3. Name connections and ingesters and access them by name
  4. Attach this functionality either to the config.ini files or to the job_input API.

router.py

    def add_connection_class(
        self,
        dbtype: str,
        clazz: Type[ManagedConnectionBase]
    ) -> None:
        self._supported_connection_types[dbtype.lower()] = clazz

    def get_conn_class(
        self,
        dbtype: str,
    ) -> Type[ManagedConnectionBase]:
        return self._supported_connection_types.get(dbtype.lower(), None)

    def add_open_named_connection_factory_method(
        self,
        connection_name: str,
        open_connection_func: Callable[
            [], Union[ManagedConnectionBase, PEP249Connection]
        ],
    ) -> None:
        self._named_connection_builders[connection_name] = open_connection_func

Do the same for ingestion.

ingester_router.py

    def add_ingester_class(
        self,
        dbtype: str,
        clazz: Type[IIngesterPlugin]
    ) -> None:
        self._supported_ingester_types[dbtype.lower()] = clazz

    def add_named_ingester_factory_method(
        self,
        name: str,
        ingester_plugin: IngesterPluginFactory,
    ) -> None:
        """
        Add new ingester.
        """
        self._named_ingester_builders[name.lower()] = ingester_plugin

Add an abstract class method that instantiates ManagedConnectionBase from a dictionary:

managed_connection_base.py

    @classmethod
    @abstractmethod
    def _from_dict(cls, **kwargs):
        """
        override this if you want to support multiple connections and ingestion operations
        """
        pass

If database plugins want to support multiple databases, they should:

  1. Extend ManagedConnectionBase
  2. Override _connect and _from_dict
  3. Call add_connection_class and pass the ManagedConnectionBase child class
  4. Call add_ingester_class and pass the IIngesterPlugin class
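As a rough illustration of steps 1 to 3, here is a self-contained sketch that uses sqlite3 from the standard library. ConnectionBase and Router are hypothetical stand-ins for ManagedConnectionBase and the connection router, reduced to the methods discussed here. Step 4 (the ingester) is omitted.

```python
import sqlite3
from abc import ABC, abstractmethod
from typing import Dict, Optional, Type


class ConnectionBase(ABC):
    """Hypothetical stand-in for ManagedConnectionBase."""

    @abstractmethod
    def _connect(self):
        ...

    @classmethod
    @abstractmethod
    def _from_dict(cls, **kwargs):
        ...


class SqliteConnection(ConnectionBase):
    def __init__(self, database: str = ":memory:"):
        self._database = database

    @classmethod
    def _from_dict(cls, **kwargs):
        # Build the connection object from plain config values.
        return cls(database=kwargs.get("database", ":memory:"))

    def _connect(self) -> sqlite3.Connection:
        return sqlite3.connect(self._database)


class Router:
    """Hypothetical stand-in for the connection router."""

    def __init__(self) -> None:
        self._supported_connection_types: Dict[str, Type[ConnectionBase]] = {}

    def add_connection_class(self, dbtype: str, clazz: Type[ConnectionBase]) -> None:
        self._supported_connection_types[dbtype.lower()] = clazz

    def get_conn_class(self, dbtype: str) -> Optional[Type[ConnectionBase]]:
        return self._supported_connection_types.get(dbtype.lower())


router = Router()
router.add_connection_class("SQLITE", SqliteConnection)  # step 3
conn = router.get_conn_class("sqlite")._from_dict(database=":memory:")._connect()
answer = conn.execute("SELECT 1 + 1").fetchone()[0]
```

The router only stores the class. The instance is built later from a config dictionary, which is what lets one plugin serve several named databases.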

The following snippets use the oracle plugin as an example, but every plugin that supports ingestion into multiple data sources should conform to this interface.

oracle_connection.py

class OracleConnection(ManagedConnectionBase):
    def __init__(
        self,
        user: str,
        password: str,
        connection_string: str = None,
        host=None,
        port=1521,
        sid: str = None,
        service_name: str = None,
        thick_mode: bool = True,
        thick_mode_lib_dir: Optional[str] = None,
    ):
        super().__init__(log)
        self._oracle_user = user
        self._oracle_password = password
        self._host = host
        self._port = port
        self._sid = sid
        self._service_name = service_name
        self._oracle_connection_string = connection_string
        self._thick_mode = thick_mode
        self._thick_mode_lib_dir = thick_mode_lib_dir

    @classmethod
    def _from_dict(cls, **kwargs):
        # Pass everything by keyword so the values cannot drift out of step
        # with the parameter order of __init__.
        return cls(
            user=kwargs.get("user"),
            password=kwargs.get("password"),
            connection_string=kwargs.get("connection_string"),
            host=kwargs.get("host", "localhost"),
            port=kwargs.get("port", 1521),
            sid=kwargs.get("sid"),
            service_name=kwargs.get("service_name"),
            thick_mode=kwargs.get("thick_mode", True),
            thick_mode_lib_dir=kwargs.get("thick_mode_lib_dir"),
        )

    def _connect(self) -> Connection:
        import oracledb

        if self._thick_mode:
            if self._thick_mode_lib_dir:
                oracledb.init_oracle_client(self._thick_mode_lib_dir)
            else:
                oracledb.init_oracle_client()
        if self._oracle_connection_string:
            log.debug("Connecting to Oracle using connection string")
            params = oracledb.ConnectParams()
            params.set(user=self._oracle_user)
            params.set(password=self._oracle_password)
            params.parse_connect_string(self._oracle_connection_string)

            conn = oracledb.connect(params=params)
        else:
            log.debug("Connecting to Oracle using host,port,sid")
            params = oracledb.ConnectParams(
                user=self._oracle_user,
                password=self._oracle_password,
                host=self._host,
                port=self._port,
                sid=self._sid,
                service_name=self._service_name,
            )
            conn = oracledb.connect(params=params)
        return conn

ingest_to_oracle.py

class IngestToOracle(IIngesterPlugin):
    def __init__(self, name: str, connections: ManagedConnectionRouter):
        self.conn: PEP249Connection = connections.open_named_connection(name).connect()
        self.cursor: ManagedCursor = self.conn.cursor()

    # Note: we should also have a from_dict class method here
    # Possibly also constructor overloading for ingesters that are not database-related

oracle_plugin.py

    @hookimpl(trylast=True)
    def initialize_job(self, context: JobContext):
        conf = OracleConfiguration(context.core_context.configuration)
        context.connections.add_connection_class("oracle", OracleConnection)
        context.ingester.add_ingester_class("oracle", IngestToOracle)

We then add a plugin in core that parses all the named configs and instantiates the named connections and ingesters.

named_database_conf.py

    @hookimpl(trylast=True)
    def initialize_job(self, context: JobContext):
        for conn_type, name in self._config.named_connections:
            if conn_type in context.connections.get_supported_conn_types():
                conn_class = context.connections.get_conn_class(conn_type)
                ingest_class = context.ingester.get_ingest_class(conn_type)
                # Bind the loop values as lambda defaults; otherwise every
                # factory would see the values from the last iteration.
                context.connections.add_open_named_connection_factory_method(
                    name,
                    lambda name=name, conn_class=conn_class: conn_class._from_dict(
                        **self._config.get_config_by_name(name)
                    ),
                )
                context.ingester.add_named_ingester_factory_method(
                    name,
                    lambda name=name, ingest_class=ingest_class: ingest_class(
                        name, context.connections
                    ),
                )
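Because these factories are created inside a loop and only called later, each lambda needs to capture the loop values of its own iteration. The sketch below shows the difference between late and early binding in plain Python. The config dictionary and the function name are made up for the example.

```python
from typing import Callable, Dict, Tuple

Factory = Callable[[], dict]


def build_factories(
    configs: Dict[str, dict]
) -> Tuple[Dict[str, Factory], Dict[str, Factory]]:
    late: Dict[str, Factory] = {}
    bound: Dict[str, Factory] = {}
    for name in configs:
        # Late binding: `name` is looked up when the lambda is called,
        # by which time the loop has already finished.
        late[name] = lambda: configs[name]
        # Early binding: the default argument stores the current value.
        bound[name] = lambda name=name: configs[name]
    return late, bound


late, bound = build_factories(
    {"oracle1": {"host": "localhost"}, "impala1": {"host": "impala_host"}}
)
late_host = late["oracle1"]()["host"]
bound_host = bound["oracle1"]()["host"]
```

With late binding the "oracle1" factory ends up returning the config of the last name in the loop. With the default-argument form it returns its own config.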

Non-database ingestion

[vdk.ingest.myapi]
type=http
; other configs

http_plugin.py

    @hookimpl(trylast=True)
    def initialize_job(self, context: JobContext):
        context.ingester.add_ingester_class("http", IngestToHttp)

named_database_conf.py

    @hookimpl(trylast=True)
    def initialize_job(self, context: JobContext):
        for ingester_type, name in self._config.named_ingesters:
            if ingester_type in context.ingester.get_supported_ingester_types():
                ingest_class = context.ingester.get_ingest_class(ingester_type)
                # Bind the loop values as lambda defaults (see above).
                context.ingester.add_named_ingester_factory_method(
                    name,
                    lambda name=name, ingest_class=ingest_class: ingest_class.from_dict(
                        **self._config.get_config_by_name(name)
                    ),
                )

SQL Files

The default connection should run SQL files that have no other database connection configured. This leaves us with the problem of matching SQL files to the named connections that should execute them.

Option 1 - specify the named connection in the file name

├── 10_first_step.sql <-- will be executed by default connection
├── 20_another_step.oracle1.sql <-- will be executed by the connection named "oracle1"
├── README.md
├── config.ini
├── out.txt
└── requirements.txt
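One possible way to read the connection name out of such a file name, assuming the pattern is always <step>.<connection>.sql. The function connection_for_step is a hypothetical helper, not existing VDK code.

```python
from pathlib import Path
from typing import Optional


def connection_for_step(file_name: str) -> Optional[str]:
    """Return the connection name embedded in the file name, or None for the default."""
    path = Path(file_name)
    if path.suffix.lower() != ".sql":
        raise ValueError(f"Not a SQL step: {file_name}")
    # The stem of "20_another_step.oracle1.sql" is "20_another_step.oracle1";
    # its own suffix, if any, is the connection name with a leading dot.
    inner_suffix = Path(path.stem).suffix
    return inner_suffix[1:] or None


default_step = connection_for_step("10_first_step.sql")
named_step = connection_for_step("20_another_step.oracle1.sql")
```

A None result means the step runs on the default connection. One consequence of this scheme is that step names themselves can no longer contain dots.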

Option 2 - specify the named connection in config.ini

[vdk.database.default]
type=oracle
use_secrets=True
user=defaultuser
host=defaulthost.com
port=1522
service_name=default_service
thick_mode=True

[vdk.database.oracle1]
type=oracle
use_secrets=True
user=myuser
host=localhost
port=1521
sid=free
thick_mode=True
sql_steps=20_another_step,40_last_step
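A sketch of how the sql_steps option could be turned into a step-to-connection lookup, assuming a comma-separated value as in the config above. The helper map_steps_to_connections is hypothetical, and rejecting a step claimed by two connections is an assumption rather than a decided behaviour.

```python
import configparser
from typing import Dict

SAMPLE = """
[vdk.database.default]
type=oracle

[vdk.database.oracle1]
type=oracle
sql_steps=20_another_step,40_last_step
"""


def map_steps_to_connections(
    parser: configparser.ConfigParser, prefix: str = "vdk.database."
) -> Dict[str, str]:
    """Map each step listed in a sql_steps option to its connection name."""
    mapping: Dict[str, str] = {}
    for section in parser.sections():
        if not section.startswith(prefix):
            continue
        name = section[len(prefix):]
        steps = parser.get(section, "sql_steps", fallback="")
        for step in (s.strip() for s in steps.split(",")):
            if not step:
                continue
            if step in mapping:
                raise ValueError(f"Step '{step}' is assigned to two connections")
            mapping[step] = name
    return mapping


parser = configparser.ConfigParser()
parser.read_string(SAMPLE)
step_map = map_steps_to_connections(parser)
```

Steps that do not appear in the mapping would run on the default connection, for example via step_map.get(step, "default").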

Option 3 - specify the named connection using a python call

00_configure_databases.py

def run(job_input):
    job_input.specify_connection(["20_another_step.sql", "40_last_step.sql"], "oracle1")