Flink构造宽表实时入库案例介绍

这篇具有很好参考价值的文章主要介绍了Flink构造宽表实时入库案例介绍。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1. 安装包准备

Flink 1.15.4 安装包

Flink cdc的mysql连接器

Flink sql的sdb连接器

MySQL驱动

SDB驱动

Flink jdbc的mysql连接器

 

2. 入库流程图

Flink构造宽表实时入库案例介绍,数据库,flink,大数据,mysql,sql,sequoiaDB

3. Flink安装部署

  1. 上传Flink压缩包到服务器,并解压

tar -zxvf  flink-1.14.5-bin-scala_2.11.tgz  -C /opt/

  1. 复制依赖至Flink中

cp sdb-flink-connector-3.4.8-jar-with-dependencies.jar /opt/flink-1.14.5/lib
cp sequoiadb-driver-3.4.8.jre8.jar /opt/flink-1.14.5/lib
cp flink-sql-connector-mysql-cdc-2.2.1.jar /opt/flink-1.14.5/lib
cp flink-connector-jdbc_2.11-1.14.6.jar /opt/flink-1.14.5/lib

  1. 修改flink-conf.yaml文件

vi conf/flink-conf.yaml

### 配置Master的机器名(IP地址)
jobmanager.rpc.address: sdb1
### 配置每个taskmanager 生成的临时文件夹
io.tmp.dirs: /opt/flink-1.14.5/tmp

  1. 修改master文件

vi conf/masters

#作为master的ip和端口号
upgrade1:8081

  1. 修改worker文件

vi conf/workers

#集群主机名
upgrade1
upgrade2
upgrade3

  1. 拷贝到集群其他机器

scp -r /opt/flink-1.14.5 sdbadmin@upgrade2:/opt/
scp -r /opt/flink-1.14.5 sdbadmin@upgrade3:/opt/

  1. 启动flink集群

[sdbadmin@upgrade1 flink-1.14.5]$ ./bin/start-cluster.sh

  1. 启动flink-SQL

[sdbadmin@upgrade1 flink-1.14.5]$ ./bin/sql-client.sh

4. 实时入库

编写造数程序进行造数

4.1 环境准备

4.1.1 开启mysql的binlog

  1. 创建binlog文件夹

[sdbadmin@upgrade1 mysql]$ mkdir /opt/sequoiasql/mysql/database/3306/binlog

  1. 开启binlog

vim /opt/sequoiasql/mysql/database/3306/auto.cnf

>>配置以下内容:
log_bin=/opt/sequoiasql/mysql/database/3306/binlog
binlog_format=ROW
expire_logs_days=1
server_id=1

Flink构造宽表实时入库案例介绍,数据库,flink,大数据,mysql,sql,sequoiaDB

配置完成之后,重启mysql

[sdbadmin@upgrade1 mysql]$ ./bin/sdb_mysql_ctl stop myinst
[sdbadmin@upgrade1 mysql]$ ./bin/sdb_mysql_ctl start myinst

4.1.2 创建mysql表

创建库

create database sbtest;
use sbtest;

创建表

CREATE TABLE sbtest1 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name1 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

CREATE TABLE sbtest2 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name2 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

CREATE TABLE sbtest3 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name3 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

创建flink入库表

CREATE TABLE sbtest4 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name1 CHAR(120),
    name2 CHAR(120),
    name3 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

4.1.3 创建flink映射表

需要用到flink-sql-connector-mysql-cdc-2.2.1.jar

CREATE TABLE sbtest1_mysql (
    id INT,
    uuid INT,
    name1 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.223.135',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'sbtest',
    'table-name' = 'sbtest1'
);

CREATE TABLE sbtest2_mysql (
    id INT,
    uuid INT,
    name2 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.223.135',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'sbtest',
    'table-name' = 'sbtest2'
);

CREATE TABLE sbtest3_mysql (
    id INT,
    uuid INT,
    name3 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.223.135',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'sbtest',
    'table-name' = 'sbtest3'
);

创建flink -->  mysql入库映射表

需要用到flink-connector-jdbc_2.11-1.14.6.jar

CREATE TABLE sbtest4_mysql (
    id BIGINT,
    uuid INT,
    name1 CHAR(120),
    name2 CHAR(120),
    name3 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.223.135:3306/sbtest',
    'username' = 'root',
    'password' = 'root',
    'table-name' = 'sbtest4'
);

创建flink -->  mysql入库映射表

需要用到sdb-flink-connector-3.4.8-jar-with-dependencies.jar

CREATE TABLE sbtest_sdb (
    id BIGINT,
    uuid INT,
    name1 CHAR(120),
    name2 CHAR(120),
    name3 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'sequoiadb',
    'bulksize' = '1',
    'hosts' = '192.168.223.135:11810',
    'collectionspace' = 'sbtest',
    'collection' = 'sbtest4'
);

4.2 MySQL实时入库

4.2.1 Flink left join

select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

Flink构造宽表实时入库案例介绍,数据库,flink,大数据,mysql,sql,sequoiaDB

4.2.2 mysql实时入库

insert into sbtest4_mysql select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

查看Flink任务

