Guía de uso de Apache Spark SQL - wandent/mutual-wiki GitHub Wiki
[[TOC]]
Esta página proporciona una referencia y guía de uso para Apache Spark SQL y Delta Lake en Azure Databricks.
Spark SQL es un módulo de Spark para el procesamiento de datos estructurados.
A diferencia de la API básica de Spark RDD (resilient distributed dataset), Spark SQL proporciona a Spark más información sobre la estructura de los datos y el cálculo que se realiza. Internamente, Spark SQL usa esta información adicional para realizar optimizaciones adicionales.
Hay varias formas de interactuar con Spark SQL, incluidos el lenguaje SQL y la API de Dataset. Al calcular un resultado, se utiliza el mismo motor de ejecución, independientemente de la API / lenguaje que esté utilizando para expresar el cálculo. Esta unificación significa que los desarrolladores pueden alternar fácilmente entre diferentes APIs según lo que proporciona la forma más natural de expresar una transformación dada.
Un uso de Spark SQL es ejecutar consultas en lenguaje SQL. Al ejecutar consultas SQL desde otro lenguaje de programación dentro de un Notebook, los resultados se devolverán como un Dataset o Dataframe
También se puede interactuar con la interfaz SQL utilizando la línea de comandos o sobre JDBC / ODBC.
Un Dataset es una colección distribuida de datos. Un Dataset puede ser construido a partir de objetos de JVM y luego manipulado utilizando transformaciones funcionales (map, flatMap, filter, etc.).
La API de Dataset está disponible en Scala y Java . Python no tiene soporte para la API Dataset. Pero debido a la naturaleza dinámica de Python, muchos de los beneficios de la API ya están disponibles (es decir, puede acceder al campo de una fila por nombre de forma natural: row.columnName). El caso de R es similar.
Un DataFrame es un Dataset organizado en columnas con nombre. Es conceptualmente equivalente a una tabla en una base de datos relacional o un data frame en R o Python, pero con optimizaciones más completas.
Los Dataframe se pueden construir a partir de una amplia gama de fuentes, tales como: archivos de datos estructurados, tablas en Hive, bases de datos externas o RDD existentes.
La API de Dataframe está disponible en Scala, Java, Python , y R .
En el siguiente enlace de referencia se presenta una lista completa de construcciones de lenguaje de definición de datos (DDL) y lenguaje de manipulación de datos (DML) compatibles con Databricks.
El siguiente código indica como crear una base de datos, si el nombre de la base de datos ya existe, se lanza una excepción.
CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] db_name
[COMMENT comment_text]
[LOCATION path]
[WITH DBPROPERTIES (key=val, ...)]
IF NOT EXISTS
Si una base de datos con el mismo nombre ya existe no pasará nada.
LOCATION
Si la ruta especificada aún no existe en el sistema de archivos subyacente, este comando intenta crear un directorio con la ruta indicada.
WITH DBPROPERTIES
Especifica una propiedad llamada key para la base de datos y establece el valor para la propiedad respectiva como val. Si la key ya existe, el valor antiguo es sobrescrito con el valor de val.
El siguiente código indica como crear una tabla utilizando una fuente de datos.
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name1 col_type1 [COMMENT col_comment1], ...)]
USING data_source
[OPTIONS (key1=val1, key2=val2, ...)]
[PARTITIONED BY (col_name1, col_name2, ...)]
[CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS]
[LOCATION path]
[COMMENT table_comment]
[TBLPROPERTIES (key1=val1, key2=val2, ...)]
[AS select_statement]
IF NOT EXISTS
Esta cláusula evita que se cree una excepción si una tabla con el mismo nombre ya existe en la base de datos.
USING data_source
Cláusula utilizada para indicar el formato de archivo a utilizar por la tabla. Las opciones disponibles son: TEXT, CSV, JSON, JDBC, PARQUET, ORC, HIVE, DELTA y LIBSVM o el nombre de una clase de acuerdo a una implementación personalizada de org.apache.spark.sql.sources.DataSourceRegister
OPTIONS
Opciones de la tabla utilizadas para optimizar el comportamiento de dicha tabla.
Nota: Esta cláusula no es soportada por Delta Lake
PARTITIONED BY
Partición de la tabla creada de acuerdo a las columnas especificadas. Un directorio es creado por cada partición
CLUSTERD BY
Cada partición en la tabla creada será dividida en un número fijo de contenedores de acuerdo a las columnas especificadas. Esto se usa tipicamente con particiones para leer y barajar menos datos.
LOCATION
La tabla creada usa el directorio especificado para almacenar la data.
Particularmente, para Delta Lake en Databricks, cuando se especifica una LOCATION que ya contiene datos almacenados en Delta Lake, Delta Lake realiza lo siguiente:
- Si se especifica solo el nombre de la tabla y ubicación, por ejemplo:
CREATE TABLE events
USING DELTA
LOCATION 'mnt/delta/events'
La tabla en el metastore de Hive automáticamente infiere el esquema, particionamiento y propiedades de los datos existentes.
- Si se especifica cualquier configuración (esquema, particionamiento, o propiedad de la tabla), Delta Lake verifica que la especificación coincida exactamente con la configuración de los datos existentes. De no ser así Delta LAke lanzará una excepción que describe la discrepancia.
AS <select_statement>
Pobla la tabla con los datos de entrada desde la declaración SELECT. Esta no puede contener una lista de columnas.
Ejemplos:
CREATE TABLE boxes (width INT, length INT, height INT) USING CSV
CREATE TABLE boxes
(width INT, length INT, height INT)
USING PARQUET
OPTIONS ('compression'='snappy')
CREATE TABLE rectangles
USING PARQUET
PARTITIONED BY (width)
CLUSTERED BY (length) INTO 8 buckets
AS SELECT * FROM boxes
El siguiente código didefine una vista lógica de una o mas tablas o vistas
CREATE [OR REPLACE] [[GLOBAL] TEMPORARY] VIEW [db_name.]view_name
[(col_name1 [COMMENT col_comment1], ...)]
[COMMENT table_comment]
[TBLPROPERTIES (key1=val1, key2=val2, ...)]
AS select_statement
OR REPLACE
Si la vista no existe, la declaración CREATE OR REPLACE VIEW es equivalete a CREATE VIEW. Si la vista existe, CREATE OR REPLACE VIEW es equivalente a ALTER VIEW.
[GLOBAL] TEMPORARY
TEMPORARY omite persistir la definición de la vista en la Metastore subyacente, de existir. Si se especifica GLOBAL, las diferentes sesiones pueden acceder a la vista y mantenerla activa hasta que finalice su aplicación; de lo contrario las vistas temporales tienen un alcance de sesión y se eliminarán automáticamente si la sesión finaliza. Todas las vistas temporales globales están vinculadas a una base de datos temporal preservada del sistema global_temp. El nombre de la base de datos se conserva y por lo tanto los usuarios no pueden crear / usar / descartar esta base de datos.
(col_name1 [COMMENT col_comment1], ...)
Una lista de columnas que definen el esquema de la vista. Los nombres de las columnas deben ser únicos con el mismo numero de columnas recuperados por SELECT_STATEMENT. Cuando la lista de columnas no es indicada, el esquema de la vista es el esquema de de salida de la consulta SELECT_STATEMENT.
TBLPROPERTIES
Pares key-value de metadata
AS select_statement
La declaración SELECT que define una vista. La declaración puede seleccionar desde la base de tablas u otras vistas.
Nota: No es posible especificar opciones de fuente de datos, partición o clustering, dado que una vista no es materializada como una tabla.
Ejemplos:
-- Create a persistent view view_deptDetails in database1. The view definition is recorded in the underlying metastore
CREATE VIEW database1.view_deptDetails
AS SELECT * FROM company JOIN dept ON company.dept_id = dept.id;
-- Create or replace a local temporary view from a persistent view with an extra filter
CREATE OR REPLACE TEMPORARY VIEW temp_DeptSFO
AS SELECT * FROM database1.view_deptDetails WHERE loc = 'SFO';
-- Access the base tables through the temporary view
SELECT * FROM temp_DeptSFO;
-- Create a global temp view to share the data through different sessions
CREATE GLOBAL TEMP VIEW global_DeptSJC
AS SELECT * FROM database1.view_deptDetails WHERE loc = 'SJC';
-- Access the global temp views
SELECT * FROM global_temp.global_DeptSJC;
-- Drop the global temp view, temp view, and persistent view.
DROP VIEW global_temp.global_DeptSJC;
DROP VIEW temp_DeptSFO;
DROP VIEW database1.view_deptDetails;
def squared(s)
return s * s
spark.udf.register("squaredWithPython", squared)
Opcionalmente se puede definir el tipo de dato retornado. El tipo de dato retornado por defecto es StringType
from pyspark.sql.types import LongType
def squared_typed(s)
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
spark.range(1,20).createOrReplaceTempView("test")
%sql SELECT id, squaredWithPython(id) as id_squared from test
Spark SQL no garantiza el orden de evaluación en una subexpresión, esto quiere decir que los datos de entrada de un operador o función no son necesariamente evaluados de izquierda a derecha o en ningún otro orden fijo.
Por lo tanto, no es recomendable confiar en la evaluación de orden de las expresiones booleanas o en el orden de las cláusulas WHERE y HAVING, dado que estas expresiones o cláusulas pueden ser reordenadas durante la optimización o planificación de consultas.
Por otro lado, no hay garantía de que una validación de null ocurra en una consulta SQL antes de invocar una UDF en dicha consulta, por ejemplo:
spark.udf.register("strlen", lambda s_ len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1")
En esta cláusula WHERE no se garantiza que la UDF "strlen" sea invocada antes de filtrar lo valores no null
Para realizar una validación de valores null, se recomienda hacer una de las siguientes acciones:
- Construir la UDF con validación de null en si misma
- Utilizar expresión IF o CASE WHEN para realizar la validación de null e invocar a la UDF en una rama del flujo condicional
Ambas alternativas se muestran a continuación:
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Cuando se lee datos desde una fuente de datos basado en archivos, Apache Spark SQL enfrente dos casos de errores típicos. El primero es que los archivos pueden no ser legibles (por ejemplo, pueden estar inaccesibles, corruptos o perdidos). El segundo, incluso si los archivos son procesables, algunos registros pueden no ser analizables (por ejemplo, debido errores de sintaxis y discordancia con el esquema de datos).
Databricks provee una interfaz unificada para manejar registros erróneos y archivos sin interrumpir los Jobs de Spark. Es posible obtener los registros / archivos de excepción y los motivos de la excepción configurando l aopción de fuente de datos badRecordsPath. La opción badRecordsPath especifica la ruta de almacenamiento de los archivos de excepción para guardar información sobre registros erróneos, en el caso de CSV y JSON, y archivos erróneos para todas las fuentes integradas basadas en archivos (por ejemplo, Parquet).
Adicionalmente, cuando se leen archivos puden producirse errores transitorios, como la excepción de conexión a red, la excepción de IO, etc. Estos errores se ignoran y también se registras en badRecordsPath, y Spark continuará ejecutando las tareas.
La opción de fuente de datos badRecordsPath es compatible con Databricks Runtime 3.0 o superior.
%scala
val df = spark.read
.option("badRecordsPath", "/tmp/badRecordsPath")
.parquet("/input/parquetFile")
// Elimina el archivo de entrada
dbutils.fs.rm("/input/parquetFile")
df.show()
En el ejemplo anterior, dado que df.show() es incapaz de encontrar el archivo de entrada, Spark creará una archivo de excepción en formato JSON para registrar el error. Por ejemplo, "/tmp/badRecordsPath/20170724T101153/bad_files/xyz" es la ruta del archivo de excepción, donde "/tmp/badRecordsPath/20170724T101153" es el tiempo de creación del dataframe, "bad_file" es el tipo de excepción y .xyz es un archivo que contiene el registro JSON que describe la ruta del archivo erróneo y el mensaje de error/excepción.
Delta Lake, es una capa de almacenamiento Open Source, que brinda confiabilidad a los lagos de datos. Delta Lake proporciona transacciones ACID (atomicity, consistency, isolation, durability), maneja escalabilidad de metadata y unifica procesamiento de datos en lote (batch) y streaming.
Delta Lake se ejecuta sobre un lago de datos existente y es compatible con Apache Spark APIs
En el siguiente enlace de referencia se presenta una introducción al funcionamiento básico de Delta Lake con Databricks.
CONVERT TO DELTA ([db_name.]table_name|parquet.`path/to/table`) [NO STATISTICS] PARTITIONED BY (col_name1 col_type1, col_name2 col_type2, ...)]
Nota
- CONVERT TO DELTA parquet.path/to/table requiere Databricks Runtime versión 5.2 o superior
- CONVERT TO DELTA [db_name.]table_name requiere Databricks Runtime 6.1 o superior
El comando descrito lista todos los archivos en el directorio definido, crea una log que realiza seguimiento de estos archivos y automáticamente infiere el esquema de datos leyendo los pies de página.
El proceso de conversión recopila estadísticas para mejorar el desempeño de la consulta sobre la tabla Delta convertida.
NO STATISTICS
Omite recopilación de estadísticas durante el proceso de conversión, lo que permite terminar la conversión más rápido. Después de que la tabla es convertida a Delta, puede usar el comando OPTIMIZE ... ZORDER BY (descrito en sección siguiente) para reorganizar el almacenamiento de datos y generar estadísticas.
PARTITIONED BY
Particiona la tabla mediante las columnas especificadas. Es requerido si los datos están particionados. El proceso de conversión aborta y genera una excepción si la estructura del directorio no se ajusta a la especificación de la declaración PARTITIONED BY. Si no se indica la cláusula PARTITIONED BY, el comando asume que la tabla no esta particionada.
OPTIMIZE [db_name.]table_name [WHERE predicate] [ZORDER BY (col_name1, col_name2, ...)
Optimiza el almacenamiento de datos en DBFS. Opcionalmente optimiza un subconjunto de datos o distribuye los datos por columna. Si no se especifica la distribución, la optimización bin-packing es realizada.
Nota
- La optimización_bin-packing_ es idempotente, esto quiere decir que si es ejecutada dos veces sobre el mismo dataset, la segunda instancia no tiene efecto.
- Z-Ordering es no idempotente y reorganiza todos los datos que coinciden con el filtro dado. Por lo tanto se sugiere que se limite a datos nuevos, utilizando filtros de partición cuando sea posible.
WHERE
Optimiza un subconjunto de filas que se coinciden con la declaración de partición dada. Solo se admiten filtros que involucren atributos de clave de partición.
ZORDER BY
Coloca la información de la columna en el mismo conjunto de archivos. Co-ubicación es utilizada por los algoritmos de omisión de datos de Delta Lake para reducir dramáticamente la cantidad de datos que deben leerse. Es posible especificar varias columnas en ZORDER BY mediante una lista separada por coma, sin embargo la efectividad de la ubicación disminuye con cada columna adicional.
Ejemplos:
OPTIMIZE events
OPTIMIZE events WHERE date >= '2010-01-01'
OPTIMIZE events WHERE date >= current_timestamp() - INTERVAL 1 day
ZORDER BY (eventType)
Si esperas que una columna sea comúnmente utilizada en consultas y si dicha columna tiene alta cardinalidad (esto es un número alto de valores distintos), se recomienda utilizar la cláusula Z-ORDER BY. Delta Lake automáticamente distribuye los datos en los archivos según los valores de las columnas y utiliza la información de diseño para omitir datos irrelevantes durante la consulta.
Es posible particionar una tabla Delta por una columna. La columna de partición de datos mas comúnmente utilizada es la fecha
Siga estas dos reglas generales para decidir que columna utilizar:
- Si la cardinalidad de una columna será muy alta, no utilice esa columna para particionamiento. Por ejemplo, si la comlumna escogida es userId y puede haber 1 millón de IDs distintos de usuarios, entonces esta columna sería una mala estrategia de particionamiento.
- La cantidad de datos en cada partición: Es recomendado particionar por una columna si se esperá que los datos en esa partición sea de al menos 1 GB
Si continuamente se escribe datos en la tablas Delta, con el tiempo se acumularán un gran número de archivos, especialmente si se agrega datos en pequeños lotes. Esto puede tener efectos adversos en la eficiencia de lectura de las tablas y puede también afectar el rendimiento del sistema de archivos. Idealmente un gran número de pequeños archivos deberían ser re-escritos en un número mas pequeño de archivos de mayor tamaños con cierta regularidad. Esto es conocido como compactación.
Se puede compactar una tabla utilizando el comando OPTIMIZE.
-
Guía de programación de Spark SQL, DataFrames y Datasets: https://spark.apache.org/docs/latest/sql-programming-guide.html.
-
Delta Lake: https://docs.databricks.com/delta/index.html.
-
Databricks SQL: https://docs.databricks.com/spark/latest/spark-sql/index.html