Skip to content

使用文档

dmetasoul01 edited this page Jul 15, 2022 · 8 revisions

在你的 Spark 环境中引入依赖

LakeSoul 的发布版本和 Spark 版本相关,目前要求 Spark 3.1.2 和 Scala 2.12 版本。

使用 Spark Shell 快速测试,可以参考 快速开始

如果需要在 Spark Scala/Java 工程中测试,请在 Maven pom.xml 中添加依赖:

<dependency>
    <groupId>com.dmetasoul</groupId>
    <artifactId>lakesoul</artifactId>
    <version>2.0.1-spark-3.1.2</version>
</dependency>

使用文档

1. 创建和写入 LakeSoulTable

1.1 Table Name

LakeSoul 中表名是一个路径,数据存储的目录就是 LakeSoulTable 的表名。

当调用 Dataframe.write(writeStream) 方法向 LakeSoulTable 写数据时,若表不存在,则会使用存储路径自动创建新表。

1.2 元数据管理

LakeSoul 通过数据是管理 meta 数据,因此可以高效的处理元数据,并且 meta 集群可以很方便的在云上进行扩容。

1.3 Partition

LakeSoulTable 有两种分区方式,分别是 range 分区和 hash 分区,可以两种分区同时使用。

  • range 分区即通常的基于时间的表分区,不同分区的数据文件存储在不同的分区路径下;
  • 使用 hash 分区,必须同时指定 hash 分区主键字段和 hash bucket num,在写数据时,会根据 bucket num 对 hash 主键字段值进行散列,取模后相同数据会写到同一个文件,文件内部根据 hash 字段值升序排列;
  • 若同时指定了 range 分区和 hash 分区,则每个 range 分区内,hash 值相同的数据会写到同一个文件里;
  • 指定分区后,写入 LakeSoulTable 的数据必须包含分区字段。

可以根据具体场景选择使用 range 分区或 hash 分区,或者同时使用两者。当指定 hash 分区后,LakeSoulTable 的数据将根据主键唯一,主键字段为 hash 分区字段 + range 分区字段(如果存在)。

当指定 hash 分区时,LakeSoulTable 支持 upsert 操作,此时 append 模式写数据被禁止,可以使用 LakeSoulTable.upsert() 方法来代替。

1.4 代码示例

import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
import spark.implicits._

val df = Seq(("2021-01-01",1,"rice"),("2021-01-01",2,"bread")).toDF("date","id","name")
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"

//create table
//spark batch
df.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id")
  .option("hashBucketNum","2")
  .save(tablePath)
//spark streaming
import org.apache.spark.sql.streaming.Trigger
val readStream = spark.readStream.parquet("inputPath")
val writeStream = readStream.writeStream
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("1 minutes"))
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id")
  .option("hashBucketNum", "2")
  .option("checkpointLocation", "s3a://bucket-name/checkpoint/path")
  .start(tablePath)
writeStream.awaitTermination()

//对于已存在的表,写数据时不需要再指定分区信息
//相当于 insert overwrite partition,如果不指定 replaceWhere,则会重写整张表
df.write
  .mode("overwrite")
  .format("lakesoul")
  .option("replaceWhere","date='2021-01-01'")
  .save(tablePath)

2. Read LakeSoulTable

可以通过 Spark read api 或者构建 LakeSoulTable 来读取数据,LakeSoul 也支持通过 Spark SQL 读取数据,详见 8. 使用 Spark SQL 操作 LakeSoulTable

2.1 代码示例

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
val tablePath = "s3a://bucket-name/table/path/is/also/table/name"

//方法一
val df1 = spark.read.format("lakesoul").load(tablePath)

//方法二
val df2 = LakeSoulTable.forPath(tablePath).toDF

3. Upsert LakeSoulTable

3.1 Batch

当 LakeSoulTable 使用 hash 分区时,支持 upsert 功能。

默认情况下使用 MergeOnRead 模式,upsert 数据以 delta file 的形式写入表路径,LakeSoul 提供了高效的 upsert 和 merge scan 性能。

可以通过设置参数 spark.dmetasoul.lakesoul.deltaFile.enabledfalse 开启 CopyOnWrite 模式,每次 upsert 都生成最终合并数据,但不建议这么做,因为写效率很差且并发度较低。

3.1.1 代码示例

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()
import spark.implicits._

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"

