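Building a Data Platform with Doris + Flink
Doris Deployment
Doris is an open-source OLAP database with an MPP architecture, and it runs on most mainstream commodity servers.
Installation: see the official Doris installation guide
Points that need attention:
1 Set the system-wide maximum number of open file handles
2 Linux operating system version requirements
3 Software requirements (Java, GCC)
4 Machine role assignment (the underlined part of the figure below is the key point: prevent split-brain!)
Once the prerequisite environment is planned, start deploying!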
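The first prerequisite in the list above can be checked from a script before deployment. The following is a minimal sketch, assuming a Linux host; the threshold of 65536 is an illustrative assumption and is not taken from this article, so confirm the value against the official installation guide for your version. The function names are hypothetical.

```python
import resource

# Assumed threshold for illustration only; confirm it in the official Doris docs.
REQUIRED_NOFILE = 65536


def nofile_ok(soft_limit, required=REQUIRED_NOFILE):
    """Return True when the soft open-file limit meets the requirement."""
    if soft_limit == resource.RLIM_INFINITY:
        return True
    return soft_limit >= required


def check_current_process(required=REQUIRED_NOFILE):
    """Read the open-file limit of the current process and compare it."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    return soft, hard, nofile_ok(soft, required)


if __name__ == "__main__":
    soft, hard, ok = check_current_process()
    print(f"soft={soft} hard={hard} ok={ok}")
```

The limit is read for the current process only, so run the check as the same user that will start the Doris processes.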
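My Doris installation process
My version: selectdb_doris-1.2.1.1-x86_64-avx2.tar.gz
Installation tool: the toolkit described later in this article
Installation reference: the Doris cluster installation document
Points to note:
1 First, configure a cluster topology file that provides the parameters for deploying the new cluster. Run the following command to generate a simple sample topology file, then adjust it as needed: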
dorisctrl cluster template > doris-topo.yaml
The contents are as follows:
[root@node104 ~]# cat doris-topo.yaml
global:
  user: root
  ssh_port: 22
  deploy_dir: /opt/selectdb
frontends:
  - host: fe104
  - host: fe174
  - host: fe117
backends:
  - host: be118
  - host: be119
  - host: be173
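When the host lists change often, the topology file can also be rendered from a script. This is a minimal sketch that only reproduces the keys shown in the sample above (global, frontends, backends); the helper names are hypothetical and the toolkit may support more keys than these. The odd-count check reflects a common rule of thumb for majority-based leader election and is an assumption, not something stated in this article.

```python
def render_topology(frontends, backends, user="root", ssh_port=22,
                    deploy_dir="/opt/selectdb"):
    """Render a topology file with the same keys as the sample above."""
    if not frontends or not backends:
        raise ValueError("at least one frontend and one backend are required")
    lines = [
        "global:",
        f"  user: {user}",
        f"  ssh_port: {ssh_port}",
        f"  deploy_dir: {deploy_dir}",
        "frontends:",
    ]
    lines += [f"  - host: {host}" for host in frontends]
    lines.append("backends:")
    lines += [f"  - host: {host}" for host in backends]
    return "\n".join(lines) + "\n"


def has_odd_frontend_count(frontends):
    """An odd FE count avoids tie votes in majority elections (assumption)."""
    return len(frontends) % 2 == 1


if __name__ == "__main__":
    fes = ["fe104", "fe174", "fe117"]
    bes = ["be118", "be119", "be173"]
    if not has_odd_frontend_count(fes):
        print("warning: even number of frontends")
    print(render_topology(fes, bes), end="")
```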
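2 Run the deploy command, where edudoris is the cluster name (every space in this command matters! The figure below shows what happens when the command is mistyped)
dorisctrl cluster deploy edudoris -v 1.2.1.1 -f doris-topo.yaml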
3 View the details of a specific cluster
dorisctrl cluster display edudoris
4 List the clusters
dorisctrl cluster list
5 Start the cluster
dorisctrl cluster start edudoris
6 Stop the cluster
dorisctrl cluster stop edudoris
7 View the cluster metadata
cat edudoris/meta.yaml
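8 Log in to the Doris web UI: fe104:8030/
9 Log in with a client: see the official documentation
This mostly means changing the password and creating user accounts, databases, and tables
CREATE USER 'username' IDENTIFIED BY 'password';
GRANT ALL ON database111 TO username;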
toolkit
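SelectDB Distribution Toolkit is a cluster management and data development tool provided by SelectDB. It makes managing a Doris cluster quick and simple, and supports common operations such as deploying, viewing, starting, stopping, and scaling out clusters.
My version: selectdb_distribution_toolkit-1.2.1-x86_64.tar.gz
Installation: see the official toolkit installation guide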
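Flink Deployment
Here I only use Flink CDC for testing, so the deployment mode is standalone; for high availability it can later be switched to Flink on YARN
Installation reference: Flink 1.14.4 installation
Notes:
1 Change the slot count to 100 (with CDC, each table occupies one slot, so watch out for running short when many tables are synchronized!)
2 Web UI: flink1:8081
3 Submit the jobs (both SQL files are generated by the Python script later in this article!)
# Create the Doris tables
ssh -t -t $FEIP
ssh $FEIP " mysql -h$FEIP -uroot -pPASSWORD -P9030 -e'source /root/sink2doris.sql;'"
# Submit the Flink job and start data synchronization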
cd /flink/bin/
./sql-client.sh -f /flinksync/output/mysql2sink.sql
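The slot note above (one slot per synchronized CDC table) can be turned into a quick capacity check before submitting the job. This is a minimal sketch, assuming the slot count is set through the taskmanager.numberOfTaskSlots key in flink-conf.yaml and that every table runs with the same parallelism; the function names are hypothetical.

```python
import re


def configured_slots(flink_conf_text):
    """Extract taskmanager.numberOfTaskSlots from flink-conf.yaml text."""
    match = re.search(r"^\s*taskmanager\.numberOfTaskSlots\s*:\s*(\d+)",
                      flink_conf_text, flags=re.MULTILINE)
    return int(match.group(1)) if match else None


def slots_required(table_count, parallelism=1):
    """One slot per synchronized table per parallel instance, per the note above."""
    return table_count * parallelism


def has_enough_slots(flink_conf_text, table_count, parallelism=1, taskmanagers=1):
    """Compare the configured capacity with what the sync jobs would need."""
    slots = configured_slots(flink_conf_text)
    if slots is None:
        return False
    return slots * taskmanagers >= slots_required(table_count, parallelism)


if __name__ == "__main__":
    conf = "jobmanager.rpc.address: flink1\ntaskmanager.numberOfTaskSlots: 100\n"
    print(has_enough_slots(conf, table_count=80))
```

Commented-out lines in the configuration are ignored because the pattern only matches a key at the start of a line.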
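Offline data synchronization script (provided officially by Doris, BUT it needs modification!)
mysql to doris is mainly for automating the creation of Doris ODBC tables, and is implemented mostly as shell scripts.
Official reference: mysql_to_doris batch multi-table operations
To see the official scripts, download and extract mysql_to_doris.tar.gz; here are the parts I changed:
1 bin/run.sh: keep as is, do not touch!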
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
function usage() {
echo "Usage: run.sh [option]"
echo " -e, --create-external-table: create doris external table"
echo " -o, --create-olap-table: create doris olap table"
echo " -i, --insert-data: insert data into doris olap table from doris external table"
echo " -d, --drop-external-table: drop doris external table"
echo " -a, --auto-external-table: create doris external table and auto check mysql schema change"
echo " --database: specify the database name to process all tables under the entire database, and separate multiple databases with \",\""
echo " -t, --type: specify external table type, valid options: ODBC(default), JDBC"
echo " -h, --help: show usage"
exit 1
}
cur_dir=$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
if [[ $# -eq 0 ]]; then
usage
fi
opts=$(getopt -o eaoidht: \
-l create-external-table \
-l create-olap-table \
-l insert-data \
-l drop-external-table \
-l auto-external-table \
-l database: \
-l type: \
-l help \
-n "$0" \
-- "$@")
eval set -- "${opts}"
CREATE_EXTERNAL_TABLE=0
CREATE_OLAP_TABLE=0
INSERT_DATA=0
DROP_EXTERNAL_TABLE=0
AUTO_EXTERNAL_TABLE=0
DATABASE=''
TYPE='ODBC'
while true; do
case "$1" in
-e | --create-external-table)
CREATE_EXTERNAL_TABLE=1
shift
;;
-o | --create-olap-table)
CREATE_OLAP_TABLE=1
shift
;;
-i | --insert-data)
INSERT_DATA=1
shift
;;
-d | --drop-external-table)
DROP_EXTERNAL_TABLE=1
shift
;;
-a | --auto-external-table)
AUTO_EXTERNAL_TABLE=1
shift
;;
--database)
DATABASE="$2"
shift 2
;;
-t | --type)
TYPE="$2"
shift 2
;;
-h | --help)
usage
shift
;;
--)
shift
break
;;
*)
echo "Internal error"
exit 1
;;
esac
done
home_dir=$(cd "${cur_dir}"/.. && pwd)
source "${home_dir}"/conf/env.conf
# when fe_password is not set or is empty, do not put -p option
use_passwd=$([ -z "${doris_password}" ] && echo "" || echo "-p${doris_password}")
if [ -n "${DATABASE}" ]; then
sh "${home_dir}"/lib/get_tables.sh "${DATABASE}"
fi
# create doris jdbc catalog
if [[ "JDBC" == "${TYPE}" && "${CREATE_EXTERNAL_TABLE}" -eq 1 ]]; then
echo "====================== start create doris jdbc catalog ======================"
sh "${home_dir}"/lib/jdbc/create_jdbc_catalog.sh "${home_dir}"/result/mysql/jdbc_catalog.sql 2>>error.log
echo "source ${home_dir}/result/mysql/jdbc_catalog.sql;" | mysql -h"${fe_master_host}" -P"${fe_master_port}" -u"${doris_username}" "${use_passwd}" 2>>error.log
res=$?
if [ "${res}" != 0 ]; then
echo "====================== create doris jdbc catalog failed ======================"
exit "${res}"
fi
echo "====================== create doris jdbc catalog finished ======================"
fi
# create doris external table
if [[ "ODBC" == "${TYPE}" && "${CREATE_EXTERNAL_TABLE}" -eq 1 ]]; then
echo "====================== start create doris external table ======================"
sh "${home_dir}"/lib/e_mysql_to_doris.sh "${home_dir}"/result/mysql/e_mysql_to_doris.sql 2>error.log
echo "source ${home_dir}/result/mysql/e_mysql_to_doris.sql;" | mysql -h"${fe_master_host}" -P"${fe_master_port}" -u"${doris_username}" "${use_passwd}" 2>>error.log
res=$?
if [ "${res}" != 0 ]; then
echo "====================== create doris external table failed ======================"
exit "${res}"
fi
echo "====================== create doris external table finished ======================"
fi
# create doris olap table
if [[ "${CREATE_OLAP_TABLE}" -eq 1 ]]; then
echo "====================== start create doris olap table ======================"
sh "${home_dir}"/lib/mysql_to_doris.sh "${home_dir}"/result/mysql/mysql_to_doris.sql 2>>error.log
echo "source ${home_dir}/result/mysql/mysql_to_doris.sql;" | mysql -h"${fe_master_host}" -P"${fe_master_port}" -u"${doris_username}" "${use_passwd}" 2>>error.log
res=$?
if [ "${res}" != 0 ]; then
echo "====================== create doris olap table failed ======================"
exit "${res}"
fi
echo "====================== create doris olap table finished ======================"
fi
# insert data into doris olap table
if [[ "${INSERT_DATA}" -eq 1 ]]; then
echo "====================== start insert data ======================"
if [[ "JDBC" == "${TYPE}" ]]; then
sh "${home_dir}"/lib/jdbc/sync_to_doris.sh "${home_dir}"/result/mysql/sync_to_doris.sql 2>>error.log
else
sh "${home_dir}"/lib/sync_to_doris.sh "${home_dir}"/result/mysql/sync_to_doris.sql 2>>error.log
fi
echo "source ${home_dir}/result/mysql/sync_to_doris.sql;" | mysql -h"${fe_master_host}" -P"${fe_master_port}" -u"${doris_username}" "${use_passwd}" 2>>error.log
res=$?
if [ "${res}" != 0 ]; then
echo "====================== insert data failed ======================"
exit "${res}"
fi
echo "====================== insert data finished ======================"
echo "====================== start sync check ======================"
sh "${home_dir}"/lib/sync_check.sh "${home_dir}"/result/mysql/sync_check 2>>error.log
res=$?
if [ "${res}" != 0 ]; then
echo "====================== sync check failed ======================"
exit "${res}"
fi
echo "====================== sync check finished ======================"
fi
# drop doris external table
if [[ "ODBC" == "${TYPE}" && "${DROP_EXTERNAL_TABLE}" -eq 1 ]]; then
echo "====================== start drop doris external table =========================="
for table in $(cat ${home_dir}/conf/doris_external_tables | grep -v '#' | awk -F '\n' '{print $1}' | sed 's/\./`.`/g'); do
echo "DROP TABLE IF EXISTS \`${table}\`;" | mysql -h"${fe_master_host}" -P"${fe_master_port}" -u"${doris_username}" "${use_passwd}" 2>>error.log
res=$?
if [ "${res}" != 0 ]; then
echo "====================== drop doris external table failed ======================"
exit "${res}"
fi
done
echo "====================== create drop external table finished ======================"
fi
# create doris external table and auto check mysql schema change
if [[ "ODBC" == "${TYPE}" && "${AUTO_EXTERNAL_TABLE}" -eq 1 ]]; then
echo "====================== start auto doris external table ======================"
nohup sh ${home_dir}/lib/e_auto.sh &
echo $! >e_auto.pid
echo "====================== create doris external table started ======================"
fi
2 env.conf in the conf folder
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# doris env
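fe_master_host=  # required: fill this in!
fe_password=  # required: sync_check fails if this is left empty!
fe_master_port=9030
doris_username=root
doris_password=  # required: fill this in!
doris_odbc_name='MySQL ODBC 5.3 Unicode Driver'
doris_jdbc_catalog='jdbc_catalog'
doris_jdbc_default_db='information_schema'
## ===== changed ===== keep the driver jar on local disk, otherwise the connection times out!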
#doris_jdcb_driver_url='https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.47/mysql-connector-java-5.1.47.jar'
doris_jdcb_driver_url='file:///opt/selectdb/mysql-connector-java-8.0.18.jar'
doris_jdbc_driver_class='com.mysql.jdbc.Driver'
# mysql env
mysql_host=
mysql_port=3306
mysql_username=
mysql_password=
3 mysql_type_convert.sh under lib (type mapping, keyword conflicts, and so on; in particular, varchar lengths differ between MySQL and Doris!)
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
path=$1
sed -i 's/AUTO_INCREMENT//g' $path
sed -i 's/CHARACTER SET utf8 COLLATE utf8_bin//g' $path
sed -i 's/CHARACTER SET utf8mb3 COLLATE utf8mb3_bin//g' $path
sed -i 's/CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci//g' $path
sed -i 's/CHARACTER SET utf8mb4 COLLATE utf8mb4_bin//g' $path
sed -i 's/CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci//g' $path
sed -i 's/CHARACTER SET utf8mb4 COLLATE utf8_general_ci//g' $path
sed -i 's/CHARACTER SET utf8 COLLATE utf8_general_ci//g' $path
sed -i 's/DEFAULT CURRENT_TIMESTAMP\(()\)\? ON UPDATE CURRENT_TIMESTAMP\(()\)\?//ig' $path
sed -i 's/DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP\(()\)\?//ig' $path
sed -i 's/CHARACTER SET utf8mb4 COLLATE utf8mb4_bin//g' $path
sed -i "s/DEFAULT '0000-00-00 00:00:00'/DEFAULT '2000-01-01 00:00:00'/g" $path
sed -i 's/DEFAULT CURRENT_TIMESTAMP\(()\)\?//ig' $path
sed -i 's/DEFAULT b/DEFAULT/g' $path
sed -i "s/DEFAULT \(\(\-\)\?[0-9]\+\(\.[0-9]\+\)\?\)/DEFAULT '\1'/g" $path
sed -i 's/CHARACTER SET utf8mb4//g' $path
sed -i 's/CHARACTER SET utf8//g' $path
sed -i 's/COLLATE utf8mb4_general_ci//g' $path
sed -i 's/COLLATE utf8_general_ci//g' $path
sed -i 's/COLLATE utf8mb4_unicode_ci//g' $path
sed -i 's/COLLATE utf8_unicode_ci//g' $path
sed -i 's/COLLATE utf8_bin//g' $path
sed -i 's/\<tinytext\>/varchar(65533)/g' $path
sed -i 's/text([^)]*)/varchar(65533)/g' $path
sed -i 's/\<text\>/varchar(65533)/g' $path
sed -i 's/\<mediumtext\>/varchar(65533)/g' $path
sed -i 's/\<longtext\>/varchar(65533)/g' $path
sed -i 's/\<tinyblob\>/varchar(65533)/g' $path
sed -i 's/blob([^)]*)/varchar(65533)/g' $path
sed -i 's/\<blob\>/varchar(65533)/g' $path
sed -i 's/\<mediumblob\>/varchar(65533)/g' $path
sed -i 's/\<longblob\>/varchar(65533)/g' $path
sed -i 's/\<tinystring\>/varchar(65533)/g' $path
sed -i 's/\<mediumstring\>/varchar(65533)/g' $path
sed -i 's/\<longstring\>/varchar(65533)/g' $path
sed -i 's/\<timestamp\>/datetime/g' $path
sed -i 's/\<unsigned\>//g' $path
sed -i 's/\<zerofill\>//g' $path
sed -i 's/\<json\>/varchar(65533)/g' $path
sed -i 's/enum([^)]*)/varchar(65533)/g' $path
sed -i 's/set([^)]*)/varchar(65533)/g' $path
sed -i 's/\<set\>/varchar(65533)/g' $path
sed -i 's/bit([^)]*)/varchar(65533)/g' $path
sed -i 's/bit([^)]*)/varchar(65533)/g' $path
sed -i 's/\<bit\>/varchar(65533)/g' $path
sed -i 's/varbinary([^)]*)/varchar(65533)/g' $path
sed -i 's/binary([^)]*)/varchar(65533)/g' $path
sed -i 's/string([^)]*)/varchar(65533)/g' $path
sed -i 's/\<string\>/varchar(65533)/g' $path
sed -i 's/\<binary\>/varchar(65533)/g' $path
sed -i 's/\<varbinary\>/varchar(65533)/g' $path
sed -i 's/\<mediumint/int/g' $path
sed -i 's/float([^)]*)/float/g' $path
sed -i 's/double([^)]*)/double/g' $path
sed -i 's/time([^)]*)/varchar(64)/g' $path
sed -i 's/\<time\>/varchar(64)/g' $path
#sed -i 's/year([^)]*)/varchar(64)/g' $path
#sed -i 's/\<year\>/varchar(64)/g' $path
sed -i 's/varchar([0-9]*)/varchar(65533)/g' $path
#sed -i 's/varchar([1-9][0-9][0-9])/string/g' $path
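The reason every varchar is widened to 65533 is that MySQL measures varchar(n) in characters while Doris measures it in bytes, so multi-byte text can overflow a column of the same declared length. Below is a minimal sketch of that idea, re-implementing only a small subset of the sed rules above with Python's re module; the rule list and function names are illustrative and do not replace the shell script.

```python
import re

# A small subset of the substitutions performed by the sed script above.
RULES = [
    (r"\bunsigned\b", ""),
    (r"\b(?:tiny|medium|long)?text\b", "varchar(65533)"),
    (r"\btimestamp\b", "datetime"),
    (r"varchar\(\d*\)", "varchar(65533)"),
]


def convert_column_types(ddl):
    """Apply the substitution rules to a MySQL DDL fragment."""
    for pattern, replacement in RULES:
        ddl = re.sub(pattern, replacement, ddl)
    return re.sub(r" {2,}", " ", ddl)  # collapse gaps left by removed keywords


def utf8_bytes(text):
    """Byte length that a byte-counted varchar column must accommodate."""
    return len(text.encode("utf-8"))


if __name__ == "__main__":
    sample = "`name` varchar(32), `body` text, `ts` timestamp, `n` int unsigned"
    print(convert_column_types(sample))
    # Four characters, but more than four bytes once encoded as UTF-8.
    print(len("数据平台"), utf8_bytes("数据平台"))
```

Like the sed rules, the patterns match on type names only, so a column that is itself named text or timestamp would also be rewritten.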
4 create_jdbc_catalog.sh under lib/jdbc
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
cur_dir=$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
home_dir=$(cd "${cur_dir}"/../.. && pwd)
source ${home_dir}/conf/env.conf
# mkdir files to store tables and tables.sql
mkdir -p ${home_dir}/result/mysql
path=${1:-${home_dir}/result/mysql/jdbc_catalog.sql}
rm -f $path
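## ===== changed ===== added DROP, otherwise a stale cached catalog keeps causing errors! Added zeroDateTimeBehavior=round to the URL to handle inserts that fail on empty datetime values!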
echo 'DROP CATALOG IF EXISTS '${doris_jdbc_catalog}';
CREATE CATALOG IF NOT EXISTS '${doris_jdbc_catalog}'
PROPERTIES (
"type"="jdbc",
"jdbc.user"="'${mysql_username}'",
"jdbc.password"="'${mysql_password}'",
"jdbc.jdbc_url"="jdbc:mysql://'${mysql_host}:${mysql_port}/${doris_jdbc_default_db}'?useSSL=false&characterEncoding=utf8&zeroDateTimeBehavior=round",
"jdbc.driver_url"="'${doris_jdcb_driver_url}'",
"jdbc.driver_class"="'${doris_jdbc_driver_class}'"
); ' >> $path
5 mysql_to_doris.sh
6 sync_to_doris.sh
7 Finally, run the batch synchronization:
vi /root/mysql_to_doris/conf/doris_tables
vi /root/mysql_to_doris/conf/mysql_tables
vi /root/mysql_to_doris/conf/env.conf
# sh bin/run.sh -o
# Create the tables and synchronize the data
sh mysql_to_doris/bin/run.sh -o -t JDBC -e -i
# On failure, check the log
/root/mysql_to_doris/conf/error.log
FlinkSQL auto-generation script
This script is very practical; without it, mapping types between MySQL, Flink SQL, and Doris SQL will wear you out!
import pymysql.cursors
import pandas as pd
import os
import shutil
def get_mysql_colsInfo(ip="ip", port=9030, user="root", password="password", db='jxpt_ods', table='ods_wkans_answer_result'):
    """Return [column_name, data_type] pairs for one table from information_schema."""
    conn = pymysql.connect(host=ip, port=port, user=user, password=password, database=db, charset='utf8')
    try:
        with conn.cursor() as mycursor:
            # Parameterized query avoids quoting problems; ORDER BY keeps the table's column order
            sql = ("SELECT COLUMN_NAME,DATA_TYPE FROM information_schema.COLUMNS "
                   "WHERE table_name = %s AND table_schema = %s ORDER BY ORDINAL_POSITION")
            mycursor.execute(sql, (table, db))
            mysql_cols = mycursor.fetchall()
    finally:
        conn.close()
    mysql_cols = [list(i) for i in mysql_cols]
    print(mysql_cols)
    return mysql_cols
def mysql2source_t_createtable(old_cols, table, ip, db):
    # MySQL type -> Flink SQL type.
    # Note: DATA_TYPE in information_schema carries no "unsigned" or display width,
    # so those keys only match if COLUMN_TYPE is queried instead.
    mysql2source_t = {'tinyint unsigned': 'smallint',
                      'mediumint': 'int',
                      'smallint unsigned': 'int',
                      'int unsigned': 'bigint',
                      'bigint unsigned': 'decimal(20,0)',
                      'double precision': 'double',
                      'numeric': 'decimal',
                      'tinyint(1)': 'boolean',
                      'datetime': 'timestamp',
                      'char': 'string',
                      'varchar': 'string',
                      'text': 'string',
                      'longtext': 'string',
                      'bit': 'string',
                      }
    cols = []
    cols_list = []
    for col in old_cols:
        data_type = col[1].lower()
        if data_type in mysql2source_t:
            col[1] = mysql2source_t[data_type]
        cols.append('`' + col[0] + '`' + ' ' + col[1])
        cols_list.append(['`' + col[0] + '`', col[1]])
    new_cols = ','.join(cols)
    # The template assumes every table has a primary key column named `id`
    create_table_mysql2source_tstr = """
DROP TABLE IF EXISTS {};
CREATE TABLE IF NOT EXISTS {}(
{},
PRIMARY KEY (`id`) NOT ENFORCED
)WITH (
'connector' = 'mysql-cdc',
'hostname' = '{}',
'port' = '3306',
'username' = 'canal',
'password' = 'password',
'database-name' = '{}',
'table-name' = '{}',
'server-time-zone'='Asia/Shanghai',
'debezium.snapshot.mode' = 'never');
""".format(table, table, new_cols, ip, db, table)
    return create_table_mysql2source_tstr, cols_list
def source_t2sink_t_createtable(old_cols, table, db):
    cols = []
    for col in old_cols:
        cols.append(col[0] + ' ' + col[1])
    new_cols = ','.join(cols)
    table = 'ods_' + table
    create_table_source_t2sink_t_tstr = """
DROP TABLE IF EXISTS {};
CREATE TABLE IF NOT EXISTS {}(
{},
PRIMARY KEY (`id`) NOT ENFORCED
)WITH (
'connector' = 'doris',
'fenodes' = 'fe:8030',
'table.identifier' = '{}',
'username' = 'root',
'password' = 'password',
'sink.properties.strict_mode'='true',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.properties.two_phase_commit'='true',
'sink.label-prefix'='{}01'
);
""".format(table, table, new_cols, db + '.' + table, table)
    return create_table_source_t2sink_t_tstr, old_cols
def sink_t2doris_createtable(old_cols, table, db_doris):
    # Flink SQL type -> Doris type
    sink_t2doris = {'timestamp': 'datetime'}
    cols = []
    for col in old_cols:
        # A string-typed id is narrowed to varchar(64) so it can serve as the key column
        if col[0] == '`id`' and col[1] == 'string':
            col[1] = 'varchar(64)'
        data_type = col[1].lower()
        if data_type in sink_t2doris:
            col[1] = sink_t2doris[data_type]
        cols.append(col[0] + ' ' + col[1])
    new_cols = ','.join(cols)
    table = db_doris + '.ods_' + table
    create_table_sink_t2doris_str = """
DROP TABLE IF EXISTS {};
CREATE TABLE IF NOT EXISTS {}(
{}
)ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 8
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
);
""".format(table, table, new_cols)
    return create_table_sink_t2doris_str
def whole_mysql2sink_t(ip="ip", port=3306, user="root", password="password", db='jxpt_ods', table='ods_wkans_answer_result', db_doris='doris', is2doris=True):
    mysql_cols = get_mysql_colsInfo(ip=ip, port=port, user=user, password=password, db=db, table=table)
    create_table_mysql2source_t_str, source_t_cols = mysql2source_t_createtable(mysql_cols, table, ip, db)
    create_table_source_t2sink_t_tstr, sink_t_cols = source_t2sink_t_createtable(source_t_cols, table, db_doris)
    create_table_sink_t2doris_str = sink_t2doris_createtable(sink_t_cols, table, db_doris)
    final_create_table_str1 = """
-- Set the checkpoint interval
SET execution.checkpointing.interval = 10s;
-- MySQL to source table
{}
-- source table to sink table
{}
INSERT INTO {} select * from {};
""".format(create_table_mysql2source_t_str, create_table_source_t2sink_t_tstr, 'ods_' + table, table)
    final_create_table_str2 = """
-- sink table to Doris
{}
""".format(create_table_sink_t2doris_str)
    # is2doris=True returns the Doris DDL; False returns the Flink SQL job
    if is2doris:
        final_create_table_str = final_create_table_str2
    else:
        final_create_table_str = final_create_table_str1
    return final_create_table_str
def batch_mysql2sink_t(convert_list=None, is2doris=True):
    convert_str_list = []
    for i in convert_list or []:
        # Each entry is [table, db, ip, user, password, db_doris]
        convert_sql = whole_mysql2sink_t(ip=i[2], port=3306, user=i[3], password=i[4], db=i[1], table=i[0], db_doris=i[5], is2doris=is2doris)
        convert_str_list.append(convert_sql)
    final_convert_str = '\n'.join(convert_str_list)
    if is2doris:
        sql_file_name = '/flinksync/output/sink2doris.sql'
    else:
        sql_file_name = '/flinksync/output/mysql2sink.sql'
    # The with block closes the file on exit
    with open(sql_file_name, 'w', encoding='utf-8') as f:
        f.write(final_convert_str)
def get_convert_list():
    convert_file_momdir = r'/flinksync/input'
    convert_file_list = [i for i in os.listdir(convert_file_momdir) if i.endswith('.csv')]
    convert_file_name = convert_file_list[0]
    convert_file_path = os.path.join(convert_file_momdir, convert_file_name)
    df = pd.read_csv(convert_file_path)
    convert_list = []
    for row in df.itertuples():
        row = list(row)  # row[0] is the DataFrame index
        if pd.notna(row[1]):  # skip rows without a table name
            mysql_info1 = [row[1], row[2], row[4], row[6], row[7], row[8]]
            mysql_info2 = [row[1], row[3], row[5], row[6], row[7], row[9]]
            mysql_info1 = [str(i) for i in mysql_info1]
            mysql_info2 = [str(i) for i in mysql_info2]
            # Keep only entries in which every field is filled in
            if ('nan' not in mysql_info1) and ('' not in mysql_info1):
                convert_list.append(mysql_info1)
            if ('nan' not in mysql_info2) and ('' not in mysql_info2):
                convert_list.append(mysql_info2)
    # Move the processed input file to history
    shutil.move(convert_file_path, os.path.join(r'/flinksync/history', convert_file_name))
    print(convert_list)
    return convert_list
def batch_covert_main():
    convert_list = get_convert_list()
    batch_mysql2sink_t(convert_list, is2doris=True)
    batch_mysql2sink_t(convert_list, is2doris=False)


if __name__ == '__main__':
    batch_covert_main()
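The script does not show the layout of the CSV it reads from /flinksync/input. Judging only from the column positions used in get_convert_list, each row appears to carry a table name, two source databases, two source hosts, a user, a password, and two Doris databases. The sketch below mirrors that parsing with the standard csv module so the expected layout is easier to see; the header names and sample values are hypothetical, and only the column order is inferred from the script.

```python
import csv
import io

# Hypothetical header names and values; only the column order comes from the script.
SAMPLE_CSV = """table,db_a,db_b,host_a,host_b,user,password,doris_db_a,doris_db_b
log,exam,,10.0.0.1,,canal,secret,ods,
"""


def rows_to_convert_list(csv_text):
    """Mirror get_convert_list: up to two source entries per CSV row."""
    convert_list = []
    reader = csv.reader(io.StringIO(csv_text))
    next(reader)  # skip the header row
    for row in reader:
        if not row or not row[0]:
            continue  # no table name, nothing to convert
        table, db_a, db_b, host_a, host_b, user, password, doris_a, doris_b = row[:9]
        first = [table, db_a, host_a, user, password, doris_a]
        second = [table, db_b, host_b, user, password, doris_b]
        for entry in (first, second):
            if all(entry):  # skip entries with any empty field
                convert_list.append(entry)
    return convert_list


if __name__ == "__main__":
    print(rows_to_convert_list(SAMPLE_CSV))
```

Each resulting entry has the shape [table, db, ip, user, password, db_doris], which is the order batch_mysql2sink_t indexes into.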
Generated SQL file 1, sink2doris.sql (Doris CREATE TABLE statements):
DROP TABLE IF EXISTS ods.ods_log;
CREATE TABLE IF NOT EXISTS ods.ods_log(
`id` bigint,`exam_id` bigint,`user_id` string,`business_id` string,`record_code` bigint,`submit_code` string,`operate_type` string,`operate_status` int,`operate_code` bigint,`operate_bus_code` string,`operate_progress` string,`operate_message` string,`operate_error` string,`operate_error_expection` string,`operate_data` string,`create_time` datetime
)ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 8
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
);
Generated SQL file 2, mysql2sink.sql (Flink SQL CREATE TABLE statements):
-- Set the checkpoint interval
SET execution.checkpointing.interval = 10s;
-- MySQL to source table
DROP TABLE IF EXISTS log;
CREATE TABLE IF NOT EXISTS log(
`id` bigint,`exam_id` bigint,`user_id` string,`business_id` string,`record_code` bigint,`submit_code` string,`operate_type` string,`operate_status` int,`operate_code` bigint,`operate_bus_code` string,`operate_progress` string,`operate_message` string,`operate_error` string,`operate_error_expection` string,`operate_data` string,`create_time` timestamp,
PRIMARY KEY (`id`) NOT ENFORCED
)WITH (
'connector' = 'mysql-cdc',
'hostname' = '$MYSQLIP',
'port' = '3306',
'username' = 'canal',
'password' = 'password',
'database-name' = 'db',
'table-name' = 'log',
'server-time-zone'='Asia/Shanghai',
'debezium.snapshot.mode' = 'never');
-- source table to sink table
DROP TABLE IF EXISTS ods_log;
CREATE TABLE IF NOT EXISTS ods_log(
`id` bigint,`exam_id` bigint,`user_id` string,`business_id` string,`record_code` bigint,`submit_code` string,`operate_type` string,`operate_status` int,`operate_code` bigint,`operate_bus_code` string,`operate_progress` string,`operate_message` string,`operate_error` string,`operate_error_expection` string,`operate_data` string,`create_time` timestamp,
PRIMARY KEY (`id`) NOT ENFORCED
)WITH (
'connector' = 'doris',
'fenodes' = 'FEIP:8030',
'table.identifier' = 'ods.ods_log',
'username' = 'root',
'password' = 'password',
'sink.properties.strict_mode'='true',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.properties.two_phase_commit'='true',
'sink.label-prefix'='ods_log01'
);
INSERT INTO ods_log select * from log;
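The generated sink definition hard-codes 'sink.label-prefix' as the table name plus 01. With two-phase commit enabled, the Doris connector is generally understood to need a label prefix that has not been used before, so resubmitting the same job may fail on a duplicate label. Below is a minimal sketch of one way around this, assuming a timestamp suffix is acceptable in your environment; unique_label_prefix is a hypothetical helper and is not part of the original script.

```python
from datetime import datetime


def unique_label_prefix(table, now=None):
    """Build a label prefix from the table name and a second-resolution timestamp."""
    now = now or datetime.now()
    return f"{table}_{now:%Y%m%d%H%M%S}"


if __name__ == "__main__":
    # The returned value would replace the fixed '{}01' part of the template.
    print(unique_label_prefix("ods_log"))
```

Two submissions of the same table within one second would still collide, so a counter or random suffix may be preferable for scripted resubmits.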
jps-all script
View the processes running across the cluster
#!/bin/bash
for i in 'node104.data' 'node116.data' 'node117.data' 'node118.data' 'node119.data' 'node173.data' 'node174.data' 'kafka175.data' 'kafka176.data' 'kafka177.data' 'adapter101.data' 'canal99.data'
do
echo ---------$i------------
ssh $i "/usr/java/jdk1.8.0_151/bin/jps" | grep -vi jps
done
Real-time computation with a Doris view
Millisecond-level computation!
drop view if exists ads.ads_examinee_info_rt_v;
-- Examinee details: examinee ID, exam ID, operation type, status, error message, update time
create view ads.ads_examinee_info_rt_v as
SELECT id,exam_id,operate_type, operate_status, operate_error ,create_time from
(SELECT user_id id ,exam_id,operate_type, operate_status, operate_error,create_time,ROW_NUMBER() over(partition by user_id,exam_id order by create_time desc)as num from ods.ods_log where user_id is not null and exam_id is not null and to_date(create_time)=CURDATE() )res where num=1 ;
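The view keeps, for each (user_id, exam_id) pair, only the most recent log row created today, which is what ROW_NUMBER() ... order by create_time desc with num=1 expresses. The same selection can be sketched on in-memory rows to make the logic easier to follow; the function name and the dictionary-based row format are assumptions for illustration, and the field names come from the view.

```python
from datetime import date, datetime


def latest_per_examinee(rows, today):
    """Keep the newest row per (user_id, exam_id) among today's rows."""
    latest = {}
    for row in rows:
        if row["user_id"] is None or row["exam_id"] is None:
            continue  # matches the IS NOT NULL filters
        if row["create_time"].date() != today:
            continue  # matches to_date(create_time) = CURDATE()
        key = (row["user_id"], row["exam_id"])
        if key not in latest or row["create_time"] > latest[key]["create_time"]:
            latest[key] = row
    return list(latest.values())


if __name__ == "__main__":
    today = date(2023, 5, 1)
    rows = [
        {"user_id": "u1", "exam_id": 7, "operate_type": "login",
         "create_time": datetime(2023, 5, 1, 9, 0)},
        {"user_id": "u1", "exam_id": 7, "operate_type": "submit",
         "create_time": datetime(2023, 5, 1, 9, 30)},
    ]
    for row in latest_per_examinee(rows, today):
        print(row["user_id"], row["exam_id"], row["operate_type"])
```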
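Addendum: synchronizing through Canal is simpler, and I have implemented it as well, but the official position is that it is not stable enough and it is not recommended for production.
1 Canal installation
Starting the incremental synchronization:
1 MySQL CREATE TABLE statement
/* DDL info */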
CREATE TABLE `resource` (
`id` bigint(20) NOT NULL,
`version` int(11) DEFAULT NULL COMMENT 'version',
`ex_create_time` datetime DEFAULT NULL COMMENT 'creation time',
`ex_create_user_id` varchar(32) DEFAULT NULL COMMENT 'creator user id',
`ex_update_time` datetime DEFAULT NULL COMMENT 'update time',
`ex_update_user_id` varchar(32) DEFAULT NULL COMMENT 'updater user id',
`del` varchar(1) DEFAULT NULL,
`task_id` bigint(20) DEFAULT NULL COMMENT 'task id',
`task_code` varchar(255) DEFAULT NULL COMMENT 'task code',
`bus_id` bigint(20) DEFAULT NULL COMMENT 'resource business id',
`resource_bus_type` varchar(30) DEFAULT NULL COMMENT 'resource business type',
`resource_id` varchar(255) DEFAULT NULL COMMENT 'resource id',
`resource_type` varchar(255) DEFAULT NULL COMMENT 'resource type',
`resource_name` varchar(1100) DEFAULT NULL,
`resource_properties` text COMMENT 'resource properties',
`default_action_code` varchar(255) DEFAULT NULL COMMENT 'default action',
`action_require` varchar(255) DEFAULT NULL COMMENT 'action requirement',
`resource_source_type` varchar(100) DEFAULT NULL,
`num` int(11) DEFAULT '0' COMMENT 'sequence number',
`path` varchar(100) DEFAULT NULL COMMENT 'path',
`cou_resource_id` varchar(100) DEFAULT NULL COMMENT 'course resource id',
`course_resource_type` varchar(32) DEFAULT NULL COMMENT 'course resource type',
`knowledge_id` varchar(32) DEFAULT NULL COMMENT 'knowledge point id',
`course_id` varchar(32) DEFAULT NULL COMMENT 'course id',
`score` decimal(11,2) DEFAULT NULL COMMENT 'standard score',
`subject_type` varchar(50) DEFAULT NULL COMMENT 'subject',
PRIMARY KEY (`id`) USING BTREE,
KEY `idx_taskid_resourcebustype` (`task_id`,`resource_bus_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT
2 Doris CREATE TABLE statement
CREATE TABLE `jxpt_ods`.`ods_resource` (
`id` bigint(20) NOT NULL,
`version` int(11) DEFAULT NULL COMMENT 'version',
`ex_create_time` datetime DEFAULT NULL COMMENT 'creation time',
`ex_create_user_id` varchar(32) DEFAULT NULL COMMENT 'creator user id',
`ex_update_time` datetime DEFAULT NULL COMMENT 'update time',
`ex_update_user_id` varchar(32) DEFAULT NULL COMMENT 'updater user id',
`del` varchar(1) DEFAULT NULL,
`task_id` bigint(20) DEFAULT NULL COMMENT 'task id',
`task_code` varchar(255) DEFAULT NULL COMMENT 'task code',
`bus_id` bigint(20) DEFAULT NULL COMMENT 'resource business id',
`resource_bus_type` varchar(30) DEFAULT NULL COMMENT 'resource business type',
`resource_id` varchar(255) DEFAULT NULL COMMENT 'resource id',
`resource_type` varchar(255) DEFAULT NULL COMMENT 'resource type',
`resource_name` varchar(1100) DEFAULT NULL,
`resource_properties` text COMMENT 'resource properties',
`default_action_code` varchar(255) DEFAULT NULL COMMENT 'default action',
`action_require` varchar(255) DEFAULT NULL COMMENT 'action requirement',
`resource_source_type` varchar(100) DEFAULT NULL,
`num` int(11) DEFAULT '0' COMMENT 'sequence number',
`path` varchar(100) DEFAULT NULL COMMENT 'path',
`cou_resource_id` varchar(100) DEFAULT NULL COMMENT 'course resource id',
`course_resource_type` varchar(32) DEFAULT NULL COMMENT 'course resource type',
`knowledge_id` varchar(32) DEFAULT NULL COMMENT 'knowledge point id',
`course_id` varchar(32) DEFAULT NULL COMMENT 'course id',
`score` decimal(11,2) DEFAULT NULL COMMENT 'standard score',
`subject_type` varchar(50) DEFAULT NULL COMMENT 'subject'
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 8
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
);
3 Execute
CREATE SYNC `jxpt_ods`.`ods_resource` -- unique identifier of the job within the current database; only one job with a given job_name can run at a time
(
FROM `task-basic`.resource -- MySQL database and table name
INTO `jxpt_ods`.`ods_resource` -- Doris table name
(id,version,ex_create_time,ex_create_user_id,ex_update_time,ex_update_user_id,del,task_id,task_code,bus_id,resource_bus_type,resource_id,resource_type,resource_name,resource_properties,default_action_code,action_require,resource_source_type,num,path,cou_resource_id,course_resource_type,knowledge_id,course_id,score,subject_type)
)
FROM BINLOG
(
"type" = "canal",
"canal.server.ip" = "", -- Canal server address
"canal.server.port" = "11111", -- default port
"canal.destination" = "example1", -- identifier of the canal instance
"canal.username" = "canal", -- authorized MySQL user name
"canal.password" = "password" -- authorized MySQL user password
);
4 Monitor
SHOW SYNC JOB ;
STOP SYNC JOB ods_resource;
PAUSE SYNC JOB ods_resource;
RESUME SYNC JOB ods_resource;