flink 13.5 sink elasticsearch-7

这篇具有很好参考价值的文章主要介绍了flink 13.5 sink elasticsearch-7。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. 场景

mysql 数据-- flink sql -->es

1.1 版本

mysql flink elasticsearch
5.7.20-log 13.5 7.12.0

2. 数据结构

2.1 mysql 数据结构

REATE DATABASE /*!32312 IF NOT EXISTS*/`wudldb` /*!40100 DEFAULT CHARACTER SET utf8mb4 */;

USE `wudldb`;

/*Table structure for table `Flink_cdc` */

DROP TABLE IF EXISTS `Flink_cdc`;

CREATE TABLE `Flink_cdc` (
  `id` bigint(64) NOT NULL AUTO_INCREMENT,
  `name` varchar(64) DEFAULT NULL,
  `age` int(20) DEFAULT NULL,
  `birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=69 DEFAULT CHARSET=utf8mb4;


insert  into `Flink_cdc`(`id`,`name`,`age`,`birthday`,`ts`) values 

(1,'黎琛',10,'2022-09-05 18:51:43','2022-09-05 18:51:43'),

(2,'邬健松',4,'2022-09-05 18:51:43','2022-09-05 18:51:43'),

(3,'任伯',16,'2022-09-05 18:51:43','2022-09-05 18:51:43'),

(4,'费彩',19,'2022-09-05 18:51:43','2022-09-05 18:51:43'),

(5,'赏栋生',19,'2022-09-05 18:51:43','2022-09-05 18:51:43'),

(6,'谯璧蓓',24,'2022-09-05 18:51:43','2022-09-05 18:51:43'),

(7,'家翔发',20,'2022-09-05 18:51:43','2022-09-05 18:51:43'),

(8,'公姬',17,'2022-09-05 18:51:43','2022-09-05 18:51:43'),

(9,'萧茗',13,'2022-09-05 18:51:43','2022-09-05 18:51:43'),

(10,'门英',1,'2022-09-05 18:51:43','2022-09-05 18:51:43');

2.2 ES 的结构

 PUT /wudl_flink_es/  
PUT /wudl_flink_es/_mapping
{ 
  "properties": {
    "id": {
      "type": "integer",
      "index": false
    },
    "name": {
      "type": "text",
	  "index": true
    },
    "age": {
      "type": "integer",
      "index": false
    },
    "birthday": {
      "type": "text",
      "index": true
    },
    "ts": {
      "type": "text"
     
    }
  }
}

2. flink sql 的 mysql 和es

官网可以下载包 flink-sql-connector-elasticsearch7_2.11-1.13.6.jar

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/elasticsearch/

CREATE TABLE flink_es_sink (
  id BIGINT,
  name STRING,
  age INT,
  birthday STRING,
  ts TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://192.168.1.162:9200',
  'index' = 'wudl_flink_es'
);

-----------------------------------------------------------------------------------------------------------------

CREATE TABLE source_mysql3 (
   id BIGINT PRIMARY KEY NOT ENFORCED,
   name STRING,
   age INT,
   birthday TIMESTAMP(3),
   ts TIMESTAMP(3)
 ) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://192.168.1.162:3306/wudldb?serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=UTF-8',
'table-name' = 'Flink_cdc',
 'username' = 'root',
 'password' = '123456'
 );
-----------------------------------------------------------------------------------------------------------------
Flink SQL> insert into flink_es_sink  select id ,name , age,DATE_FORMAT(birthday, 'yyyy-MM-dd HH:mm:ss')    birthday , LOCALTIMESTAMP ts  from source_mysql3;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: cecd0f5a3d76676cb53de39a48caefc9

4. 结果

flink 13.5 sink elasticsearch-7,Flink,elasticsearch,flink,大数据文章来源地址https://www.toymoban.com/news/detail-620087.html

到了这里,关于flink 13.5 sink elasticsearch-7的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink Data Sink

    本专栏案例代码和数据集链接:   https://download.csdn.net/download/shangjg03/88477960 在使用 Flink 进行数据处理时,数据经 Data Source 流入,然后通过系列 Transformations 的转化,最终可以通过 Sink 将计算结果进行输出,Flink Data Sinks 就是用于定义数据流最终的输出位置。Flink 提

    2024年02月08日
    浏览(35)
  • Flink之JDBC Sink

    这里介绍一下Flink Sink中jdbc sink的使用方法,以 mysql 为例,这里代码分为两种,事务和非事务 非事务代码 事务代码 pom依赖 结果 jdbc sink的具体使用方式大概就这些内容,还是比较简单的,具体应用还要结合实际业务场景.

    2024年02月14日
    浏览(37)
  • Flink之Kafka Sink

    代码内容 结果数据

    2024年02月15日
    浏览(43)
  • 轻松通关Flink第34讲:Flink 和 Redis 整合以及 Redis Sink 实现

    上一课时我们使用了 3 种方法进行了 PV 和 UV 的计算,分别是全窗口内存统计、使用分组和过期数据剔除、使用 BitMap / 布隆过滤器。到此为止我们已经讲了从数据清洗到水印、窗口设计,PV 和 UV 的计算,接下来需要把结果写入不同的目标库供前端查询使用。 下面我们分别讲

    2024年02月08日
    浏览(43)
  • Flink创建Hudi的Sink动态表

    工厂类 HoodieTableFactory 提供的创建动态表接口 createDynamicTableSource 和 createDynamicTableSink,对应的源码文件为:https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java 。 1、检查是否设置了 path 选项(checkArgument),没有的话抛异常“

    2024年02月07日
    浏览(36)
  • Flink Table/Sql自定义Kudu Sink实战(其它Sink可参考)

    使用第三方的org.apache.bahir » flink-connector-kudu,batch模式写入数据到Kudu会有FlushMode相关问题 具体可以参考我的这篇博客通过Flink SQL操作创建Kudu表,并读写Kudu表数据 Flink的Dynamic table能够统一处理batch和streaming 实现自定义Source或Sink有两种方式: 通过对已有的connector进行拓展。比

    2024年02月14日
    浏览(47)
  • Flink Table API/SQL 多分支sink

    在某个场景中,需要从Kafka中获取数据,经过转换处理后,需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错: 使用 StreamStatementSet. 具体参考官网: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/data_stream_api/#converting-betwe

    2024年02月11日
    浏览(100)
  • 12、Flink source和sink 的 clickhouse 详细示例

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月13日
    浏览(80)
  • Flink(五)source、transformations、sink的详细示例(一)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月15日
    浏览(30)
  • 5、Flink 的 source、transformations、sink的详细示例(一)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月14日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包