val lakeSoulTable = LakeSoulTable.forPath(tablePath)
val extraDF = Seq(("2021-01-01",3,"chicken")).toDF("date","id","name")

lakeSoulTable.upsert(extraDF)

3.2 Streaming 支持

流式场景中,若 outputMode 为 complete,则每次写数据都会 overwrite 之前的数据。

当 outputMode 为 append 或 update 时,如果指定了 hash 分区,则每次写入数据视为进行一次 upsert 更新,读取时如果存在相同主键的数据,同一字段的最新值会覆盖之前的值。仅当指定 hash 分区时,update outputMode 可用。
若未使用 hash 分区,则允许存在重复数据。

4. Update LakeSoulTable

LakeSoul 支持 update 操作,通过指定条件和需要更新的字段 expression 来执行。有多种方式可以执行 update,详见 LakeSoulTable 注释。

4.1 代码示例

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
import org.apache.spark.sql.functions._

//update(condition, set)
lakeSoulTable.update(col("date") > "2021-01-01", Map("data" -> lit("2021-01-02")))

5. Delete Data

LakeSoul 支持 delete 操作删除符合条件的数据,条件可以是任意字段,若不指定条件,则会删除全表数据。

5.1 代码示例

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)

//删除符合条件的数据
lakeSoulTable.delete("date='2021-01-01'")
//删除全表数据
lakeSoulTable.delete()

6. Compaction

执行 upsert 会生成 delta 文件,当 delta 文件过多时,会影响读取效率,此时可以执行 compaction 合并文件。

当执行全表 compaction 时,可以给 compaction 设置条件,只有符合条件的 range 分区才会执行 compaction 操作。

触发 compaction 的条件:

  1. range 分区最后一次修改时间在设置的 spark.dmetasoul.lakesoul.compaction.interval (ms) 之前,默认是 12 小时
  2. range 分区 upsert 产生的 delta file num 超过了设置的 spark.dmetasoul.lakesoul.deltaFile.max.num,默认是 5

6.1 代码示例

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)

//对指定分区执行 compaction 操作
lakeSoulTable.compaction("date='2021-01-01'")
//对全表所有分区执行 compaction 操作
lakeSoulTable.compaction()
//对全表所有分区执行 compaction 操作,会检测是否符合执行 compaction 的条件,只有符合条件的才会执行
lakeSoulTable.compaction(false)

7. 使用 Spark SQL 操作 LakeSoulTable

LakeSoul 支持 Spark SQL 读写数据,使用时需要设置 spark.sql.catalog.spark_catalogorg.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog
需要注意的是:

  • insert into 语句会默认开启 autoMerge 功能;
  • 建表语句中指定的分区为 range 分区,暂不支持通过 Spark SQL 在建表时设置 hash 分区;
  • 不能对 hash 分区的表执行 insert into 功能,请使用 lakeSoulTable.upsert() 方法;
  • lakeSoulTable 暂不支持部分 Spark SQL 语句,详见 org.apache.spark.sql.lakesoul.rules.LakeSoulUnsupportedOperationsCheck

7.1 代码示例

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")
  .getOrCreate()

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
spark.range(10).createOrReplaceTempView("tmpView")

//写数据
spark.sql(s"insert overwrite table lakesoul.`$tablePath` partition (date='2021-01-01') select id from tmpView") 
//insert into 不能对 hash 分区表使用,请使用 LakeSoulTableRel.upsert() 方法
spark.sql(s"insert into lakesoul.`$tablePath` select * from tmpView")

//读数据
spark.sql(s"select * from lakesoul.`$tablePath`").show()

8. Operator on Hash Primary Keys

指定 hash 分区后,LakeSoul 各 range 分区内的数据根据 hash 主键字段分片且分片数据有序,因此部分算子作用于 hash 主键字段时,无需 shuffle 和 sort。

LakeSoul 目前支持 join、intersect 和 except 算子的优化,后续将支持更多算子。

8.1 Join on Hash Keys

支持的场景:

  • 对于同一张表,不同分区的数据根据 hash 字段进行 join 时,无需 shuffle 和 sort
  • 若两张不同表的 hash 字段类型和字段数量相同,且 hash bucket 数量相同,它们之间根据 hash 字段进行 join 时,也无需 shuffle 和 sort

8.2 Intersect/Except on Hash Keys