Flink构造宽表实时入库案例介绍,数据库,flink,大数据,mysql,sql,sequoiaDB

查看可以成功入库

Flink构造宽表实时入库案例介绍,数据库,flink,大数据,mysql,sql,sequoiaDB

4.3 SDB实时入库

4.3.1 Flink left join

select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

Flink构造宽表实时入库案例介绍,数据库,flink,大数据,mysql,sql,sequoiaDB

4.3.2 sdb实时入库

insert into sbtest_sdb select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

查看Flink任务

Flink构造宽表实时入库案例介绍,数据库,flink,大数据,mysql,sql,sequoiaDB

显示已经成功入库

Flink构造宽表实时入库案例介绍,数据库,flink,大数据,mysql,sql,sequoiaDB文章来源地址https://www.toymoban.com/news/detail-812286.html

到了这里,关于Flink构造宽表实时入库案例介绍的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Flink CDC实时同步PG数据库

    JDK:1.8 Flink:1.16.2 Scala:2.11 Hadoop:3.1.3 github地址:https://github.com/rockets0421/FlinkCDC-PG.git  1、更改配置文件postgresql.conf # 更改wal日志方式为logical wal_level = logical # minimal, replica, or logical # 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots max_replication_slots = 20 # m

    2024年02月13日
    浏览(69)
  • 使用flink实现《实时数据分析》的案例 java版

    本文档介绍了使用Java和Flink实现实时数据分析的案例。该案例使用Flink的流处理功能,从Kafka主题中读取数据,进行实时处理和分析,并将结果输出到Elasticsearch中。 Java 8 Flink 1.13.2 Kafka 2.8.0 Elasticsearch 7.13.4 本案例使用Kafka作为数据源,从一个名为 user_behavior 的主题中读取数据。

    2024年02月08日
    浏览(40)
  • 基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源

    目标 : 了解数据源的格式及实现模拟数据的生成 路径 step1:数据格式 step2:数据生成 实施 数据格式 消息时间 发件人昵称 发件人账号 发件人性别 发件人IP 发件人系统 发件人手机型号 发件人网络制式 发件人GPS 收件人昵称 收件人IP 收件人账号 收件人系统 收件人手机型号

    2024年02月04日
    浏览(43)
  • OceanBase X Flink 基于原生分布式数据库构建实时计算解决方案

    摘要:本文整理自 OceanBase 架构师周跃跃,在 Flink Forward Asia 2022 实时湖仓专场的分享。本篇内容主要分为四个部分: 分布式数据库 OceanBase 关键技术解读 生态对接以及典型应用场景 OceanBase X Flink 在游戏行业实践 未来展望 点击查看原文视频 演讲PPT 作为一款历经 12 年的纯自研

    2024年02月13日
    浏览(45)
  • 07_Hudi案例实战、Flink CDC 实时数据采集、Presto、FineBI 报表可视化等

    7.第七章 Hudi案例实战 7.1 案例架构 7.2 业务数据 7.2.1 客户信息表 7.2.2 客户意向表 7.2.3 客户线索表 7.2.4 线索申诉表 7.2.5 客户访问咨询记录表 7.3 Flink CDC 实时数据采集 7.3.1 开启MySQL binlog 7.3.2 环境准备 7.3.3 实时采集数据 7.3.3.1 客户信息表 7.3.3.2 客户意向表 7.3.3.3 客户线索表 7

    2024年02月13日
    浏览(51)
  • 60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月19日
    浏览(51)
  • 使用flink实现《实时监控和日志分析》的案例 java版

    本文档介绍了使用Java和Flink实现实时监控和日志分析的案例。该案例旨在通过实时监控和日志分析来提高系统的可靠性和性能。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kjPKQuIf-1686052913444)(./architecture.png)] 如上图所示,该系统由以下组件组成

    2024年02月06日
    浏览(43)
  • HStore表全了解:实时入库与高效查询利器

    摘要: 本文章将从使用者角度介绍HStore概念以及使用。 本文分享自华为云社区《GaussDB(DWS)HStore表讲解》,作者:大威天龙:- 。 面对实时入库和实时查询要求越来越高的趋势,已有的列存储无法支持并发更新入库,行存查询性能无法做到实时返回且空间压缩表现不佳。GaussD

    2024年02月08日
    浏览(35)
  • 基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化

    目标 : 实现FineBI访问MySQL结果数据集的配置 实施 安装FineBI 参考《FineBI Windows版本安装手册.docx》安装FineBI 配置连接 数据准备 小结 实现FineBI访问MySQL结果数据集的配置 目标 : 实现FineBI实时报表构建 路径 step1:实时报表构建 step2:实时报表配置 step3:实时刷新测试 实施 实

    2024年02月04日
    浏览(43)
  • 【Flink】 Flink实时读取mysql数据

    准备 你需要将这两个依赖添加到 pom.xml 中 mysql mysql-connector-java 8.0.0 读取 kafka 数据 这里我依旧用的以前的 student 类,自己本地起了 kafka 然后造一些测试数据,这里我们测试发送一条数据则 sleep 10s,意味着往 kafka 中一分钟发 6 条数据。 package com.zhisheng.connectors.mysql.utils; impo

    2024年02月03日
    浏览(44)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包