VDK: SQL and Data Flow Improvements - vmware/versatile-data-kit GitHub Wiki
Current state
SQL files
An SQL step file can only contain a single query. For longer scripts with multiple queries, you have to fall back to Python.
def run(job_input: IJobInput):
    users = job_input.execute_query("SELECT * from Users")
    job_input.execute_query("ALTER TABLE Users ADD Email varchar(255)")
    # something else here
SQL and Python code are not decoupled. This makes it hard to maintain different versions of a data job: checking diffs between versions in source control becomes more challenging, and it goes against the supposed modularity of VDK data jobs. You can't pluck out an SQL step and replace it with something else, for example. For longer scripts, your SQL code leaks into your Python code unless you decide to split it into multiple SQL files with one query each. VDK users should be given the option to keep SQL code separate from Python code.
Pulling data from one database and ingesting into another
Let's say we have an SQLite database and an Oracle database. We'd like to pull data from the SQLite database into Oracle. We configure the VDK data job to ingest into Oracle:
[vdk]
ingest_method_default=ORACLE
db_default_type=oracle
oracle_use_secrets=True
oracle_user=username
oracle_connection_string=(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=myhost.hostme.com)(PORT=1521))(CONNECT_DATA=(SID=mysid)))
And then we have the following step:
import sqlite3

def run(job_input: IJobInput):
    con = sqlite3.connect("company.db")
    cur = con.cursor()
    qres = cur.execute("SELECT * FROM Users")
    rows = qres.fetchall()
    # do something fancy with the data
    job_input.send_tabular_data_for_ingestion(rows=rows, column_names=["id", "username", "email"], destination_table="Users")
This works fine. The only problem is that VDK already has an SQLite plugin, which contains code very similar to the example step above. We're asking users to re-implement part of VDK every time they want to connect to a database other than the default one. They should just be able to use the existing SQLite plugin.
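For comparison, if SQLite were the job's default database, the existing SQLite plugin would already manage the connection and the step would shrink to a plain execute_query call. A minimal sketch, assuming the plugin is installed and configured through db_default_type plus a database-file option (the sqlite_file key and the step name below are illustrative assumptions, not verified option names):

config.ini
[vdk]
db_default_type=SQLITE
sqlite_file=company.db

10_select_users.py
def run(job_input: IJobInput):
    rows = job_input.execute_query("SELECT * FROM Users")
    # do something fancy with the data
    job_input.send_tabular_data_for_ingestion(rows=rows, column_names=["id", "username", "email"], destination_table="Users")

The catch is that a data job has a single default database, so once Oracle is the default, queries against SQLite fall back to hand-written sqlite3 code like the original example. The proposals below aim to close exactly that gap.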
Passing data between steps
Can't front-load risky or expensive operations
The example below prepares the database for ingestion in the first step. The second step does a GET request that has a high chance of failing because it fetches a large amount of data. If the request fails, the job fails, and the work done by the SQL step is wasted. We'd like to execute the risky request before we run the SQL, but we can't, because there is no way to hand its result to a later step. It would be nice if we could front-load risky or expensive operations and use their results in later steps. That way the job fails fast and no unnecessary steps are executed. A sketch of the reordered job follows the example.
10_complicated_query.sql
-- something complex that prepares the db for ingestion
20_fetch_data.py
import requests

def run(job_input: IJobInput):
    # takes a long time to execute
    data = []
    response = requests.get("https://api.oldserver.com/fetchall")
    # processing logic for response.json()
    job_input.send_tabular_data_for_ingestion(rows=data, column_names=["id", "something", "something_else"], destination_table="stuff")
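With the data-passing mechanism proposed later in this document, the risky fetch could move to the first step and the database preparation to the second, so a failed request stops the job before any SQL runs. A minimal sketch, reusing the @use_data decorator and data cache proposed below (neither exists in VDK today, and the step names are illustrative):

10_fetch_data.py
import requests

from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
    # Fail fast: the risky, expensive request runs before any database work.
    response = requests.get("https://api.oldserver.com/fetchall")
    response.raise_for_status()
    # processing logic for response.json()
    return response.json()

20_complicated_query.sql
-- something complex that prepares the db for ingestion

30_ingest_data.py
@use_data(steps=["10_fetch_data.py"])
def run(job_input: IJobInput, data_cache: IDataCache):
    data = data_cache["10_fetch_data.py"]
    job_input.send_tabular_data_for_ingestion(rows=data, column_names=["id", "something", "something_else"], destination_table="stuff")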
Can't re-use data that was already fetched
In the example below, we fetch data from an API and want to cross-reference it with data we fetch from a database. We can't split this into two separate steps, because there is no way to pass the data fetched from the API to another step.
10_process_data.py
import requests
import sqlite3

def run(job_input: IJobInput):
    response = requests.get("https://api.someapi.com/users/active")
    data = response.json()
    data = process_data(data)
    # Want to further enrich the data with data from a database.
    # Can't do it in a separate step, it has to be this one.
    # The other option is to store it in a temp table and select it later.
    con = sqlite3.connect("company.db")
    cur = con.cursor()
    qres = cur.execute("SELECT * FROM Users WHERE Status='ACTIVE'")
    sup_data = qres.fetchall()
    data = cross_reference(data, sup_data)
    # Send for ingestion
Problem statement
- SQL support in VDK is not fully featured
- Users do not have full control over the data flow in VDK data jobs
Proposed solutions
Execute whole SQL scripts as VDK steps
Overview
Executing whole SQL scripts as separate steps gives users the option to keep their SQL code apart from their Python code. It also makes it much clearer what each step does: SQL steps always query a database.
Example
10_backup_data.sql
CREATE TABLE Products_Backup (
    id int,
    ProductID int,
    ProductName varchar(255),
    CategoryName varchar(255),
    Count int
);

CREATE TABLE Users_Backup (
    id int,
    Name varchar(255),
    Location varchar(255),
    Email varchar(255)
);
INSERT INTO Users_Backup SELECT * FROM Users;
INSERT INTO Products_Backup SELECT * FROM Products;
20_python_processing.py
import logging

log = logging.getLogger(__name__)

def run(job_input: IJobInput):
    log.info("Doing some fancy stuff to the original tables")
    # Do the fancy stuff
30_clean_up.sql
DROP TABLE Products_Backup;
DROP TABLE Users_Backup;
Pro-Con Analysis
PRO: Running SQL becomes a full feature. We can have more complex SQL-only jobs, for example.
PRO: In mixed jobs, SQL that does not need to be interpolated from Python can be separated into its own steps.
CON: Implementing this efficiently might not be trivial for some Python SQL libraries; some of them do not support executing a whole script in one call (a possible fallback is sketched below).
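For drivers that only accept a single statement per call, one possible fallback is to split the script client-side and run the statements one by one. A naive sketch (split_sql_statements is an illustrative helper, not an existing VDK API; a real implementation would have to handle semicolons inside string literals, comments and procedure bodies):

def split_sql_statements(script: str) -> list[str]:
    # Naive split on ';' -- good enough only for simple DDL/DML scripts.
    return [stmt.strip() for stmt in script.split(";") if stmt.strip()]


def run_sql_script(job_input, script_path: str) -> None:
    with open(script_path) as f:
        script = f.read()
    for statement in split_sql_statements(script):
        job_input.execute_query(statement)

Drivers that do offer a bulk execution path (e.g. sqlite3's executescript) could skip the splitting entirely.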
Use available database plugins for fetching data
Overview
We should have a way to specify, in config.ini, which database driver to use for a given SQL file. When executing SQL from Python, we should be able to specify the driver in the function call. This way, we can configure multiple databases as data sources and query all of them, provided that the corresponding plugins are installed.
Example SQL Steps
10_select_all_users.sql
SELECT * from Users;
20_select_all_products.sql
SELECT * from Products;
30_select_all_suppliers.sql
SELECT * from Suppliers;
config.ini
[vdk]
sql_step_drivers={"impala": ["10_select_all_users.sql", "30_select_all_suppliers.sql"], "oracle": ["20_select_all_products.sql"]}
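At run time, the plugin executing SQL steps could read this mapping and pick a managed connection per step, falling back to the job-wide default for any step that is not listed. A rough sketch of that lookup (the mapping is shown as a Python literal for brevity; how connections are obtained from the individual database plugins is left out):

# Parsed from the sql_step_drivers entry in config.ini.
SQL_STEP_DRIVERS = {
    "impala": ["10_select_all_users.sql", "30_select_all_suppliers.sql"],
    "oracle": ["20_select_all_products.sql"],
}


def driver_for_step(step_file_name: str, default_driver: str) -> str:
    # Route a .sql step to its configured driver, or to the
    # job-wide default database when the step is not listed.
    for driver, steps in SQL_STEP_DRIVERS.items():
        if step_file_name in steps:
            return driver
    return default_driver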
Example SQL in Python Steps
10_run_queries_from_python.py
def run(job_input: IJobInput):
    users = job_input.execute_query("SELECT * from Users", driver="impala")
    products = job_input.execute_query("SELECT * from Products", driver="impala")
    suppliers = job_input.execute_query("SELECT * from Suppliers", driver="oracle")
Pro-Con Analysis
PRO: Users don't have to write extra code for fetching data from non-default databases.
CON: Increased job complexity, e.g. we can have 10 SQL steps, each with a different data source.
Pass data between steps
Overview
If an SQL step ends with a SELECT query, the result of that query should be available to other steps. If a Python step returns a value, that value should be available to other steps as well. Python steps should be able to declare which data they need from which previous steps and then access it through a shared object, e.g. a data cache that is available in every step.
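To make the mechanics concrete, the data cache could be little more than a mapping from step name to that step's result, filled in by the step runner and read by later steps. A minimal sketch of what IDataCache and the @use_data decorator might look like (these are proposed names taken from the examples below, not existing VDK APIs):

from typing import Any, Callable, Dict, List


class IDataCache:
    """Results produced by previously executed steps, keyed by step name."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}

    def store(self, step_name: str, result: Any) -> None:
        # Called by the step runner after a step finishes.
        self._results[step_name] = result

    def __getitem__(self, step_name: str) -> Any:
        return self._results[step_name]


def use_data(steps: List[str]) -> Callable:
    # Records which previous steps' results a run() function needs;
    # the runner reads this attribute to know what to keep in the cache.
    def decorator(run_fn: Callable) -> Callable:
        run_fn.__vdk_required_steps__ = steps
        return run_fn

    return decorator

Before the job starts, the runner could collect these declarations from all steps, keep only the results that some later step actually asks for, and discard everything else; that is the pre-processing mentioned in the CON list below and the main lever against the memory concerns.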
Example SQL to Python
10_run_sql.sql
CREATE TABLE Products (
    id int NOT NULL AUTO_INCREMENT PRIMARY KEY,
    ProductID int,
    ProductName varchar(255),
    CategoryID int,
    Count int DEFAULT 0
);

SELECT ProductID, ProductName, CategoryName
FROM Products
INNER JOIN Categories ON Products.CategoryID = Categories.CategoryID;
20_run_python.py
import logging

import requests

from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)


@use_data(steps=["10_run_sql.sql"])
def run(job_input: IJobInput, data_cache: IDataCache):
    products = data_cache["10_run_sql.sql"]
    for product in products:
        prod_id = product["ProductID"]
        response = requests.get(f"https://api.productcount.com/inv/{prod_id}")
        inv_count = response.json().get("count", 0)
        product["Count"] = inv_count
        job_input.send_object_for_ingestion(payload=product, destination_table="Products", method="oracle")
Example Python to Python
10_process_data.py
import requests

def run(job_input: IJobInput):
    response = requests.get("https://api.someapi.com/users/active")
    data = response.json()
    data = process_data(data)
    return data
20_some_queries.sql
-- Do something in sql between the two python steps
SELECT * from Products;
30_more_processing.py
import sqlite3

@use_data(steps=["10_process_data.py"])
def run(job_input: IJobInput, data_cache: IDataCache):
    data = data_cache["10_process_data.py"]
    con = sqlite3.connect("company.db")
    cur = con.cursor()
    qres = cur.execute("SELECT * FROM Users WHERE Status='ACTIVE'")
    sup_data = qres.fetchall()
    data = cross_reference(data, sup_data)
    # Send for ingestion
    return data
40_something_else.py
@use_data(steps=["20_some_queries.sql", "30_more_processing.py"])
def run(job_input: IJobInput, data_cache: IDataCache):
    data = data_cache["20_some_queries.sql"]
    other_data = data_cache["30_more_processing.py"]
Pro-Con Analysis
PRO: Users can front-load expensive operations.
PRO: Different types of data processing can be decoupled.
PRO: Easier to understand where data is coming from and how it flows through the system.
PRO: We could go one step further and order the job as a dependency graph, so that steps which don't depend on each other can run in parallel (see the sketch after this list).
CON: Needs pre-processing of data job files to figure out which data to cache and which to discard.
CON: Memory constraints: cached step results have to be held in memory until every step that needs them has run.
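To illustrate the dependency-graph PRO above: the @use_data declarations already describe a graph, so the runner could topologically sort it and dispatch each group of ready steps concurrently. A rough sketch using the step names from the Python-to-Python example (the hard-coded edges stand in for whatever the pre-processing pass would extract from the step files):

from graphlib import TopologicalSorter

# step -> the steps whose results it declared via @use_data
dependencies = {
    "10_process_data.py": set(),
    "20_some_queries.sql": set(),
    "30_more_processing.py": {"10_process_data.py"},
    "40_something_else.py": {"20_some_queries.sql", "30_more_processing.py"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()
while sorter.is_active():
    ready = list(sorter.get_ready())
    # Every step in `ready` has all of its inputs available,
    # so this whole group could be dispatched in parallel.
    print("run in parallel:", ready)
    sorter.done(*ready)

This would also soften the memory constraint: once a group finishes, any cached result that no remaining step depends on can be evicted.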