前言
最近公司要搞搜索,需要把mysql中的数据同步到es中来进行搜索,由于公司已经搭建了flink集群,就打算用flink来做这个同步。本来以为很简单,跟着官网文档走就好了,结果没想到折腾了将近一周的时间……
我也是没想到,这玩意网上资源竟然这么少,找到的全部都是通过flink sql-client实现的,但这有个问题,当fink集群重启,JOB就没有了,没有办法通过savePointing来恢复。所以还是记录下。文章来源:https://www.toymoban.com/news/detail-516097.html
代码
直接上代码:文章来源地址https://www.toymoban.com/news/detail-516097.html
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink/savepointings");
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE orders (\n" +
" order_id INT,\n" +
" order_date TIMESTAMP(0),\n" +
" customer_name STRING,\n" +
" price DECIMAL(10, 5),\n" +
" product_id INT,\n" +
" order_status BOOLEAN,\n" +
" PRIMARY KEY (order_id) NOT ENFORCED\n" +
" ) WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = 'localhost',\n" +
" 'port' = '3306',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456',\n" +
" 'database-name' = 'mydb',\n" +
" 'table-name' = 'orders'\n" +
" );").await();
tableEnv.executeSql("CREATE TABLE products (\n" +
" id INT,\n" +
" name STRING,\n" +
" description STRING,\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
" ) WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = 'localhost',\n" +
" 'port' = '3306',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456',\n" +
" 'database-name' = 'mydb',\n" +
" 'table-name' = 'products'\n" +
" );").await();
tableEnv.executeSql("CREATE TABLE enriched_orders (\n" +
" order_id INT,\n" +
" order_date TIMESTAMP(0),\n" +
" customer_name STRING,\n" +
" price DECIMAL(10, 5),\n" +
" product_id INT,\n" +
" order_status BOOLEAN,\n" +
" product_name STRING,\n" +
" product_description STRING,\n" +
" PRIMARY KEY (order_id) NOT ENFORCED\n" +
" ) WITH (\n" +
" 'connector' = 'elasticsearch-7',\n" +
" 'hosts' = 'http://localhost:9200',\n" +
" 'index' = 'enriched_orders_lhc'\n" +
" );");
tableEnv.executeSql("INSERT INTO enriched_orders\n" +
" SELECT o.*, p.name, p.description\n" +
" FROM orders AS o\n" +
" LEFT JOIN products AS p ON o.product_id = p.id");
env.execute("Mysql to ES");
}
到了这里,关于使用Flink CDC将Mysql中的数据实时同步到ES的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!