// When writing data to an external database, Flink uses the primary key
// defined in the DDL. If a primary key is defined, the connector works in
// upsert mode; otherwise it works in append mode.
package cn.edu.tju.demo2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.types.Row;
public class Test41 {
    // "demo" is a table that already exists in MySQL:
    // create table demo (userId varchar(50) not null, total bigint, avgVal double);

    /** Path of the CSV input file read by the file-system connector. */
    private static String FILE_PATH = "info.txt";

    /**
     * Reads (userId, ts, val) rows from a CSV file, aggregates per user
     * (row count and average of {@code val}), and writes the result to the
     * MySQL table {@code demo} through the JDBC connector.
     *
     * @param args unused
     * @throws Exception if the Flink job fails to submit or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallel task so the output order/content is easy to inspect.
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the CSV file as source table "input" with schema (userId, ts, val).
        tableEnv.connect(new FileSystem().path(FILE_PATH))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("userId", DataTypes.VARCHAR(50))
                        .field("ts", DataTypes.INT())
                        .field("val", DataTypes.DOUBLE()))
                .createTemporaryTable("input");

        Table dataTable = tableEnv.from("input");

        // Per-user aggregation: number of rows (total) and average of val (avgVal).
        Table aggregateTable = dataTable
                .groupBy("userId")
                .select("userId, userId.count as total, val.avg as avgVal");

        // JDBC sink table backed by the existing MySQL table "demo".
        // BUG FIX: the original DDL was missing the opening quote before the
        // password value ("= 123456'"), which made the statement malformed.
        String sql =
                "create table jdbcOutputTable (" +
                " userId varchar(50) not null,total bigint,avgVal double " +
                ") with (" +
                " 'connector.type' = 'jdbc', " +
                " 'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/test', " +
                " 'connector.table' = 'demo', " +
                " 'connector.driver' = 'com.mysql.jdbc.Driver', " +
                " 'connector.username' = 'root', " +
                " 'connector.password' = '123456' )";
        tableEnv.sqlUpdate(sql);

        // Emit the aggregation into the JDBC sink and run the job.
        aggregateTable.insertInto("jdbcOutputTable");
        tableEnv.execute("my job");
    }
}
// Contents of the input file info.txt:
//   user1,1680000890,31.6
//   user2,1681111900,38.3
//   user1,1680000890,34.9