Flink Notes

Development Environment

Build from source

ref: https://ci.apache.org/projects/flink/flink-docs-master/docs/flinkdev/building/

mvn clean install -DskipTests -Dfast

quick start

Create the project: mvn -e archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.11.1 -DinteractiveMode=false -DarchetypeCatalog=local -Dpackage=ai -DartifactId=rtflow1 -DgroupId=ai

Add a Test class

Based on the quick start project, add an org.myorg.quickstart.Test class.

Java project dir: /home/ubuntu/dev/quickstart/

Java class file path: /home/ubuntu/dev/quickstart/src/main/java/org/myorg/quickstart, class org.myorg.quickstart.Test

package org.myorg.quickstart;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Test {

    public static void main(String[] args) throws Exception {
        final String hostname;
        final int port;
        try {
            ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
            hostname = params.get("hostname");
        } catch (Exception e) {
            System.err.println("USAGE: Please run 'Test --hostname <hostname> --port <port>'");
            return;
        }
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get data from the socket text stream
        DataStreamSource<String> stream = env.socketTextStream(hostname, port);

        // aggregation
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
                .keyBy(0)
                .sum(1);

        sum.print();

        env.execute("Test: WordCount from SocketTextStream");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] tokens = s.toLowerCase().split("\\W+");

            for (String token: tokens) {
                if (token.length() > 0) {
                    collector.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}

build quickstart project

mvn clean package -DskipTests

test Socket Server

nc -lk 127.0.0.1 9999

Whatever is typed here is used as input to the Flink application.
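For example, typing hello world hello into the nc session should eventually produce (hello,1), (world,1) and (hello,2) in the job output: LineSplitter emits one (token, 1) tuple per word, and the keyed sum(1) emits an updated running count for every incoming tuple.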

Run the Flink job

/home/ubuntu/dev/flink-1.12.2/build-target/bin/flink run -c org.myorg.quickstart.Test quickstart-0.1.jar --port 9999 --hostname 127.0.0.1

Once the Flink application is running, you can check its status in the Flink UI.

You can also see the job output in the .out file under the Flink log directory (/home/ubuntu/dev/flink-1.12.2/build-target/log).

build & deploy record

ref: https://ci.apache.org/projects/flink/flink-docs-master/docs/flinkdev/building/

old version: 1.10.1

wget https://github.com/apache/flink/archive/release-1.10.1.tar.gz
ls -lhtr
rm release-1.10.1.tar.gz
ls
ls -lhtr
tar xzf flink-release-1.10.1.tar.gz
ls
cd flink-release-1.10.1
ls
~/apache-maven-3.3.9/bin/mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.12.1
screen -ls
screen -r dev
cd /tmp
wget https://repo.hortonworks.com/content/repositories/jetty-hadoop/com/101tec/zkclient/0.11/zkclient-0.11.jar
wget https://repo1.maven.org/maven2/com/101tec/zkclient/0.11/zkclient-0.11.jar
cd
cd flink-release-1.10.1
ls
find . -name pom.xml -exec grep 'zkclient' {} \; -print
vim ./flink-connectors/flink-connector-kafka-base/pom.xml
find . -name pom.xml -exec grep '0.11' {} \; -print
pwd
cd /tmp
ls
~/apache-maven-3.3.9/bin/mvn install:install-file -DgroupId=com.101tec -DartifactId=zkclient -Dversion=0.11 -Dfile=zkclient-0.11.jar -Dpackaging=jar
cd /tmp
wget https://repo.hortonworks.com/content/repositories/jetty-hadoop/com/github/jnr/jnr-posix/3.0.35/jnr-posix-3.0.35.jar
wget https://copernicus.serco.eu/repository/nexus/content/repositories/gael/com/github/jnr/jnr-posix/3.0.35/jnr-posix-3.0.35.jar
~/apache-maven-3.3.9/bin/mvn install:install-file -DgroupId=com.github.jnr -DartifactId=jnr-posix -Dversion=3.0.35 -Dfile=jnr-posix-3.0.35.jar -Dpackaging=jar
cd
du -sch *
cd flink-
cd flink-release-1.10.1
ls
du -sch *
df -h
free -m
top
ls
cd flink-release-1.10.1
ls
cd flink-dist/
ls
cd target/
ls
ls -lhtr
cd flink-
cd flink-1.10.1-bin/

readlink -f flink-1.10.1.tar.bz2
screen -d -r dev
cd flink-release-1.10.1
ls
cp flink-connectors/flink-hadoop-compatibility/target/flink-hadoop-compatibility_2.11-1.10.1.jar /tmp
cp flink-connectors/flink-connector-hive/target/flink-connector-hive_2.11-1.10.1.jar /tmp
cp flink-formats/flink-orc/target/flink-orc_2.11-1.10.1.jar /tmp
find . -name "flink-shaded-hadoop-2-uber*.jar"
cp ./flink-yarn-tests/target/shaded-hadoop/flink-shaded-hadoop-2-uber-2.6.0-cdh5.12.1-9.0.jar /tmp
cd /tmp
ls -lhtr
ls
find flink-release-1.10.1 -name "*link-shaded-hadoop-2-uber-2.6.0*.jar"
cd flink-release-1.10.1
ls
find . -name flink-shaded-hadoop-2-uber-2.6.0-cdh5.12.1-9.0.jar
cd flink-yarn-tests
ls
vim pom.xml
find . -name pom.xml -exec grep 'commons-cli' {} \; -print
find . -name pom.xml -exec grep 'commons' {} \; -print
ls

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-hadoop-compatibility_2.11</artifactId>
	<version>${flink.version}</version>
</dependency>

Sample Code


val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.setParallelism(3)

val tableEnvSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20))
// register the HiveCatalog
val catalogName = "my_catalog"
val catalog = new HiveCatalog(
  catalogName,              // catalog name
  "default",                // default database
  "/Users/lmagic/develop",  // Hive config (hive-site.xml) directory
  "1.1.0"                   // Hive version
)
tableEnv.registerCatalog(catalogName, catalog)
tableEnv.useCatalog(catalogName)
// Create the Kafka stream table
// The Kafka topic stores JSON-formatted tracking logs. Computed columns are used to generate the event time and watermark when creating the table. The SQL Kafka connector options in 1.11 are somewhat simplified compared with 1.10.

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp")
tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.analytics_access_log_kafka")

tableEnv.executeSql(
  """
    |CREATE TABLE stream_tmp.analytics_access_log_kafka (
    |  ts BIGINT,
    |  userId BIGINT,
    |  eventType STRING,
    |  fromType STRING,
    |  columnType STRING,
    |  siteId BIGINT,
    |  grouponId BIGINT,
    |  partnerId BIGINT,
    |  merchandiseId BIGINT,
    |  procTime AS PROCTIME(),
    |  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000,'yyyy-MM-dd HH:mm:ss')),
    |  WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'ods_analytics_access_log',
    |  'properties.bootstrap.servers' = 'kafka110:9092,kafka111:9092,kafka112:9092',
    |  'properties.group.id' = 'flink_hive_integration_exp_1',
    |  'scan.startup.mode' = 'latest-offset',
    |  'format' = 'json',
    |  'json.fail-on-missing-field' = 'false',
    |  'json.ignore-parse-errors' = 'true'
    |)
  """.stripMargin
)
// Since the HiveCatalog was registered earlier, the metadata of the Kafka stream table can be observed in Hive (note that Hive shows no actual columns for this table).


hive> DESCRIBE FORMATTED stream_tmp.analytics_access_log_kafka;
OK
# col_name              data_type               comment


# Detailed Table Information
Database:               stream_tmp
Owner:                  null
CreateTime:             Wed Jul 15 18:25:09 CST 2020
LastAccessTime:         UNKNOWN
Protect Mode:           None
Retention:              0
Location:               hdfs://sht-bdmq-cls/user/hive/warehouse/stream_tmp.db/analytics_access_log_kafka
Table Type:             MANAGED_TABLE
Table Parameters:
    flink.connector         kafka
    flink.format            json
    flink.json.fail-on-missing-field    false
    flink.json.ignore-parse-errors  true
    flink.properties.bootstrap.servers  kafka110:9092,kafka111:9092,kafka112:9092
    flink.properties.group.id   flink_hive_integration_exp_1
    flink.scan.startup.mode latest-offset
    flink.schema.0.data-type    BIGINT
    flink.schema.0.name     ts
    flink.schema.1.data-type    BIGINT
    flink.schema.1.name     userId
    flink.schema.10.data-type   TIMESTAMP(3)
    flink.schema.10.expr    TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000, 'yyyy-MM-dd HH:mm:ss'))
    flink.schema.10.name    eventTime
    flink.schema.2.data-type    VARCHAR(2147483647)
    flink.schema.2.name     eventType
    # ......(remaining columns omitted)
    flink.schema.9.data-type    TIMESTAMP(3) NOT NULL
    flink.schema.9.expr     PROCTIME()
    flink.schema.9.name     procTime
    flink.schema.watermark.0.rowtime    eventTime
    flink.schema.watermark.0.strategy.data-type TIMESTAMP(3)
    flink.schema.watermark.0.strategy.expr  `eventTime` - INTERVAL '15' SECOND
    flink.topic             ods_analytics_access_log
    is_generic              true
    transient_lastDdlTime   1594808709

# Storage Information
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat:            org.apache.hadoop.mapred.TextInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Compressed:             No
Num Buckets:            -1
Bucket Columns:         []
Sort Columns:           []
Storage Desc Params:
    serialization.format    1
Time taken: 1.797 seconds, Fetched: 61 row(s)
Create the Hive table
Flink SQL provides HiveQL-compatible DDL; just specify SqlDialect.HIVE (DML compatibility is still under development).

To make the results easy to observe, the table below uses three-level day/hour/minute partitioning. In practice such fine granularity is usually unnecessary (10-minute or even 1-hour partitions are likely more appropriate).

tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)

tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS hive_tmp")
tableEnv.executeSql("DROP TABLE IF EXISTS hive_tmp.analytics_access_log_hive")

tableEnv.executeSql(
  """
    |CREATE TABLE hive_tmp.analytics_access_log_hive (
    |  ts BIGINT,
    |  user_id BIGINT,
    |  event_type STRING,
    |  from_type STRING,
    |  column_type STRING,
    |  site_id BIGINT,
    |  groupon_id BIGINT,
    |  partner_id BIGINT,
    |  merchandise_id BIGINT
    |) PARTITIONED BY (
    |  ts_date STRING,
    |  ts_hour STRING,
    |  ts_minute STRING
    |) STORED AS PARQUET
    |TBLPROPERTIES (
    |  'sink.partition-commit.trigger' = 'partition-time',
    |  'sink.partition-commit.delay' = '1 min',
    |  'sink.partition-commit.policy.kind' = 'metastore,success-file',
    |  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
    |)
  """.stripMargin
)
The Hive table parameters reuse those of the SQL FileSystem connector and are closely tied to partition commit. A brief explanation of the four parameters that appear above:

sink.partition-commit.trigger: the time characteristic that triggers partition commit. The default is processing-time, which with late data can obviously put records into the wrong partitions, so partition-time is used here, i.e. commits are driven by the partition timestamp (the event time of the data inside the partition).
partition.time-extractor.timestamp-pattern: the pattern for extracting the partition timestamp. It must take the form yyyy-MM-dd HH:mm:ss, with the Hive table's partition columns used as placeholders. The partition column values come from the event time defined in the stream table, as shown later.
sink.partition-commit.delay: the delay before a partition commit is triggered. With the partition-time trigger, a partition is committed only after the watermark passes the extracted partition timestamp plus this delay. The value is best matched to the partition granularity: for an hourly-partitioned Hive table use 1 h, for 10-minute partitions use 10 min.
sink.partition-commit.policy.kind: the partition commit policy, i.e. the extra action that makes a partition visible downstream. metastore updates the table metadata in the Hive Metastore, while success-file creates a _SUCCESS marker file inside the partition directory.
Of course, the SQL FileSystem connector is not limited to these options and leaves plenty of room for customization (for example, a custom partition commit policy that merges small files). See the official documentation for details.
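As a worked example of how these options interact: with the minute-level partitioning and settings above, the partition (ts_date='2020-07-15', ts_hour='23', ts_minute='23') has the extracted timestamp 2020-07-15 23:23:00, so with the 1-minute delay it should be committed, i.e. the metastore updated and the _SUCCESS file written, once the job's watermark passes roughly 2020-07-15 23:24:00.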

Streaming write into Hive
Note how the event time of the stream table is converted into the Hive partition columns.

tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
tableEnv.executeSql(
  """
    |INSERT INTO hive_tmp.analytics_access_log_hive
    |SELECT
    |  ts,userId,eventType,fromType,columnType,siteId,grouponId,partnerId,merchandiseId,
    |  DATE_FORMAT(eventTime,'yyyy-MM-dd'),
    |  DATE_FORMAT(eventTime,'HH'),
    |  DATE_FORMAT(eventTime,'mm')
    |FROM stream_tmp.analytics_access_log_kafka
    |WHERE merchandiseId > 0
  """.stripMargin
)
Let's take a look at the result of the streaming sink.


The checkpoint interval set above is 20 seconds, and the data files in the sink directory are written at exactly 20-second intervals. Since the parallelism is 3, each write produces 3 files. Once all data for a partition has been written, a _SUCCESS file is generated alongside them; a partition that is still being written contains .inprogress files.

Query the data through Hive to confirm that the timestamps are correct.

hive> SELECT from_unixtime(min(cast(ts / 1000 AS BIGINT))),from_unixtime(max(cast(ts / 1000 AS BIGINT)))
    > FROM hive_tmp.analytics_access_log_hive
    > WHERE ts_date = '2020-07-15' AND ts_hour = '23' AND ts_minute = '23';
OK
2020-07-15 23:23:00 2020-07-15 23:23:59
Time taken: 1.115 seconds, Fetched: 1 row(s)
Streaming read from Hive
To use a Hive table as a streaming source, dynamic table options must be enabled, and the parameters for streaming Hive data are passed via table hints. Below is a simple example that computes merchandise PV (page views) from Hive.

tableEnv.getConfig.getConfiguration.setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true)

val result = tableEnv.sqlQuery(
  """
     |SELECT merchandise_id,count(1) AS pv
     |FROM hive_tmp.analytics_access_log_hive
     |/*+ OPTIONS(
     |  'streaming-source.enable' = 'true',
     |  'streaming-source.monitor-interval' = '1 min',
     |  'streaming-source.consume-start-offset' = '2020-07-15 23:30:00'
     |) */
     |WHERE event_type = 'shtOpenGoodsDetail'
     |AND ts_date >= '2020-07-15'
     |GROUP BY merchandise_id
     |ORDER BY pv DESC LIMIT 10
   """.stripMargin
)

"/*+ OPTIONS(
  'streaming-source.enable' = 'true',
  'streaming-source.monitor-interval' = '1 min',
  'streaming-source.consume-start-offset' = '2020-07-15 23:30:00'
) */"

result.toRetractStream[Row].print().setParallelism(1)
streamEnv.execute()
The meanings of the three table hint parameters are as follows.

streaming-source.enable: set to true to indicate that the Hive table can be used as a streaming source.
streaming-source.monitor-interval: how often new Hive data is detected, set to 1 minute above. For a partitioned table this means monitoring the appearance of new partitions and reading their data incrementally.
streaming-source.consume-start-offset: the timestamp from which to start consuming, again written in the form yyyy-MM-dd HH:mm:ss.
For more detail, see the official documentation again (as an aside, that page's Chinglish is pretty heavy).

Finally, because the SQL statement contains ORDER BY and LIMIT logic, the result has to be converted to a retract stream with toRetractStream() before it can be printed; each output record is then a (Boolean, Row) pair, where false marks the retraction of a previously emitted result.

The End
The Hive streaming feature in Flink 1.11 greatly improves the freshness of a Hive-based data warehouse, which is a big benefit for ETL jobs, and it can also serve continuous streaming queries, so it offers a fair amount of flexibility.

Understanding Exactly-Once Semantics

Distributed snapshots (Lightweight Asynchronous Snapshots for Distributed Dataflows, in the Chandy-Lamport algorithm family) and two-phase commit.

Drawbacks of traditional approaches: existing approaches rely on periodic global snapshots for data recovery after a failure. They have two main drawbacks. First, they tend to stall the overall computation and hold back data ingestion. Second, they eagerly persist records of all in-flight state changes, which produces snapshots that are much larger than necessary. Such snapshotting obtains a consistent global state by synchronously halting all distributed compute nodes; moreover, as far as is known, all current snapshot algorithms for distributed dataflows also include the records in transit in channels or messages across the execution graph as part of the snapshot state, which is usually far more data than needed.

Two-phase commit, introduced in Flink 1.4, combined with a cooperating source and sink (especially Kafka 0.11+), makes exactly-once processing semantics possible.
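As a minimal sketch of that combination (a DataStream API example; the source, topic name, broker address and timeout value are illustrative placeholders, not taken from the setup above), the Java code below enables exactly-once checkpointing and writes to Kafka through a transactional producer:

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceKafkaSinkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // exactly-once checkpoints every 20 s; the sink's transactions are committed when a checkpoint completes
        env.enableCheckpointing(20_000L, CheckpointingMode.EXACTLY_ONCE);

        // placeholder source
        DataStream<String> records = env.socketTextStream("127.0.0.1", 9999);

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "kafka110:9092");   // placeholder broker
        producerProps.setProperty("transaction.timeout.ms", "900000");     // keep <= broker transaction.max.timeout.ms

        // value-only serialization into the (placeholder) target topic
        KafkaSerializationSchema<String> schema = (element, timestamp) ->
                new ProducerRecord<byte[], byte[]>("sink_topic_exactly_once",
                        element.getBytes(StandardCharsets.UTF_8));

        // EXACTLY_ONCE switches the producer to Kafka transactions
        records.addSink(new FlinkKafkaProducer<>(
                "sink_topic_exactly_once",
                schema,
                producerProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("exactly-once Kafka sink sketch");
    }
}

FlinkKafkaProducer itself is built on the TwoPhaseCommitSinkFunction abstraction described next, so its Kafka transactions are pre-committed at checkpoint time and committed once the checkpoint completes.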

Flink's two-phase commit implementation is encapsulated in the abstract class org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction; implementing the four methods beginTransaction, preCommit, commit and abort is enough to obtain exactly-once processing semantics (a minimal sketch follows the list below).

  • beginTransaction: before starting the transaction, we create a temporary file in a temporary directory of the target file system, and while processing data we write records into this file.
  • preCommit: in the pre-commit phase we flush and close the file, after which it can no longer be written to; we also start a new transaction for any subsequent writes belonging to the next checkpoint.
  • commit: in the commit phase we atomically move the pre-committed file into the real target directory, which adds some latency to the visibility of the output data. If a failure occurs, Flink restarts the job and calls recoverAndCommit.
  • abort: in the abort phase the temporary file is deleted.
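Putting the four methods together, here is a minimal, non-production sketch of such a file-based two-phase-commit sink in Java; the class name, directory layout and error handling are illustrative assumptions, not code from these notes.

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.UUID;

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// Transaction handle = the name of a temporary file (a String); no per-transaction context is needed (Void).
public class FileTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<String, String, Void> {

    private final String tempDir;    // illustrative temporary directory
    private final String targetDir;  // illustrative target directory

    public FileTwoPhaseCommitSink(String tempDir, String targetDir) {
        // the base class needs serializers for the transaction handle and the (unused) context
        super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE);
        this.tempDir = tempDir;
        this.targetDir = targetDir;
    }

    @Override
    protected String beginTransaction() {
        // each transaction gets its own fresh temporary file name
        return "part-" + UUID.randomUUID();
    }

    @Override
    protected void invoke(String transaction, String value, Context context) throws IOException {
        // append records to the temporary file of the current transaction
        try (FileWriter writer = new FileWriter(new File(tempDir, transaction), true)) {
            writer.write(value);
            writer.write('\n');
        }
    }

    @Override
    protected void preCommit(String transaction) {
        // flush/close the temporary file; nothing more may be written for this transaction
    }

    @Override
    protected void commit(String transaction) {
        // atomically move the pre-committed file into the real target directory
        // (a production sink should check the return value and handle retries)
        new File(tempDir, transaction).renameTo(new File(targetDir, transaction));
    }

    @Override
    protected void abort(String transaction) {
        // on abort, simply delete the temporary file
        new File(tempDir, transaction).delete();
    }
}

Because the transaction handles are stored in Flink state, pre-committed files survive a failure: on recovery Flink calls recoverAndCommit for each pending transaction, which by default simply delegates to commit.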