目标 去实现一个简单的逻辑
mysql数据同步到hive,大致流程如下
分为离线和实时两部分
我们先实现离线
所需要以下内容
Flink,Seatunnel,Mysql,Hive,Hadoop,Java
2.离线Mysql到Hive数据同步
1)准备所需要的
2)开始
mysql创建数据库及其内容
-- 创建数据库
create database seatunnel;
-- 进入seatunnel数据库
use seatunnel;
-- 创建表
create table day_test(
dname varchar(64),
dage int
);
-- 插入数据
insert into day_test values('张三',20);
insert into day_test values('李四',18);
insert into day_test values('王二',29);
insert into day_test values('麻子',22);
我的数据库有数据,没什么太大问题,懒得删
2.hive创建接收数据表
#打开hive
hive
#新建数据库
create database mydemo;
#进入库
use mydemo;
#新建表
create table hive_mysql(
hname varchar(64),
hage int
);
#查看当前表的内容
select * from hive_mysql;
我的hive里有数据,这个没什么影响,不用管
ps1.错误一
hive的开启顺序是:先启动mysql,再启动hadoop集群,再启动hive
ps2.错误二
如果出现以下错误
这说明你的hive服务器没开,新开个页面,输入
hive --service metastore &
不关闭这个页面就行了,放后台,这时候就行了
3.修改Seatunnel配置文件
这是我的Seatunnel安装路径,你们换成自己的就行
#进入Seatunnel目录下的conf
cd /seatunnel/apache-seatunnel-incubating-2.3.0/conf
#复制配置文件并改名,变为我们后面的启动文件
cp /seatunnel/apache-seatunnel-incubating-2.3.0/conf/seatunnel.streaming.conf.template /seatunnel/apache-seatunnel-incubating-2.3.0/conf/example01.conf
#打开文件修改
vi example01.conf
#保存并退出
:wq
example01.conf文件内容如下
env {
execution.parallelism = 1
}
# 在source所属的块中配置数据源
source {
Jdbc {
driver = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seatunnel?serverTimezone=GMT%2b8&characterEncoding=utf-8"
user = "root"
password = "123456"
query = "select * from day_test"
}
}
# 在transform的块中声明转换插件
transform {
}
# 在sink块中声明要输出到哪
sink {
Hive {
table_name = "mydemo.hive_mysql"
metastore_uri = "thrift://127.0.0.1:9083"
schema {
fields {
hname = string
hage= int
}
}
}
}
4.去使用Flink提交同步作业
cd /seatunnel/apache-seatunnel-incubating-2.3.0
#用我们刚配置的文件去启动作业
./bin/start-seatunnel-flink-connector-v2.sh --config ./config/example01.conf
提交任务,任务完成
这时打开Flink的web页面
这里就可以看见运行的结果,错误或者单纯查询可以显示在这,正确了没有显示,我这是之前的测试
这时候再去hive里查询字段
select * from hive_mysql;
内容如下
自此离线完成,反过来也能同步,hive-->mysql
实时CDC挖取日志同步到Hive
Mysql CDC配置打开
ps.
Mysql CDC内容读取成功,但是报运行时错误,没法同步到kafka
后续Kafka同步到Hive已经可以实现,解决了写上来
Kafka挖取日志
kafka放入sink里接收内容,配置如下
sink {
kafka {
topic = "seatunnel"
bootstrap.servers = "127.0.0.1:9092"
partition = 3
format = json
kafka.request.timeout.ms = 60000
semantics = EXACTLY_ONCE
}
}
Kafka同步数据到Hive
编写配置内容 kafka_hive.conf
#kafka要是json格式
env {
execution.parallelism = 1
}
# 在source所属的块中配置数据源
source {
Kafka {
result_table_name = "kafka_name"
schema = {
fields {
id = "int"
name = "string"
age = "int"
}
}
format = json
field_delimiter = "#"
topic = "mybate2"
bootstrap.servers = "127.0.0.1:9092"
kafka.max.poll.records = 500
kafka.client.id = 127.0.0.1
}
}
# 在transform的块中声明转换插件
transform {
}
# 在sink块中声明要输出到哪
sink {
Hive {
table_name = "mydemo.hive_mysql"
metastore_uri = "thrift://127.0.0.1:9083"
schema {
fields {
hid = int
hname = string
hage= int
}
}
}
}
设置编写启动脚本
vim kafka_stop.sh
内容如下
#!/bin/sh
#启动kafka挖取hive
./bin/start-seatunnel-flink-connector-v2.sh --config ./config/example01.conf
#等3秒后执行
sleep 3
#启动kafka同步数据到hive
./bin/start-seatunnel-flink-connector-v2.sh --config ./config/kafka_hive.conf
为脚本添加权限文章来源:https://www.toymoban.com/news/detail-544346.html
chmod +x cdc_hive.sh
提交Flink作业
sh cdc_hive.sh
这时打开hive,输入show tables;就可以看到内容的变更文章来源地址https://www.toymoban.com/news/detail-544346.html
到了这里,关于seatunnel示例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!