Flink Consuming Kafka Example - TBDSUDC/tdbs-document GitHub Wiki

Walkthrough: consuming TBDS Kafka from Flink

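Obtain the TBDS authentication credentials for Kafka

SecretId: the ID value used for authentication

SecretKey: the key value used for authentication

User: the consumer

Module: indicates that this ID and key pair is used to access the Kafka module.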
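The SecretId and SecretKey end up as Kafka client properties in the Flink job further down. As a minimal sketch of that mapping, the helper below builds the properties from values read out of the environment instead of hard-coding them. The class name `TbdsKafkaProps` and the environment variable names `TBDS_SECRET_ID` and `TBDS_SECRET_KEY` are assumptions made for illustration; the property keys and values are the ones used by the job in this walkthrough.

```java
import java.util.Map;
import java.util.Properties;

/** Illustrative helper: builds Kafka client properties for a TBDS-secured cluster. */
final class TbdsKafkaProps {

    // Hypothetical environment variable names; use whatever your deployment defines.
    static final String ENV_SECRET_ID = "TBDS_SECRET_ID";
    static final String ENV_SECRET_KEY = "TBDS_SECRET_KEY";

    /** Returns client properties carrying the TBDS authentication settings. */
    static Properties build(String bootstrapServers, String secretId, String secretKey) {
        if (secretId == null || secretId.isEmpty() || secretKey == null || secretKey.isEmpty()) {
            throw new IllegalArgumentException("SecretId and SecretKey must both be set");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Same keys and values as the Flink job in this walkthrough.
        props.setProperty("security.protocol", "SASL_TBDS");
        props.setProperty("sasl.mechanism", "TBDS");
        props.setProperty("sasl.tbds.secure.id", secretId);
        props.setProperty("sasl.tbds.secure.key", secretKey);
        return props;
    }

    /** Reads the credentials from an environment-style map. */
    static Properties fromEnv(String bootstrapServers, Map<String, String> env) {
        return build(bootstrapServers, env.get(ENV_SECRET_ID), env.get(ENV_SECRET_KEY));
    }

    public static void main(String[] args) {
        String servers = args.length > 0 ? args[0] : "localhost:6667";
        try {
            Properties props = fromEnv(servers, System.getenv());
            // Print only the property names so the secret values never reach the log.
            System.out.println(props.stringPropertyNames());
        } catch (IllegalArgumentException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

Keeping the credentials out of the source file means the same jar can be used with different SecretId and SecretKey pairs, provided the runtime environment can supply them.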

Create the topic

# Create the topic
export KAFKA_HOME=/opt/software/kafka_2.11-0.10.2.1/
cd $KAFKA_HOME
bin/kafka-topics.sh --create --zookeeper tbds-****:2181 --replication-factor 1 --partitions 2 --topic test01
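The command above creates the topic with 2 partitions. Each Kafka partition is read by exactly one Flink source subtask, so the partition count caps how many subtasks actually receive data. The sketch below illustrates this with a simplified round-robin model; the class `PartitionSpread` is hypothetical, and Flink's real assignment may start from a different subtask index, so treat it as an illustration rather than the connector's algorithm.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified round-robin model of spreading Kafka partitions over source subtasks. */
final class PartitionSpread {

    /** Returns, for each subtask index, the list of partition ids it would read. */
    static List<List<Integer>> assign(int partitions, int parallelism) {
        if (partitions < 0 || parallelism < 1) {
            throw new IllegalArgumentException("need partitions >= 0 and parallelism >= 1");
        }
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            bySubtask.add(new ArrayList<>());
        }
        // Hand out partitions one at a time, wrapping around the subtasks.
        for (int p = 0; p < partitions; p++) {
            bySubtask.get(p % parallelism).add(p);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // The topic is created with 2 partitions; try a source parallelism of 3.
        List<List<Integer>> result = assign(2, 3);
        for (int i = 0; i < result.size(); i++) {
            System.out.println("subtask " + i + " -> partitions " + result.get(i));
        }
    }
}
```

With 2 partitions and a parallelism of 3, one subtask in this model is left without a partition, which is why a source parallelism above the partition count adds no read throughput.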

Add the pom file (note: to avoid conflicts, set the scope of the Kafka and Flink dependencies to provided)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.demo.flink.opensource</groupId>
    <artifactId>flink-kafka-demo</artifactId>
    <version>1.0-SNAPSHOT</version>


    <properties>
        <scala.binary.version>2.11</scala.binary.version>
        <flink.version>1.5.0-oceanus-SNAPSHOT</flink.version>
        <tbds.svn.branch.version>5.1.2</tbds.svn.branch.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
    </properties>
    <repositories>
        <repository>
            <id>data-respository</id>
            <name>data-repo</name>
            <url>https://tbdsrepo.cloud.tencent.com/repository/tbds/</url>
            <releases>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </releases>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- test dependencies -->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
            <type>test-jar</type>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>

        <resources>
            <resource>
                <directory>lib</directory>
                <targetPath>BOOT-INF/lib/</targetPath>
                <includes>
                    <include>**/*.jar</include>
                </includes>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
                <targetPath>BOOT-INF/classes/</targetPath>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Flink code walkthrough

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.kafka.clients.consumer.ConsumerConfig;

/**
 * Kafka Source Demo
 */
public class KafkaSourceDemo {

    // Name of the topic to consume. It must exist on the cluster
    // (the topic created earlier in this walkthrough is named test01).
    private static final String TEST_TOPIC = "testflinktopic";
    // ZooKeeper address
    private static final String ZK_ADDRESS = "*********:2181";
    // Kafka broker address
    private static final String KAFKA_ADDRESS = "*********:6667";
    // Authentication id and key
    private static final String SECURRE_ID = "*********";
    private static final String SECURRE_KEY = "*********";

    public static void main(String[] args) throws Exception {

        // Obtain the Flink execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint interval, in milliseconds
        env.enableCheckpointing(5000);
        // Time characteristic used by the job (event time)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Kafka consumer settings
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", KAFKA_ADDRESS);
        properties.setProperty("zookeeper.connect", ZK_ADDRESS);
        properties.setProperty("group.id", "test_group");
        properties.setProperty("auto.offset.reset", "latest");
        properties.setProperty("enable.auto.commit", "true");

        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // Authentication settings
        properties.put("security.protocol", "SASL_TBDS");
        properties.put("sasl.mechanism", "TBDS");
        properties.put("sasl.tbds.secure.id", SECURRE_ID);
        properties.put("sasl.tbds.secure.key", SECURRE_KEY);

        // Typed consumer that deserializes each record value as a String
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>(TEST_TOPIC, new SimpleStringSchema(), properties);
//        consumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
        DataStream<String> keyedStream = env.addSource(consumer);
        // Print the records received from the producer directly to the console
        keyedStream.print();
        keyedStream.map(new MapFunction<String, Void>() {
            @Override
            public Void map(String s) throws Exception {
                // NOTE: getTopicLIst() is not defined in this excerpt and must be supplied separately.
                System.out.println(s + "======" + getTopicLIst());
                return null;
            }
        });
        // Execute the program
        env.execute("Flink-Kafka-demo");
    }
}
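The job sets the time characteristic to event time and contains a commented-out call to `assignTimestampsAndWatermarks(new CustomWatermarkEmitter())`, but `CustomWatermarkEmitter` itself is not shown. The plain-Java sketch below illustrates one common strategy such a class might follow, bounded out-of-orderness, where the watermark trails the highest timestamp seen by a fixed delay. This is an assumption about the approach, not the demo's implementation; the class name `BoundedLatenessTracker` is hypothetical and the code deliberately has no Flink dependency. In a real job the same logic would sit inside an implementation of Flink's `AssignerWithPeriodicWatermarks`.

```java
/** Plain-Java sketch of bounded out-of-orderness watermark logic. */
final class BoundedLatenessTracker {

    private final long maxOutOfOrdernessMs;
    // Long.MIN_VALUE means "no event seen yet".
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedLatenessTracker(long maxOutOfOrdernessMs) {
        if (maxOutOfOrdernessMs < 0) {
            throw new IllegalArgumentException("maxOutOfOrdernessMs must be >= 0");
        }
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records an event timestamp (epoch milliseconds) and returns it unchanged. */
    long onEvent(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
        return eventTimestampMs;
    }

    /** Current watermark: highest timestamp seen minus the allowed delay. */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    /** An event counts as late here when its timestamp is at or before the watermark. */
    boolean isLate(long eventTimestampMs) {
        return eventTimestampMs <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedLatenessTracker tracker = new BoundedLatenessTracker(3000L);
        tracker.onEvent(10_000L);
        tracker.onEvent(8_000L); // out of order, but within the 3 s bound
        System.out.println("watermark = " + tracker.currentWatermark());
    }
}
```

The allowed delay is a trade-off: a larger value tolerates more disorder in the Kafka records but delays any event-time windows that wait for the watermark.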

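Build and compile the code

Build the project with Maven (for example, mvn clean package); the shade plugin in the pom above runs in the package phase.

With the pom above, Maven writes the packaged jar to the project's target/ directory by default.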

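Submit the jar

  1. Log in to the TBDS real-time computing page, then click “Create Application”.

  2. Fill in the real-time job details and the runtime resource settings.

  3. Choose to upload a jar.

  4. Upload the jar and enter the main class (the class containing main, KafkaSourceDemo in this walkthrough).

  5. Once approval is complete, run the job.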

Demo download link

https://github.com/TBDSUDC/TBDSDemo/tree/master/%E4%BD%BF%E7%94%A8TBDS%20maven%E5%BC%80%E5%8F%91%E6%A1%88%E4%BE%8B/flinkkafkademo
