服务后端通过ODBC连接Databricks(on Azure):以Django为例 - ziliantech-org/doc-zilian-wiki GitHub Wiki

By:李泽宇([email protected]), 欧阳图[email protected]

Databricks与湖仓一体

什么是数据仓库?什么是数据湖?参见[数据仓库和数据湖 - 知乎 (zhihu.com)](https://zhuanlan.zhihu.com/p/399110417)

“这里可以用一个做菜的场景做一个类比。以前数据仓库的时候,好比把原材料都加工好了,比如土豆清洗,去皮,切片,这样炒土豆片的时候直接炒就可以了。数据湖的时候呢,直接把土豆存储进来,这样以后想炒土豆片就切片,想炒土豆丝就切丝。增加了灵活性的同时,省去了前期头都处理的费用。”

名称 数据仓库 数据湖
读写模式 写时模式,数据存储前定义 Schema 读时模式,读取数据时定义 Schema
数据价值 提前明确 无须提前明确
存储数据类型 清洗后的结构化数据 原始数据、非结构化和半结构化数据
容量扩展成本 中等成本 低成本
支持功能 统计、报表和传统 BI分析 敏捷数据集成,支持编程框架
构建成本 重量级,时间成本高、投资规模大 轻量级,比较灵活,成本低

数据湖的核心理念如下

  • 存储海量的原始数据
  • 支持任意的数据格式(结构化、半结构化、非结构化)
  • 较强的分析和处理能力

湖仓一体的特性

(1)事务支持:在企业中,数据往往要为业务系统提供并发的读取和写入。对事务的ACID支持,可确保数据并发访问的一致性、正确性,尤其是在SQL的访问模式下。

(2)数据治理:湖仓一体可以支持各类数据模型的实现和转变,支持DW模式架构,例如星型模型、雪花模型等。该系统应当保证数据完整性,并且具有健全的治理和审计机制。

(3)BI支持:湖仓一体支持直接在源数据上使用BI工具,这样可以加快分析效率,降低数据延时。另外相比于在数据湖和数据仓库中分别操作两个副本的方式,更具成本优势。

(4)存算分离:存算分离的架构,也使得系统能够扩展到更大规模的并发能力和数据容量。

(5)开放性:采用开放、标准化的存储格式(例如Parquet等),提供丰富的API支持,因此,各种工具和引擎(包括机器学习和Python / R库)可以高效地对数据进行直接访问。

(6)支持多种数据类型(结构化、半结构化、非结构化):湖仓一体可为许多应用程序提供数据的入库、转换、分析和访问。数据类型包括图像、视频、音频、半结构化数据和文本等。

28.jpg

湖仓一体化的不同路径

  • 在数据仓库上支持数据湖 ,一般是通过在数仓中建外部表来实现,谈论的是数据仓库如何更加灵活,以数仓为核心,支持访问数据湖。以Snowflake为代表的,还包括阿里云的MaxCompute、亚马逊的Redshift;
  • 在数据湖中支持数仓能力 ,一般是通过功能性开发,比如多版本并发控制、自适应schema、提供文件级事务等等,来实现传统数仓的功能。以Databricks为代表。

配置Databricks

Databricks工作区非常整洁直观,根本没有提到Spark这个词,隐藏了所有的复杂性。提供了三大功能:

  1. BI功能:data science&engineering
  2. AI功能:machine learing
  3. SQL接口

img

我们关注的主要是SQL接口:

image

建立集群:

需要建立集群来进行SQL查询:

image

点击SQL Warahouses,再点击Create SQL Warahouse。

img

存在的问题:

1.启动集群真的很慢(大约4分钟)。Databricks至少在Azure中是一个平台即服务,当你创建新群集时,它将使用你自己的资源来预配新的VM(CPU,磁盘等),因此,启动一个新的群集需要这么长时间。

2.自动停止的最短持续时间为10分钟(在实践中,您应该将其增加到1小时,用户等待5分钟才能开始报告并不好玩)

3.当调整集群大小时,引擎会脱机,使用 Auto Scaling 可能是有意义的。

价格:

以Databricks Unit(DBU)1 DBU = 0.22 $/Hour和生成的资源支付,2X-Small需要2 CPU,成本为0.64 $ / H。

因此,测试群集的总计价格为 0.22 * 4 + 0.64 * 2 = 2.16 $/小时

集群运行环境参考:

建立好集群后,可以得到下列的连接参数

Cluster version:v 2022.27(current channel)
Cluster size:2X-Small
Scaling Cluster count: Active 1 Min 1 Max 1
Location:East US

image

安装ODBC、后端配置

官方文档:[Connect Python and pyodbc to Azure Databricks - Azure Databricks | Microsoft Docs](https://docs.microsoft.com/en-us/azure/databricks/dev-tools/pyodbc)

注意:pyodbc的最新版本可能存在问题,推荐的版本为4.0.32([pyodbc 4.0.34 breaks connectivity with Dremio ODBC – Dremio Support](https://support.dremio.com/hc/en-us/articles/7524094752667-pyodbc-4-0-34-breaks-connectivity-with-Dremio-ODBC))

后端运行环境参考

Docker image:FROM python:3.9
OS:Debian GNU/Linux 11
Databricks OBDC Driver:2.6.25
Django:3.2.4
pyodbc:4.0.32
Location:Southeast Asia

配置后端一些可能遇到的问题:

1.pyodbc是否支持连接池?支持。[The pyodbc Module · mkleehammer/pyodbc Wiki (github.com)](https://github.com/mkleehammer/pyodbc/wiki/The-pyodbc-Module#pooling) 但是你需要设置ansi=True。[Connection pooling and ANSI · Issue #950 · mkleehammer/pyodbc (github.com)](https://github.com/mkleehammer/pyodbc/issues/950)

2.代码的best practice?使用with语句关闭connection和游标。

DATABRICKS_CONN_STR = (
    "Driver=/opt/simba/spark/lib/64/libsparkodbc_sb64.so;"
    + "HOST={host};"
    + "PORT=443;"
    + "Schema=default;"
    + "SparkServerType=3;"
    + "AuthMech=3;"
    + "UID=token;"
    + "PWD={DATABRICKS_PWD};"
    + "ThriftTransport=2;"
    + "SSL=1;"
    + "HTTPPath={httppath}"
)

with pyodbc.connect(DATABRICKS_CONN_STR, autocommit=True, ansi=True) as conn:
    with conn.cursor() as cursor:
    	cursor.execute(sql1, param_list1)
    	rows = cursor.fetchall()
    	columns = [col[0] for col in cursor.description]
 results = [dict(zip(columns, row)) for row in rows]
 # e.g. [{col1: value1, col2: value2, ...}, ...]

3.将查询结果集转化为dataframe?[Reading from databases with Python - Open Source Automation (theautomatic.net)](http://theautomatic.net/2020/03/12/reading-from-databases-with-python/)

import pandas as pd

with pyodbc.connect(DATABRICKS_CONN_STR, autocommit=True, ansi=True) as conn:
	data = pd.read_sql("SELECT TOP(1000) * FROM customers", conn)

5.对于django,没有适用数据源为databricks的engine、ORM库(可参考[pyodbc - How to use Django with Sql Server - Stack Overflow](https://stackoverflow.com/questions/32770942/how-to-use-django-with-sql-server)) ,来将查询结果集转化为对象。因此,为了进行聚合、筛选等操作,要么通过写SQL、要么写复杂的python字符串匹配语句。前者在传参时还需要考虑SQL注入的问题,这是在用django框架不需要考虑的问题。

6.如果你使用docker来打包后端服务,需要安装适用databricks的ODBC Driver,可参考如下代码:

RUN wget https://databricks-bi-artifacts.s3.us-east-2.amazonaws.com/simbaspark-drivers/odbc/2.6.25/SimbaSparkODBC-2.6.25.1043-Debian-64bit.zip
RUN unzip SimbaSparkODBC-2.6.25.1043-Debian-64bit.zip
RUN apt-get install -y libsasl2-modules-gssapi-mit
RUN dpkg -i ./simbaspark_2.6.25.1043-2_amd64.deb
RUN rm simbaspark_2.6.25.1043-2_amd64.deb
RUN apt-get install unixodbc unixodbc-dev -y

查询效率的对比实验

在azure databricks提供的sql editor里运行SQL和在后端通过ODBC连接databricks执行SQL进行效率的对比。

实验一:简单地查询全表

SELECT company_name FROM dev.company_info limit X
(azure databricks sql不支持分页查询。first和offset关键字不存在。只能查询前N条,limit N。)
sql editor sql ODBC
X=10 503ms 1.77s
X=1000 587ms 1.81s
X=10000 591ms 2.26s

注:sql editor对同一查询(SQL)有缓存,上表中的数据为第一次查询所需的时间。

实验二:含Where条件的筛选查询

SELECT company_name FROM dev.company_info Where region = '北京市' limit X
sql editor sql ODBC
X=10 445ms 1.27s
X=1000 488ms 1.72s
X=10000 741ms 2.35s

实验三:含聚合(group by + count)的复杂查询

SELECT company_name, count(*) FROM dev.company_info group by company_name limit X
sql editor sql ODBC
X=10 438ms 1.26s
X=1000 559ms 1.21s
X=10000 484ms 2.42s

实验四:多线程连接databricks:

结果是databricks支持并发连接,测试时未见排队现象。

image

评价:

  1. 使用ODBC连接Databricks还是有一定的延迟,可能是由于集群位置、后端位置差异造成的。
  2. Databricks对查询有缓存。
  3. 总体时间上可以接受。