支持的场景:

  • 对同一张表不同分区的 hash 字段执行 intersect/except 时,无需 shuffle、sort 和 distinct
  • 对两张不同的表,若它们拥有相同的 hash 字段类型和字段数量且 hash bucket 数量相同,对 hash 字段执行 intersect/except 时,无需 shuffle、sort 和 distinct

range 分区内,hash 主键字段值是唯一的,因此 intersect 或 except 的结果是不重复的,后续操作不需要再次去重,例如可以直接 count 获取不重复数据的数量,无需 count distinct

8.3 代码示例

import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.lakesoulsoul.catalog.LakeSoulCatalog")
  .getOrCreate()
import spark.implicits._


val df1 = Seq(("2021-01-01",1,1,"rice"),("2021-01-02",2,2,"bread")).toDF("date","id1","id2","name")
val df2 = Seq(("2021-01-01",1,1,2.7),("2021-01-02",2,2,1.3)).toDF("date","id3","id4","price")

val tablePath1 = "s3a://bucket-name/table/path/is/also/table/name/1"
val tablePath2 = "s3a://bucket-name/table/path/is/also/table/name/2"

df1.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id1,id2")
  .option("hashBucketNum","2")
  .save(tablePath1)
df2.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id3,id4")
  .option("hashBucketNum","2")
  .save(tablePath2)


//join on hash keys without shuffle and sort
//相同表的不同 range 分区
spark.sql(
  s"""
    |select t1.*,t2.* from
    | (select * from lakesoul.`$tablePath1` where date='2021-01-01') t1
    | join 
    | (select * from lakesoul.`$tablePath1` where date='2021-01-02') t2
    | on t1.id1=t2.id1 and t1.id2=t2.id2
  """.stripMargin)
    .show()
//相同 hash 设置的不同表
spark.sql(
  s"""
    |select t1.*,t2.* from
    | (select * from lakesoul.`$tablePath1` where date='2021-01-01') t1
    | join 
    | (select * from lakesoul.`$tablePath2` where date='2021-01-01') t2
    | on t1.id1=t2.id3 and t1.id2=t2.id4
  """.stripMargin)
  .show()

//intersect/except on hash keys without shuffle,sort and distinct
//相同表的不同 range 分区
spark.sql(
  s"""
    |select count(1) from 
    | (select id1,id2 from lakesoul.`$tablePath1` where date='2021-01-01'
    |  intersect
    | select id1,id2 from lakesoul.`$tablePath1` where date='2021-01-02') t
  """.stripMargin)
  .show()
//相同 hash 设置的不同表
spark.sql(
  s"""
    |select count(1) from 
    | (select id1,id2 from lakesoul.`$tablePath1` where date='2021-01-01'
    |  intersect
    | select id3,id4 from lakesoul.`$tablePath2` where date='2021-01-01') t
  """.stripMargin)
  .show()

9. Schema 演进

LakeSoul 支持 schema 演进功能,可以新增列 (分区字段无法修改)。新增列后,读取现有数据,该新增列会是 NULL。你可以通过使用 upsert 功能,为现有数据追加该新列。

9.1 Merge Schema

在写数据时指定 mergeSchematrue,或者启用 autoMerge 来 merge schema,新的 schema 为表原本 schema 和当前写入数据 schema 的并集。

9.2 代码示例

df.write
  .mode("append")
  .format("lakesoul")
  .option("rangePartitions","date")
  .option("hashPartitions","id")
  .option("hashBucketNum","2")
  //方式一
  .option("mergeSchema","true")
  .save(tablePath)
  
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  //方式二
  .config("spark.dmetasoul.lakesoul.schema.autoMerge.enabled", "true")
  .getOrCreate()

10. Drop Partition

删除分区,也就是删除 range 分区,实际上并不会真正删掉数据文件,可以使用 cleanup 功能清理失效数据

10.1 代码示例

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)

//删除指定 range 分区
lakeSoulTable.dropPartition("date='2021-01-01'")

11. Drop Table

删除表会直接删除表的所有 meta 数据和文件数据

11.1 代码示例

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql._
val spark = SparkSession.builder.master("local")
  .config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
  .getOrCreate()

val tablePath = "s3a://bucket-name/table/path/is/also/table/name"
val lakeSoulTable = LakeSoulTable.forPath(tablePath)

//删除表
lakeSoulTable.dropTable()
Clone this wiki locally