package cn.com.larunda;
import lombok.Data;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
/**
* 描述.
*
* @author yx
* @version 1.0
* @since 19-11-21下午1:29
*/
public class FlinkTableSQL {
public static void main(String[] args) throws Exception {
/* EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
*/
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv);
DataStream<WordCount> myDataSource = fsEnv.addSource(new SourceFunction<WordCount>() {
@Override
public void run(SourceContext<WordCount> sourceContext) throws Exception {
while (true) {
Thread.sleep(1000);
sourceContext.collect(new WordCount("Hello", 1));
}
}
@Override
public void cancel() {
// nothing
}
});
fsTableEnv.registerDataStream("WordCount", myDataSource);
// run a SQL query on the Table and retrieve the result as a new Table
Table table = fsTableEnv.sqlQuery(
"SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
// 使用flinkSQL处理实时数据当我们把表转化成流的时候,需要使用toAppendStream与toRetractStream这两个方法。
// 稍不注意可能直接选择了toAppendStream。
//追加模式:只有在动态Table仅通过INSERT更改修改时才能使用此模式,即它仅附加,并且以前发出的结果永远不会更新。
// 如果更新或删除操作使用追加模式会失败报错
//缩进模式:始终可以使用此模式。返回值是boolean类型。
// 它用true或false来标记数据的插入和撤回,返回true代表数据插入,false代表数据的撤回
// 按照官网的理解如果数据只是不断添加,可以使用追加模式,
// 其余方式则不可以使用追加模式,而缩进模式侧可以适用于更新,删除等场景
DataStream<Tuple2<Boolean, WordCount>> stream = fsTableEnv.toRetractStream(table, WordCount.class);
stream.print();
fsTableEnv.execute("flink stream table job");
}
@Data
public static class WordCount{
public String word;
public long frequency;
public WordCount(){}
public WordCount(String word, long frequency) {
this.word = word;
this.frequency = frequency;
}
@Override
public String toString() {
return "词语:" + word + ",词频:" + frequency;
}
}
}