1. 场景
mysql 数据-- flink sql -->es
1.1 版本
mysql | flink | elasticsearch |
---|---|---|
5.7.20-log | 13.5 | 7.12.0 |
2. 数据结构
2.1 mysql 数据结构
REATE DATABASE /*!32312 IF NOT EXISTS*/`wudldb` /*!40100 DEFAULT CHARACTER SET utf8mb4 */;
USE `wudldb`;
/*Table structure for table `Flink_cdc` */
DROP TABLE IF EXISTS `Flink_cdc`;
CREATE TABLE `Flink_cdc` (
`id` bigint(64) NOT NULL AUTO_INCREMENT,
`name` varchar(64) DEFAULT NULL,
`age` int(20) DEFAULT NULL,
`birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=69 DEFAULT CHARSET=utf8mb4;
insert into `Flink_cdc`(`id`,`name`,`age`,`birthday`,`ts`) values
(1,'黎琛',10,'2022-09-05 18:51:43','2022-09-05 18:51:43'),
(2,'邬健松',4,'2022-09-05 18:51:43','2022-09-05 18:51:43'),
(3,'任伯',16,'2022-09-05 18:51:43','2022-09-05 18:51:43'),
(4,'费彩',19,'2022-09-05 18:51:43','2022-09-05 18:51:43'),
(5,'赏栋生',19,'2022-09-05 18:51:43','2022-09-05 18:51:43'),
(6,'谯璧蓓',24,'2022-09-05 18:51:43','2022-09-05 18:51:43'),
(7,'家翔发',20,'2022-09-05 18:51:43','2022-09-05 18:51:43'),
(8,'公姬',17,'2022-09-05 18:51:43','2022-09-05 18:51:43'),
(9,'萧茗',13,'2022-09-05 18:51:43','2022-09-05 18:51:43'),
(10,'门英',1,'2022-09-05 18:51:43','2022-09-05 18:51:43');
2.2 ES 的结构
PUT /wudl_flink_es/
PUT /wudl_flink_es/_mapping
{
"properties": {
"id": {
"type": "integer",
"index": false
},
"name": {
"type": "text",
"index": true
},
"age": {
"type": "integer",
"index": false
},
"birthday": {
"type": "text",
"index": true
},
"ts": {
"type": "text"
}
}
}
2. flink sql 的 mysql 和es
官网可以下载包 flink-sql-connector-elasticsearch7_2.11-1.13.6.jar
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/
文章来源:https://www.toymoban.com/news/detail-620087.html
CREATE TABLE flink_es_sink (
id BIGINT,
name STRING,
age INT,
birthday STRING,
ts TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://192.168.1.162:9200',
'index' = 'wudl_flink_es'
);
-----------------------------------------------------------------------------------------------------------------
CREATE TABLE source_mysql3 (
id BIGINT PRIMARY KEY NOT ENFORCED,
name STRING,
age INT,
birthday TIMESTAMP(3),
ts TIMESTAMP(3)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.1.162:3306/wudldb?serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'Flink_cdc',
'username' = 'root',
'password' = '123456'
);
-----------------------------------------------------------------------------------------------------------------
Flink SQL> insert into flink_es_sink select id ,name , age,DATE_FORMAT(birthday, 'yyyy-MM-dd HH:mm:ss') birthday , LOCALTIMESTAMP ts from source_mysql3;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: cecd0f5a3d76676cb53de39a48caefc9
4. 结果
文章来源地址https://www.toymoban.com/news/detail-620087.html
到了这里,关于flink 13.5 sink elasticsearch-7的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!