FlinkTableSQL - yxbook/notes GitHub Wiki

package cn.com.larunda;

import cn.com.larunda.dto.OriginalInput;
import cn.com.larunda.utils.KafkaUtils;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

/**
 * Demo job: reads JSON records from Kafka, registers the stream as a Flink table,
 * runs two SQL filters over it, converts both results back to retract streams,
 * and coGroups them inside a processing-time session window, printing the pairs.
 *
 * @author yx
 * @version 1.0
 * @since 19-11-21 1:29 PM
 */
public class FlinkTableSQL {

    public static void main(String[] args) throws Exception {
     /*   EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
*/
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv);

        FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>("stream-calc-test", new SimpleStringSchema(),
                KafkaUtils.buildConsumerProperties("127.0.0.1:9092"));
        // setStartFromLatest(): start consuming from the latest Kafka offset, skipping history.
        kafkaConsumer.setStartFromLatest();
        DataStream<String> dataStream = fsEnv.addSource(kafkaConsumer);
        // Deserialize each JSON string into an OriginalInput POJO.
        DataStream<OriginalInput> transformStream = dataStream.map((MapFunction<String, OriginalInput>) s ->
                JSONObject.parseObject(s, OriginalInput.class));

        fsTableEnv.registerDataStream("feed_input_test", transformStream);

        // Run a SQL query on the registered table and retrieve the result as a new Table.
        // Each query selects the rows for one specific inputId.
        Table table1 = fsTableEnv.sqlQuery("SELECT * from feed_input_test where inputId = 'cb5f487dd644442db8c4ec528a3b65f8'");
        Table table2 = fsTableEnv.sqlQuery("SELECT * from feed_input_test where inputId = 'cb5f487dd644442db8c4ec528a3b65f9'");

        // When converting a Table back to a DataStream, choose between
        // toAppendStream and toRetractStream — picking toAppendStream blindly
        // is a common mistake:
        // - Append mode: only usable when the dynamic table is modified by
        //   INSERT changes alone (rows are only ever added and previously
        //   emitted results are never updated); UPDATE or DELETE changes make
        //   append mode fail with an error.
        // - Retract mode: always usable. Each emitted record carries a Boolean
        //   flag: true marks an insertion, false marks the retraction of a
        //   previously emitted row.
        // Rule of thumb (per the official docs): use append mode only for
        // insert-only results; use retract mode whenever updates or deletes
        // can occur.
        DataStream<Tuple2<Boolean, OriginalInput>> stream1 = fsTableEnv.toRetractStream(table1, OriginalInput.class);

        DataStream<Tuple2<Boolean, OriginalInput>> stream2 = fsTableEnv.toRetractStream(table2, OriginalInput.class);

        // CoGroup the two retract streams. NOTE(review): both key selectors key
        // on value.f0, i.e. the Boolean insert/retract flag — so records are
        // matched by flag value, not by any business key. Confirm this is
        // intentional (it looks like demo code exercising the API).
        stream1.coGroup(stream2)
                .where(new KeySelector<Tuple2<Boolean, OriginalInput>, Object>() {
                    @Override
                    public Object getKey(Tuple2<Boolean, OriginalInput> value) throws Exception {
                        System.out.println("where-value.f0" + value.f0);
                        return value.f0;
                    }
                })
                .equalTo(new KeySelector<Tuple2<Boolean, OriginalInput>, Object>() {
                    @Override
                    public Object getKey(Tuple2<Boolean, OriginalInput> value) throws Exception {
                        System.out.println("equalTo-value.f0" + value.f0);
                        return value.f0;
                    }
                    // Session window closes after a 10-second gap with no records;
                    // the CountTrigger fires the window on every single element,
                    // so output is emitted per record rather than only at session end.
                }).window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .trigger(CountTrigger.of(1))
                .apply(new CoGroupFunction<Tuple2<Boolean,OriginalInput>, Tuple2<Boolean,OriginalInput>, Object>() {

                    // Formats all co-grouped elements from both sides into one
                    // human-readable string and emits it downstream (printed below).
                    @Override
                    public void coGroup(Iterable<Tuple2<Boolean, OriginalInput>> iterable, Iterable<Tuple2<Boolean, OriginalInput>> iterable1, Collector<Object> collector) throws Exception {
                        StringBuffer buffer=new StringBuffer();
                        buffer.append("DataStream frist:\n");
                        for(Tuple2<Boolean,OriginalInput> value:iterable){
                            buffer.append(value.f0+"=>"+value.f1+"\n");
                        }
                        buffer.append("DataStream second:\n");
                        for(Tuple2<Boolean,OriginalInput> value:iterable1){
                            buffer.append(value.f0+"=>"+value.f1+"\n");
                        }
                        collector.collect(buffer.toString());
                    }
                }).print();


        fsTableEnv.execute("flink stream  table job");
    }


}
⚠️ **GitHub.com Fallback** ⚠️