flink cdc多种数据源安装、配置与验证 flink cdc多种数据源安装、配置与验证

这篇具有很好参考价值的文章主要介绍了flink cdc多种数据源安装、配置与验证 flink cdc多种数据源安装、配置与验证。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 搜索

flink cdc多种数据源安装、配置与验证

文章目录

  • 1. 前言
  • 2. 数据源安装与配置
  • 2.1 MySQL
  • 2.1.1 安装
  • 2.1.2 CDC 配置
  • 2.2 Postgresql
  • 2.2.1 安装
  • 2.2.2 CDC 配置
  • 2.3 Oracle
  • 2.3.1 安装
  • 2.3.2 CDC 配置
  • 2.4 SQLServer
  • 2.4.1 安装
  • 2.4.2 CDC 配置
  • 3. 验证
  • 3.1 Flink版本与CDC版本的对应关系
  • 3.2 下载相关包
  • 3.3 添加cdc jar 至lib目录
  • 3.4 验证

本文目录结构

|___ 1. 前言
|___ 2. 数据源安装与配置
|______ 2.1 MySQL
|_________ 2.1.1 安装
|_________ 2.1.2 CDC 配置
|______ 2.2 Postgresql
|_________ 2.2.1 安装
|_________ 2.2.2 CDC 配置
|______ 2.3 Oracle
|_________2.3.1 安装
|_________2.3.2 CDC 配置
|_______2.4 SQLServer
|_________2.4.1 安装
|_________2.4.2 CDC 配置
|___ 3. 验证
|_______3.1 Flink版本与CDC版本的对应关系
|_______3.2 下载相关包
|_______3.3 添加cdc jar 至lib目录
|_______3.4 验证


1. 前言

关于如何使用和配置flink cdc功能,其实在官方文档(CDC Connectors for Apache Flink® — CDC Connectors for Apache Flink® documentation)有相关的教程了,如下:
flink cdc多种数据源安装、配置与验证,Flink-cdc,flink,大数据

但是讲解的不是很详细,比如数据源怎么安装?怎么配置?都没有很详细的描述每一步骤,因此博主前面发布多篇文章以此来记录flink cdc相关数据源以及其配置相关的文章,有兴趣的同学可以参考下:

  • 《docker下安装oracle11g(一次安装成功)》
  • 《Docker下安装SqlServer2019》
  • 《flink postgresql cdc实时同步(含pg安装配置等)》
  • 《flink oracle cdc实时同步(超详细)》
  • 《flink sqlserver cdc实时同步(含sqlserver安装配置等)》

本文主要就是记录在docker下安装和配置各种数据源,以实现flink cdc的功能,包含如下常见的数据源:

数据源 版本
MySQL 8.0.25
Postgresql 10.6
Oracle 11g
SqlServer 2019

2. 数据源安装与配置

2.1 MySQL

版本:8.0.25

2.1.1 安装

Step1: 拉取mysql镜像:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> pull mysql:8.0.25
</code></span></span></span></span>

Step2: 创建并运行 MySQL 容器

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> run <span style="color:#ee9900">-d</span> <span style="color:#ee9900">-p</span> <span style="color:#f1fa8c">30025</span>:3306 <span style="color:#ee9900">--name</span> mysql8.0.25 <span style="color:#ee9900">-e</span> <span style="color:#ee9900">MYSQL_ROOT_PASSWORD</span><span style="color:#6272a4">=</span>root mysql:8.0.25
</code></span></span></span></span>
  • 1

2.1.2 CDC 配置

Step1:进入正在运行的mysql容器:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> <span style="color:#f1fa8c">exec</span> <span style="color:#ee9900">-it</span> mysql8.0.25 mysql <span style="color:#ee9900">-uroot</span> <span style="color:#ee9900">-proot</span>
</code></span></span></span></span>
  • 1

Step2:配置 CDC

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- 启用二进制日志</span>
mysql<span style="color:#6272a4">></span> <span style="color:#ff79c6">SET</span> <span style="color:#ff79c6">GLOBAL</span> log_bin <span style="color:#6272a4">=</span> <span style="color:#ff79c6">ON</span><span style="color:#999999">;</span>

<span style="color:#6272a4">-- 设置二进制日志格式为行级别</span>
mysql<span style="color:#6272a4">></span> <span style="color:#ff79c6">SET</span> <span style="color:#ff79c6">GLOBAL</span> binlog_format <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'ROW'</span><span style="color:#999999">;</span>
</code></span></span></span></span>

Step3(非必要):如果配置没生效,重启容器

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> restart mysql8.0.25
</code></span></span></span></span>
  • 1

2.2 Postgresql

版本:PostgreSQL 10.6 (Debian 10.6-1.pgdg90+1)

2.2.1 安装

Step1: 拉取 PostgreSQL 10.6 版本的镜像:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> pull postgres:10.6
</code></span></span></span></span>
  • 1

Step2:创建并启动 PostgreSQL 容器,在这里,我们将把容器的端口 5432 映射到主机的端口 30028,账号密码设置为postgres,并将 pgoutput 插件加载到 PostgreSQL 实例中:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> run <span style="color:#ee9900">-d</span> <span style="color:#ee9900">-p</span> <span style="color:#f1fa8c">30028</span>:5432 <span style="color:#ee9900">--name</span> postgres-10.6 <span style="color:#ee9900">-e</span> <span style="color:#ee9900">POSTGRES_PASSWORD</span><span style="color:#6272a4">=</span>postgres postgres:10.6 <span style="color:#ee9900">-c</span> <span style="color:#f1fa8c">'shared_preload_libraries=pgoutput'</span>
</code></span></span></span></span>
  • 1

Step3: 查看容器是否创建成功:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> <span style="color:#8be9fd">ps</span> <span style="color:#6272a4">|</span> <span style="color:#8be9fd">grep</span> postgres-10.6
</code></span></span></span></span>
  • 1

2.2.2 CDC 配置

Step1:docker进去Postgresql数据的容器:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> <span style="color:#f1fa8c">exec</span> <span style="color:#ee9900">-it</span> postgres-10.6  <span style="color:#8be9fd">bash</span>
</code></span></span></span></span>
  • 1

Step2:编辑postgresql.conf配置文件:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">vi</span> /var/lib/postgresql/data/postgresql.conf 
</code></span></span></span></span>
  • 1

配置内容如下:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-py"><span style="color:#6272a4"># 更改wal日志方式为logical(方式有:minimal、replica 、logical  )</span>
wal_level <span style="color:#6272a4">=</span> logical  

<span style="color:#6272a4"># 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots</span>
max_replication_slots <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">20</span>

<span style="color:#6272a4"># 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样</span>
max_wal_senders <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">20</span>     

<span style="color:#6272a4"># 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)</span>
wal_sender_timeout <span style="color:#6272a4">=</span> 180s	          
</code></span></span></span></span>

Step3:重启容器:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> restart postgres-10.6
</code></span></span></span></span>
  • 1

连接数据库,如果查询一下语句,返回logical表示修改成功:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#ff79c6">SHOW</span> wal_level<span style="color:#999999">;</span>
</code></span></span></span></span>
  • 1

Step4:新建用户并赋权。使用创建容器时的账号密码(postgres/postgres)登录Postgresql数据库。

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- 创建数据库 test_db</span>
<span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">DATABASE</span> test_db<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 连接到新创建的数据库 test_db</span>
\c test_db

<span style="color:#6272a4">-- 创建 t_user 表</span>
<span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">TABLE</span> <span style="color:#f1fa8c">"public"</span><span style="color:#999999">.</span><span style="color:#f1fa8c">"t_user"</span> <span style="color:#999999">(</span>
    <span style="color:#f1fa8c">"id"</span> int8 <span style="color:#6272a4">NOT</span> <span style="color:#8be9fd">NULL</span><span style="color:#999999">,</span>
    <span style="color:#f1fa8c">"name"</span> <span style="color:#ff79c6">varchar</span><span style="color:#999999">(</span><span style="color:#f1fa8c">255</span><span style="color:#999999">)</span><span style="color:#999999">,</span>
    <span style="color:#f1fa8c">"age"</span> int2<span style="color:#999999">,</span>
    <span style="color:#ff79c6">PRIMARY</span> <span style="color:#ff79c6">KEY</span> <span style="color:#999999">(</span><span style="color:#f1fa8c">"id"</span><span style="color:#999999">)</span>
<span style="color:#999999">)</span><span style="color:#999999">;</span>

<span style="color:#6272a4">-- pg新建用户</span>
<span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">USER</span> test1 <span style="color:#ff79c6">WITH</span> PASSWORD <span style="color:#f1fa8c">'test123'</span><span style="color:#999999">;</span>

<span style="color:#6272a4">-- 给用户复制流权限</span>
<span style="color:#ff79c6">ALTER</span> ROLE test1 <span style="color:#ff79c6">replication</span><span style="color:#999999">;</span>

<span style="color:#6272a4">-- 给用户登录数据库权限</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">CONNECT</span> <span style="color:#ff79c6">ON</span> <span style="color:#ff79c6">DATABASE</span> test_db <span style="color:#ff79c6">to</span> test1<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 把当前库public下所有表查询权限赋给用户</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">ALL</span> <span style="color:#ff79c6">PRIVILEGES</span> <span style="color:#ff79c6">ON</span> <span style="color:#ff79c6">ALL</span> <span style="color:#ff79c6">TABLES</span> <span style="color:#6272a4">IN</span> <span style="color:#ff79c6">SCHEMA</span> <span style="color:#ff79c6">public</span> <span style="color:#ff79c6">TO</span> test1<span style="color:#999999">;</span>

</code></span></span></span></span>

Step4:发布表:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- 设置发布为true</span>
<span style="color:#ff79c6">update</span> pg_publication <span style="color:#ff79c6">set</span> puballtables<span style="color:#6272a4">=</span><span style="color:#8be9fd">true</span> <span style="color:#ff79c6">where</span> pubname <span style="color:#6272a4">is</span> <span style="color:#6272a4">not</span> <span style="color:#8be9fd">null</span><span style="color:#999999">;</span>

<span style="color:#6272a4">-- 把所有表进行发布</span>
<span style="color:#ff79c6">CREATE</span> PUBLICATION dbz_publication <span style="color:#ff79c6">FOR</span> <span style="color:#ff79c6">ALL</span> <span style="color:#ff79c6">TABLES</span><span style="color:#999999">;</span>

<span style="color:#6272a4">-- 查询哪些表已经发布</span>
<span style="color:#ff79c6">select</span> <span style="color:#6272a4">*</span> <span style="color:#ff79c6">from</span> pg_publication_tables<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 更改复制标识包含更新和删除之前值(目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)</span>
<span style="color:#ff79c6">ALTER</span> <span style="color:#ff79c6">TABLE</span> t_user REPLICA <span style="color:#ff79c6">IDENTITY</span> <span style="color:#ff79c6">FULL</span><span style="color:#999999">;</span>

<span style="color:#6272a4">-- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置)</span>
<span style="color:#ff79c6">select</span> relreplident <span style="color:#ff79c6">from</span> pg_class <span style="color:#ff79c6">where</span> relname<span style="color:#6272a4">=</span><span style="color:#f1fa8c">'t_user'</span><span style="color:#999999">;</span>
</code></span></span></span></span>

2.3 Oracle

版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production

2.3.1 安装

Step1:拉取 oracle 11g 镜像(有6g,要等较长的时间)

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> pull registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g
</code></span></span></span></span>
  • 1

Step2:执行以下命令以创建并运行 Oracle 11g 容器

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> run <span style="color:#ee9900">-d</span> <span style="color:#ee9900">-p</span> <span style="color:#f1fa8c">30026</span>:1521 <span style="color:#ee9900">-p</span> <span style="color:#f1fa8c">8081</span>:8080 <span style="color:#999999">\</span>
<span style="color:#ee9900">--name</span> oracle_11g <span style="color:#999999">\</span>
<span style="color:#ee9900">-e</span> <span style="color:#ee9900">ORACLE_HOME</span><span style="color:#6272a4">=</span>/home/oracle/app/oracle/product/11.2.0/dbhome_2 <span style="color:#999999">\</span>
<span style="color:#ee9900">-e</span> <span style="color:#ee9900">ORACLE_SID</span><span style="color:#6272a4">=</span>helowin <span style="color:#999999">\</span>
registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g
</code></span></span></span></span>

Step3:查看容器是否启动

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> <span style="color:#8be9fd">ps</span> -a<span style="color:#6272a4">|</span><span style="color:#8be9fd">grep</span> oracle_11g
</code></span></span></span></span>

Step4:进入容器

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> <span style="color:#f1fa8c">exec</span> <span style="color:#ee9900">-it</span> oracle_11g <span style="color:#8be9fd">bash</span>
</code></span></span></span></span>

**Step5:**设置账号密码

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#6272a4"># 1. 切换至root用户(默认是oracle用户),密码为helowin</span>
<span style="color:#8be9fd">su</span> root

<span style="color:#6272a4"># 2. 创建软链接</span>
<span style="color:#8be9fd">ln</span> <span style="color:#ee9900">-s</span> <span style="color:#ee9900">$ORACLE_HOME</span>/bin/sqlplus /usr/bin

<span style="color:#6272a4"># 3.切换回oracle用户</span>
<span style="color:#8be9fd">su</span> oracle

<span style="color:#6272a4"># 4. 登录sql plus</span>
sqlplus /nolog
conn /as sysdba
<span style="color:#6272a4">## 4.1 修改system用户密码为system</span>
alter user system identified by system<span style="color:#999999">;</span>
<span style="color:#6272a4">## 4.2 修改sys用户密码为system</span>
alter user sys identified by system<span style="color:#999999">;</span>
<span style="color:#6272a4">## 4.3 新增一个测试用户(用户名:test,密码:test123);</span>
create user <span style="color:#f1fa8c">test</span> identified by test123<span style="color:#999999">;</span>
<span style="color:#6272a4">## 4.4 将dba权限给内部管理员账号和密码</span>
grant connect,resource,dba to <span style="color:#f1fa8c">test</span><span style="color:#999999">;</span>
<span style="color:#6272a4">## 4.5 修改密码策略规则为:密码永不过期</span>
ALTER PROFILE DEFAULT LIMIT PASSWORD_LIFE_TIME UNLIMITED<span style="color:#999999">;</span>
<span style="color:#6272a4">## 4.6 修改数据库最大连接数;</span>
alter system <span style="color:#f1fa8c">set</span> <span style="color:#ee9900">processes</span><span style="color:#6272a4">=</span><span style="color:#f1fa8c">1000</span> <span style="color:#ee9900">scope</span><span style="color:#6272a4">=</span>spfile<span style="color:#999999">;</span>
<span style="color:#6272a4">## 4.7 最后重启数据库;</span>
<span style="color:#8be9fd">shutdown</span> immediate<span style="color:#999999">;</span>
startup<span style="color:#999999">;</span>

<span style="color:#6272a4"># 5.退出</span>
<span style="color:#f1fa8c">exit</span>
</code></span></span></span></span>

2.3.2 CDC 配置

Step1:进入容器

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> <span style="color:#f1fa8c">exec</span> <span style="color:#ee9900">-it</span> oracle_11g <span style="color:#8be9fd">bash</span>
</code></span></span></span></span>

Step2:以DBA的权限登录数据库

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql">sqlplus <span style="color:#6272a4">/</span>nolog
<span style="color:#ff79c6">CONNECT</span> sys<span style="color:#6272a4">/</span>system <span style="color:#ff79c6">AS</span> SYSDBA
</code></span></span></span></span>

Step3:启用日志归档

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- 设置数据库恢复文件目标大小为10G</span>
<span style="color:#ff79c6">alter</span> system <span style="color:#ff79c6">set</span> db_recovery_file_dest_size <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">10</span>G<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 设置数据库恢复文件目标路径</span>
<span style="color:#ff79c6">alter</span> system <span style="color:#ff79c6">set</span> db_recovery_file_dest <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'/home/oracle/app/oracle/product/11.2.0'</span> scope<span style="color:#6272a4">=</span>spfile<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 立即关闭数据库</span>
<span style="color:#ff79c6">shutdown</span> immediate<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 以mount模式启动数据库</span>
startup mount<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 启用数据库归档日志模式</span>
<span style="color:#ff79c6">alter</span> <span style="color:#ff79c6">database</span> archivelog<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 打开数据库,允许用户访问</span>
<span style="color:#ff79c6">alter</span> <span style="color:#ff79c6">database</span> <span style="color:#ff79c6">open</span><span style="color:#999999">;</span>
</code></span></span></span></span>

Step4:查看日志归档是否启用(如果显示“Archive Mode”表示已经启用)

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql">archive log list<span style="color:#999999">;</span>
</code></span></span></span></span>
  • 1

Step5:创建表空间

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- 以DBA的权限登录数据库</span>
sqlplus <span style="color:#6272a4">/</span>nolog
<span style="color:#ff79c6">CONNECT</span> sys<span style="color:#6272a4">/</span>system <span style="color:#ff79c6">AS</span> SYSDBA
<span style="color:#6272a4">-- 创建一个名为"logminer_tbs"的表空间</span>
<span style="color:#6272a4">-- 指定表空间的数据文件路径为"/home/oracle/app/oracle/product/11.2.0/logminer_tbs.dbf",其中"/home/oracle/app/oracle/product/11.2.0"是数据文件存储的目录,"logminer_tbs.dbf"是数据文件的文件名</span>
<span style="color:#6272a4">-- 设置表空间的初始大小为25MB</span>
<span style="color:#6272a4">-- 如果数据文件已经存在且可重用,将其重用,否则创建一个新的数据文件</span>
<span style="color:#6272a4">-- 启用表空间的自动扩展功能,即当表空间空间不足时,自动增加数据文件的大小</span>
<span style="color:#6272a4">-- 设置表空间的最大允许大小为无限,即表空间可以无限制地自动扩展</span>
<span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">TABLESPACE</span> logminer_tbs DATAFILE <span style="color:#f1fa8c">'/home/oracle/app/oracle/product/11.2.0/logminer_tbs.dbf'</span> SIZE <span style="color:#f1fa8c">25</span>M REUSE AUTOEXTEND <span style="color:#ff79c6">ON</span> MAXSIZE UNLIMITED<span style="color:#999999">;</span>
</code></span></span></span></span>

Step6:创建用户并赋权

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- 创建一个名为"flinkuser"的用户,密码为"flinkpw",将其默认表空间设置为"LOGMINER_TBS",并在该表空间上设置无限配额。</span>
<span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">USER</span> flinkuser IDENTIFIED <span style="color:#ff79c6">BY</span> flinkpw <span style="color:#ff79c6">DEFAULT</span> <span style="color:#ff79c6">TABLESPACE</span> LOGMINER_TBS QUOTA UNLIMITED <span style="color:#ff79c6">ON</span> LOGMINER_TBS<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户创建会话,即允许该用户连接到数据库。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">SESSION</span> <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- (不支持Oracle 11g)允许"flinkuser"用户在多租户数据库(CDB)中设置容器。</span>
<span style="color:#6272a4">-- GRANT SET CONTAINER TO flinkuser;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询V_$DATABASE视图,该视图包含有关数据库实例的信息。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ON</span> V_$<span style="color:#ff79c6">DATABASE</span> <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户执行任何表的闪回操作。</span>
<span style="color:#ff79c6">GRANT</span> FLASHBACK <span style="color:#ff79c6">ANY</span> <span style="color:#ff79c6">TABLE</span> <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询任何表的数据。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ANY</span> <span style="color:#ff79c6">TABLE</span> <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户拥有SELECT_CATALOG_ROLE角色,该角色允许查询数据字典和元数据。</span>
<span style="color:#ff79c6">GRANT</span> SELECT_CATALOG_ROLE <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户拥有EXECUTE_CATALOG_ROLE角色,该角色允许执行一些数据字典中的过程和函数。</span>
<span style="color:#ff79c6">GRANT</span> EXECUTE_CATALOG_ROLE <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询任何事务。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ANY</span> <span style="color:#ff79c6">TRANSACTION</span> <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- (不支持Oracle 11g)允许"flinkuser"用户进行数据变更追踪(LogMiner)。</span>
<span style="color:#6272a4">-- GRANT LOGMINING TO flinkuser;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户创建表。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">TABLE</span> <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户锁定任何表。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">LOCK</span> <span style="color:#ff79c6">ANY</span> <span style="color:#ff79c6">TABLE</span> <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户修改任何表。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">ALTER</span> <span style="color:#ff79c6">ANY</span> <span style="color:#ff79c6">TABLE</span> <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户创建序列。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">CREATE</span> SEQUENCE <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户执行DBMS_LOGMNR包中的过程。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">EXECUTE</span> <span style="color:#ff79c6">ON</span> DBMS_LOGMNR <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户执行DBMS_LOGMNR_D包中的过程。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">EXECUTE</span> <span style="color:#ff79c6">ON</span> DBMS_LOGMNR_D <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询V_$LOG视图,该视图包含有关数据库日志文件的信息。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ON</span> V_$LOG <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询V_$LOG_HISTORY视图,该视图包含有关数据库历史日志文件的信息。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ON</span> V_$LOG_HISTORY <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询V_$LOGMNR_LOGS视图,该视图包含有关LogMiner日志文件的信息。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ON</span> V_$LOGMNR_LOGS <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询V_$LOGMNR_CONTENTS视图,该视图包含LogMiner日志文件的内容。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ON</span> V_$LOGMNR_CONTENTS <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询V_$LOGMNR_PARAMETERS视图,该视图包含有关LogMiner的参数信息。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ON</span> V_$LOGMNR_PARAMETERS <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询V_$LOGFILE视图,该视图包含有关数据库日志文件的信息。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ON</span> V_$LOGFILE <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询V_$ARCHIVED_LOG视图,该视图包含已归档的数据库日志文件的信息。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ON</span> V_$ARCHIVED_LOG <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询V_$ARCHIVE_DEST_STATUS视图,该视图包含有关归档目标状态的信息。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ON</span> V_$ARCHIVE_DEST_STATUS <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>
</code></span></span></span></span>

Step7:数据库和表启用增量日志

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- 切换至flinkuser用户</span>
sqlplus <span style="color:#6272a4">/</span>nolog
<span style="color:#ff79c6">CONNECT</span> flinkuser<span style="color:#6272a4">/</span>flinkpw

<span style="color:#6272a4">-- 创建customers表</span>
<span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">TABLE</span> customers <span style="color:#999999">(</span>
    customer_id NUMBER <span style="color:#ff79c6">PRIMARY</span> <span style="color:#ff79c6">KEY</span><span style="color:#999999">,</span>
    customer_name VARCHAR2<span style="color:#999999">(</span><span style="color:#f1fa8c">50</span><span style="color:#999999">)</span><span style="color:#999999">,</span>
    email VARCHAR2<span style="color:#999999">(</span><span style="color:#f1fa8c">100</span><span style="color:#999999">)</span><span style="color:#999999">,</span>
    phone VARCHAR2<span style="color:#999999">(</span><span style="color:#f1fa8c">20</span><span style="color:#999999">)</span>
<span style="color:#999999">)</span> <span style="color:#ff79c6">TABLESPACE</span> LOGMINER_TBS<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 查看LOGMINER_TBS表空间下的所有表</span>
<span style="color:#ff79c6">select</span> tablespace_name<span style="color:#999999">,</span> table_name <span style="color:#ff79c6">from</span> user_tables
<span style="color:#ff79c6">where</span> tablespace_name <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'LOGMINER_TBS'</span><span style="color:#999999">;</span>

<span style="color:#6272a4">-- 以DBA的权限登录数据库</span>
sqlplus <span style="color:#6272a4">/</span>nolog
<span style="color:#ff79c6">CONNECT</span> sys<span style="color:#6272a4">/</span>system <span style="color:#ff79c6">AS</span> SYSDBA

<span style="color:#6272a4">-- 为LOGMINER_TBS表空间下的customers表启用增强日志记录</span>
<span style="color:#ff79c6">ALTER</span> <span style="color:#ff79c6">TABLE</span> FLINKUSER<span style="color:#999999">.</span>CUSTOMERS <span style="color:#ff79c6">ADD</span> SUPPLEMENTAL LOG <span style="color:#ff79c6">DATA</span> <span style="color:#999999">(</span><span style="color:#ff79c6">ALL</span><span style="color:#999999">)</span> <span style="color:#ff79c6">COLUMNS</span>

<span style="color:#6272a4">-- 为数据库启用增强日志记录:</span>
<span style="color:#ff79c6">ALTER</span> <span style="color:#ff79c6">DATABASE</span> <span style="color:#ff79c6">ADD</span> SUPPLEMENTAL LOG <span style="color:#ff79c6">DATA</span><span style="color:#999999">;</span>
</code></span></span></span></span>

2.4 SQLServer

版本:Microsoft SQL Server 2019 (RTM-CU21) (KB5025808) - 15.0.4316.3 (X64)

2.4.1 安装

Step1:拉取SQL Server 2019 镜像

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> pull mcr.microsoft.com/mssql/server:2019-latest
</code></span></span></span></span>

Step2:运行 SQL Server 容器(密码必须是8个字符,并包含字母、数字和特殊字符,如:abc@123456 ,下面映射主机端口为30027)

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> run <span style="color:#ee9900">-e</span> <span style="color:#f1fa8c">'ACCEPT_EULA=Y'</span> <span style="color:#ee9900">-e</span> <span style="color:#f1fa8c">'SA_PASSWORD=abc@123456'</span> <span style="color:#ee9900">-p</span> <span style="color:#f1fa8c">30027</span>:1433 <span style="color:#ee9900">--name</span> sql_server_2019 <span style="color:#ee9900">-d</span> mcr.microsoft.com/mssql/server:2019-latest
</code></span></span></span></span>

Step3:验证 SQL Server 容器是否正在运行

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code>docker ps -a|grep sql_server_2019
</code></span></span></span></span>

2.4.2 CDC 配置

Step1:开启SQLServer代理

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#6272a4">## 使用root用户登录容器</span>
<span style="color:#8be9fd">docker</span> <span style="color:#f1fa8c">exec</span> <span style="color:#ee9900">-it</span> <span style="color:#ee9900">--user</span> root sql_server_2019 <span style="color:#8be9fd">bash</span>

<span style="color:#6272a4">## 进入容器后,执行命令启用Agent</span>
/opt/mssql/bin/mssql-conf <span style="color:#f1fa8c">set</span> sqlagent.enabled <span style="color:#8be9fd">true</span>

<span style="color:#6272a4">## 退出,重启容器</span>
<span style="color:#f1fa8c">exit</span>
<span style="color:#8be9fd">docker</span> restart sql_server_2019
</code></span></span></span></span>

Step2:创建’cdc_test’测试数据库,并使用连接工具登录该数据库,使用以下 SQL 命令启用 CDC 功能

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- 创建数据库</span>
<span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">DATABASE</span> cdc_test<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 启用CDC功能</span>
<span style="color:#ff79c6">EXEC</span> sys<span style="color:#999999">.</span>sp_cdc_enable_db<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 判断当前数据库是否启用了CDC(如果返回1,表示已启用)</span>
<span style="color:#ff79c6">SELECT</span> is_cdc_enabled <span style="color:#ff79c6">FROM</span> sys<span style="color:#999999">.</span><span style="color:#ff79c6">databases</span> <span style="color:#ff79c6">WHERE</span> name <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'cdc_test'</span><span style="color:#999999">;</span>
</code></span></span></span></span>

Step3:选择要进行 CDC 跟踪的表(这里使用orders表作为演示

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- 创建示例表(orders)</span>
<span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">TABLE</span> orders <span style="color:#999999">(</span>
     id <span style="color:#ff79c6">int</span><span style="color:#999999">,</span>
     order_date <span style="color:#ff79c6">date</span><span style="color:#999999">,</span>
     purchaser <span style="color:#ff79c6">int</span><span style="color:#999999">,</span>
     quantity <span style="color:#ff79c6">int</span><span style="color:#999999">,</span>
     product_id <span style="color:#ff79c6">int</span><span style="color:#999999">,</span>
     <span style="color:#ff79c6">PRIMARY</span> <span style="color:#ff79c6">KEY</span> <span style="color:#999999">(</span><span style="color:#999999">[</span>id<span style="color:#999999">]</span><span style="color:#999999">)</span>
<span style="color:#999999">)</span><span style="color:#999999">;</span>

<span style="color:#6272a4">-- schema_name 是表所属的架构(schema)的名称。</span>
<span style="color:#6272a4">-- table_name 是要启用 CDC 跟踪的表的名称。</span>
<span style="color:#6272a4">-- cdc_role 是 CDC 使用的角色的名称。如果没有指定角色名称,系统将创建一个默认角色。</span>
<span style="color:#ff79c6">EXEC</span> sys<span style="color:#999999">.</span>sp_cdc_enable_table
  <span style="color:#ee9900">@source_schema</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'dbo'</span><span style="color:#999999">,</span>
  <span style="color:#ee9900">@source_name</span>   <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'orders'</span><span style="color:#999999">,</span>
  <span style="color:#ee9900">@role_name</span>     <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'cdc_role'</span><span style="color:#999999">;</span>
</code></span></span></span></span>

3. 验证

如果要验证flink cdc的功能,需要先下载flink的安装包,然后下载相应的cdc jar包并依赖,最后使用安装包里面的sql-client写相关的flink sql即可验证。

3.1 Flink版本与CDC版本的对应关系

下载Flink安装包以及jar包前,必须确定Flink CDC与Flink版本关系:

Flink CDC 版本 Flink 版本
1.0.0 1.11.*
1.1.0 1.11.*
1.2.0 1.12.*
1.3.0 1.12.*
1.4.0 1.13.*
2.0.* 1.13.*
2.1.* 1.13.*
2.2.* 1.13.*1.14.*
2.3.* 1.13.*1.14.*1.15.*1.16.0
2.4.* 1.13.*1.14.*1.15.*1.16.*1.17.0

本文以 Flink1.13.6 + Flink CDC 2.2.0 版本为例子演示。

3.2 下载相关包

flink 安装包下载,下载地址:Downloads | Apache Flink
flink cdc多种数据源安装、配置与验证,Flink-cdc,flink,大数据

下载cdc相关的jar,根据自己的需求,下载相关的cdc jar:Central Repository: com/ververica
flink cdc多种数据源安装、配置与验证,Flink-cdc,flink,大数据

3.3 添加cdc jar 至lib目录

把需要验证的cdc jar放到flink安装包解压之后的lib目录(<FLINK_HOME>/lib/):
flink cdc多种数据源安装、配置与验证,Flink-cdc,flink,大数据

3.4 验证

使用下面的命令启动 Flink 集群:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell">./bin/start-cluster.sh
</code></span></span></span></span>

启动成功,可以访问 http://localhost:8081 访问到 Flink Web UI:
flink cdc多种数据源安装、配置与验证,Flink-cdc,flink,大数据
使用下面的命令启动 Flink SQL CLI :

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell">./bin/sql-client.sh
</code></span></span></span></span>

展示如下页面,表示启动flink客户端成功:
flink cdc多种数据源安装、配置与验证,Flink-cdc,flink,大数据
执行如下FlinkSQL:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">TABLE</span> t_source_sqlserver <span style="color:#999999">(</span>
   id <span style="color:#ff79c6">INT</span><span style="color:#999999">,</span>
    order_date <span style="color:#ff79c6">DATE</span><span style="color:#999999">,</span>
    purchaser <span style="color:#ff79c6">INT</span><span style="color:#999999">,</span>
    quantity <span style="color:#ff79c6">INT</span><span style="color:#999999">,</span>
    product_id <span style="color:#ff79c6">INT</span><span style="color:#999999">,</span>
    <span style="color:#ff79c6">PRIMARY</span> <span style="color:#ff79c6">KEY</span> <span style="color:#999999">(</span>id<span style="color:#999999">)</span> <span style="color:#6272a4">NOT</span> ENFORCED
<span style="color:#999999">)</span> <span style="color:#ff79c6">WITH</span> <span style="color:#999999">(</span>
  <span style="color:#f1fa8c">'connector'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'sqlserver-cdc'</span><span style="color:#999999">,</span>
  <span style="color:#f1fa8c">'hostname'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'10.194.183.120'</span><span style="color:#999999">,</span>
  <span style="color:#f1fa8c">'port'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'30027'</span><span style="color:#999999">,</span>
  <span style="color:#f1fa8c">'username'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'sa'</span><span style="color:#999999">,</span>
  <span style="color:#f1fa8c">'password'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'abc@123456'</span><span style="color:#999999">,</span>
  <span style="color:#f1fa8c">'database-name'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'cdc_test'</span><span style="color:#999999">,</span>
  <span style="color:#f1fa8c">'schema-name'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'dbo'</span><span style="color:#999999">,</span>
  <span style="color:#f1fa8c">'table-name'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'orders'</span>
<span style="color:#999999">)</span><span style="color:#999999">;</span>
</code></span></span></span></span>

可以看到执行成功了:
flink cdc多种数据源安装、配置与验证,Flink-cdc,flink,大数据
执行select 语句,以便实时查看该表的数据变动:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#ff79c6">select</span> <span style="color:#6272a4">*</span> <span style="color:#ff79c6">from</span> t_source_sqlserver<span style="color:#999999">;</span>
</code></span></span></span></span>

从下图,可以看出,只要修改左边的数据,会在控制台实时显示新增删除的数据。
flink cdc多种数据源安装、配置与验证,Flink-cdc,flink,大数据
同时,也能在Flink web页面看到任务正在运行:
flink cdc多种数据源安装、配置与验证,Flink-cdc,flink,大数据

最后,可以通过如下命令关闭掉Flink启动的集群:文章来源地址https://www.toymoban.com/news/detail-703092.html

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell">./stop-cluster.sh
</code></span></span></span></span>

文章目录

  • 1. 前言
  • 2. 数据源安装与配置
  • 2.1 MySQL
  • 2.1.1 安装
  • 2.1.2 CDC 配置
  • 2.2 Postgresql
  • 2.2.1 安装
  • 2.2.2 CDC 配置
  • 2.3 Oracle
  • 2.3.1 安装
  • 2.3.2 CDC 配置
  • 2.4 SQLServer
  • 2.4.1 安装
  • 2.4.2 CDC 配置
  • 3. 验证
  • 3.1 Flink版本与CDC版本的对应关系
  • 3.2 下载相关包
  • 3.3 添加cdc jar 至lib目录
  • 3.4 验证

本文目录结构

|___ 1. 前言
|___ 2. 数据源安装与配置
|______ 2.1 MySQL
|_________ 2.1.1 安装
|_________ 2.1.2 CDC 配置
|______ 2.2 Postgresql
|_________ 2.2.1 安装
|_________ 2.2.2 CDC 配置
|______ 2.3 Oracle
|_________2.3.1 安装
|_________2.3.2 CDC 配置
|_______2.4 SQLServer
|_________2.4.1 安装
|_________2.4.2 CDC 配置
|___ 3. 验证
|_______3.1 Flink版本与CDC版本的对应关系
|_______3.2 下载相关包
|_______3.3 添加cdc jar 至lib目录
|_______3.4 验证


1. 前言

关于如何使用和配置flink cdc功能,其实在官方文档(CDC Connectors for Apache Flink® — CDC Connectors for Apache Flink® documentation)有相关的教程了,如下:
flink cdc多种数据源安装、配置与验证,Flink-cdc,flink,大数据

但是讲解的不是很详细,比如数据源怎么安装?怎么配置?都没有很详细的描述每一步骤,因此博主前面发布多篇文章以此来记录flink cdc相关数据源以及其配置相关的文章,有兴趣的同学可以参考下:

  • 《docker下安装oracle11g(一次安装成功)》
  • 《Docker下安装SqlServer2019》
  • 《flink postgresql cdc实时同步(含pg安装配置等)》
  • 《flink oracle cdc实时同步(超详细)》
  • 《flink sqlserver cdc实时同步(含sqlserver安装配置等)》

本文主要就是记录在docker下安装和配置各种数据源,以实现flink cdc的功能,包含如下常见的数据源:

数据源 版本
MySQL 8.0.25
Postgresql 10.6
Oracle 11g
SqlServer 2019

2. 数据源安装与配置

2.1 MySQL

版本:8.0.25

2.1.1 安装

Step1: 拉取mysql镜像:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> pull mysql:8.0.25
</code></span></span></span></span>

Step2: 创建并运行 MySQL 容器

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> run <span style="color:#ee9900">-d</span> <span style="color:#ee9900">-p</span> <span style="color:#f1fa8c">30025</span>:3306 <span style="color:#ee9900">--name</span> mysql8.0.25 <span style="color:#ee9900">-e</span> <span style="color:#ee9900">MYSQL_ROOT_PASSWORD</span><span style="color:#6272a4">=</span>root mysql:8.0.25
</code></span></span></span></span>
  • 1

2.1.2 CDC 配置

Step1:进入正在运行的mysql容器:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> <span style="color:#f1fa8c">exec</span> <span style="color:#ee9900">-it</span> mysql8.0.25 mysql <span style="color:#ee9900">-uroot</span> <span style="color:#ee9900">-proot</span>
</code></span></span></span></span>
  • 1

Step2:配置 CDC

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- 启用二进制日志</span>
mysql<span style="color:#6272a4">></span> <span style="color:#ff79c6">SET</span> <span style="color:#ff79c6">GLOBAL</span> log_bin <span style="color:#6272a4">=</span> <span style="color:#ff79c6">ON</span><span style="color:#999999">;</span>

<span style="color:#6272a4">-- 设置二进制日志格式为行级别</span>
mysql<span style="color:#6272a4">></span> <span style="color:#ff79c6">SET</span> <span style="color:#ff79c6">GLOBAL</span> binlog_format <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'ROW'</span><span style="color:#999999">;</span>
</code></span></span></span></span>

Step3(非必要):如果配置没生效,重启容器

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> restart mysql8.0.25
</code></span></span></span></span>
  • 1

2.2 Postgresql

版本:PostgreSQL 10.6 (Debian 10.6-1.pgdg90+1)

2.2.1 安装

Step1: 拉取 PostgreSQL 10.6 版本的镜像:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> pull postgres:10.6
</code></span></span></span></span>
  • 1

Step2:创建并启动 PostgreSQL 容器,在这里,我们将把容器的端口 5432 映射到主机的端口 30028,账号密码设置为postgres,并将 pgoutput 插件加载到 PostgreSQL 实例中:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> run <span style="color:#ee9900">-d</span> <span style="color:#ee9900">-p</span> <span style="color:#f1fa8c">30028</span>:5432 <span style="color:#ee9900">--name</span> postgres-10.6 <span style="color:#ee9900">-e</span> <span style="color:#ee9900">POSTGRES_PASSWORD</span><span style="color:#6272a4">=</span>postgres postgres:10.6 <span style="color:#ee9900">-c</span> <span style="color:#f1fa8c">'shared_preload_libraries=pgoutput'</span>
</code></span></span></span></span>
  • 1

Step3: 查看容器是否创建成功:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> <span style="color:#8be9fd">ps</span> <span style="color:#6272a4">|</span> <span style="color:#8be9fd">grep</span> postgres-10.6
</code></span></span></span></span>
  • 1

2.2.2 CDC 配置

Step1:docker进去Postgresql数据的容器:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> <span style="color:#f1fa8c">exec</span> <span style="color:#ee9900">-it</span> postgres-10.6  <span style="color:#8be9fd">bash</span>
</code></span></span></span></span>
  • 1

Step2:编辑postgresql.conf配置文件:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">vi</span> /var/lib/postgresql/data/postgresql.conf 
</code></span></span></span></span>
  • 1

配置内容如下:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-py"><span style="color:#6272a4"># 更改wal日志方式为logical(方式有:minimal、replica 、logical  )</span>
wal_level <span style="color:#6272a4">=</span> logical  

<span style="color:#6272a4"># 更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots</span>
max_replication_slots <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">20</span>

<span style="color:#6272a4"># 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样</span>
max_wal_senders <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">20</span>     

<span style="color:#6272a4"># 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s,0表示禁用)</span>
wal_sender_timeout <span style="color:#6272a4">=</span> 180s	          
</code></span></span></span></span>

Step3:重启容器:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> restart postgres-10.6
</code></span></span></span></span>
  • 1

连接数据库,如果查询一下语句,返回logical表示修改成功:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#ff79c6">SHOW</span> wal_level<span style="color:#999999">;</span>
</code></span></span></span></span>
  • 1

Step4:新建用户并赋权。使用创建容器时的账号密码(postgres/postgres)登录Postgresql数据库。

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- 创建数据库 test_db</span>
<span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">DATABASE</span> test_db<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 连接到新创建的数据库 test_db</span>
\c test_db

<span style="color:#6272a4">-- 创建 t_user 表</span>
<span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">TABLE</span> <span style="color:#f1fa8c">"public"</span><span style="color:#999999">.</span><span style="color:#f1fa8c">"t_user"</span> <span style="color:#999999">(</span>
    <span style="color:#f1fa8c">"id"</span> int8 <span style="color:#6272a4">NOT</span> <span style="color:#8be9fd">NULL</span><span style="color:#999999">,</span>
    <span style="color:#f1fa8c">"name"</span> <span style="color:#ff79c6">varchar</span><span style="color:#999999">(</span><span style="color:#f1fa8c">255</span><span style="color:#999999">)</span><span style="color:#999999">,</span>
    <span style="color:#f1fa8c">"age"</span> int2<span style="color:#999999">,</span>
    <span style="color:#ff79c6">PRIMARY</span> <span style="color:#ff79c6">KEY</span> <span style="color:#999999">(</span><span style="color:#f1fa8c">"id"</span><span style="color:#999999">)</span>
<span style="color:#999999">)</span><span style="color:#999999">;</span>

<span style="color:#6272a4">-- pg新建用户</span>
<span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">USER</span> test1 <span style="color:#ff79c6">WITH</span> PASSWORD <span style="color:#f1fa8c">'test123'</span><span style="color:#999999">;</span>

<span style="color:#6272a4">-- 给用户复制流权限</span>
<span style="color:#ff79c6">ALTER</span> ROLE test1 <span style="color:#ff79c6">replication</span><span style="color:#999999">;</span>

<span style="color:#6272a4">-- 给用户登录数据库权限</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">CONNECT</span> <span style="color:#ff79c6">ON</span> <span style="color:#ff79c6">DATABASE</span> test_db <span style="color:#ff79c6">to</span> test1<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 把当前库public下所有表查询权限赋给用户</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">ALL</span> <span style="color:#ff79c6">PRIVILEGES</span> <span style="color:#ff79c6">ON</span> <span style="color:#ff79c6">ALL</span> <span style="color:#ff79c6">TABLES</span> <span style="color:#6272a4">IN</span> <span style="color:#ff79c6">SCHEMA</span> <span style="color:#ff79c6">public</span> <span style="color:#ff79c6">TO</span> test1<span style="color:#999999">;</span>

</code></span></span></span></span>

Step4:发布表:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- 设置发布为true</span>
<span style="color:#ff79c6">update</span> pg_publication <span style="color:#ff79c6">set</span> puballtables<span style="color:#6272a4">=</span><span style="color:#8be9fd">true</span> <span style="color:#ff79c6">where</span> pubname <span style="color:#6272a4">is</span> <span style="color:#6272a4">not</span> <span style="color:#8be9fd">null</span><span style="color:#999999">;</span>

<span style="color:#6272a4">-- 把所有表进行发布</span>
<span style="color:#ff79c6">CREATE</span> PUBLICATION dbz_publication <span style="color:#ff79c6">FOR</span> <span style="color:#ff79c6">ALL</span> <span style="color:#ff79c6">TABLES</span><span style="color:#999999">;</span>

<span style="color:#6272a4">-- 查询哪些表已经发布</span>
<span style="color:#ff79c6">select</span> <span style="color:#6272a4">*</span> <span style="color:#ff79c6">from</span> pg_publication_tables<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 更改复制标识包含更新和删除之前值(目的是为了确保表 t_user 在实时同步过程中能够正确地捕获并同步更新和删除的数据变化。如果不执行这两条语句,那么 t_user 表的复制标识可能默认为 NOTHING,这可能导致实时同步时丢失更新和删除的数据行信息,从而影响同步的准确性)</span>
<span style="color:#ff79c6">ALTER</span> <span style="color:#ff79c6">TABLE</span> t_user REPLICA <span style="color:#ff79c6">IDENTITY</span> <span style="color:#ff79c6">FULL</span><span style="color:#999999">;</span>

<span style="color:#6272a4">-- 查看复制标识(为f标识说明设置成功,f(表示 full),否则为 n(表示 nothing),即复制标识未设置)</span>
<span style="color:#ff79c6">select</span> relreplident <span style="color:#ff79c6">from</span> pg_class <span style="color:#ff79c6">where</span> relname<span style="color:#6272a4">=</span><span style="color:#f1fa8c">'t_user'</span><span style="color:#999999">;</span>
</code></span></span></span></span>

2.3 Oracle

版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production

2.3.1 安装

Step1:拉取 oracle 11g 镜像(有6g,要等较长的时间)

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> pull registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g
</code></span></span></span></span>
  • 1

Step2:执行以下命令以创建并运行 Oracle 11g 容器

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> run <span style="color:#ee9900">-d</span> <span style="color:#ee9900">-p</span> <span style="color:#f1fa8c">30026</span>:1521 <span style="color:#ee9900">-p</span> <span style="color:#f1fa8c">8081</span>:8080 <span style="color:#999999">\</span>
<span style="color:#ee9900">--name</span> oracle_11g <span style="color:#999999">\</span>
<span style="color:#ee9900">-e</span> <span style="color:#ee9900">ORACLE_HOME</span><span style="color:#6272a4">=</span>/home/oracle/app/oracle/product/11.2.0/dbhome_2 <span style="color:#999999">\</span>
<span style="color:#ee9900">-e</span> <span style="color:#ee9900">ORACLE_SID</span><span style="color:#6272a4">=</span>helowin <span style="color:#999999">\</span>
registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g
</code></span></span></span></span>

Step3:查看容器是否启动

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> <span style="color:#8be9fd">ps</span> -a<span style="color:#6272a4">|</span><span style="color:#8be9fd">grep</span> oracle_11g
</code></span></span></span></span>

Step4:进入容器

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> <span style="color:#f1fa8c">exec</span> <span style="color:#ee9900">-it</span> oracle_11g <span style="color:#8be9fd">bash</span>
</code></span></span></span></span>

**Step5:**设置账号密码

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#6272a4"># 1. 切换至root用户(默认是oracle用户),密码为helowin</span>
<span style="color:#8be9fd">su</span> root

<span style="color:#6272a4"># 2. 创建软链接</span>
<span style="color:#8be9fd">ln</span> <span style="color:#ee9900">-s</span> <span style="color:#ee9900">$ORACLE_HOME</span>/bin/sqlplus /usr/bin

<span style="color:#6272a4"># 3.切换回oracle用户</span>
<span style="color:#8be9fd">su</span> oracle

<span style="color:#6272a4"># 4. 登录sql plus</span>
sqlplus /nolog
conn /as sysdba
<span style="color:#6272a4">## 4.1 修改system用户密码为system</span>
alter user system identified by system<span style="color:#999999">;</span>
<span style="color:#6272a4">## 4.2 修改sys用户密码为system</span>
alter user sys identified by system<span style="color:#999999">;</span>
<span style="color:#6272a4">## 4.3 新增一个测试用户(用户名:test,密码:test123);</span>
create user <span style="color:#f1fa8c">test</span> identified by test123<span style="color:#999999">;</span>
<span style="color:#6272a4">## 4.4 将dba权限给内部管理员账号和密码</span>
grant connect,resource,dba to <span style="color:#f1fa8c">test</span><span style="color:#999999">;</span>
<span style="color:#6272a4">## 4.5 修改密码策略规则为:密码永不过期</span>
ALTER PROFILE DEFAULT LIMIT PASSWORD_LIFE_TIME UNLIMITED<span style="color:#999999">;</span>
<span style="color:#6272a4">## 4.6 修改数据库最大连接数;</span>
alter system <span style="color:#f1fa8c">set</span> <span style="color:#ee9900">processes</span><span style="color:#6272a4">=</span><span style="color:#f1fa8c">1000</span> <span style="color:#ee9900">scope</span><span style="color:#6272a4">=</span>spfile<span style="color:#999999">;</span>
<span style="color:#6272a4">## 4.7 最后重启数据库;</span>
<span style="color:#8be9fd">shutdown</span> immediate<span style="color:#999999">;</span>
startup<span style="color:#999999">;</span>

<span style="color:#6272a4"># 5.退出</span>
<span style="color:#f1fa8c">exit</span>
</code></span></span></span></span>

2.3.2 CDC 配置

Step1:进入容器

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> <span style="color:#f1fa8c">exec</span> <span style="color:#ee9900">-it</span> oracle_11g <span style="color:#8be9fd">bash</span>
</code></span></span></span></span>

Step2:以DBA的权限登录数据库

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql">sqlplus <span style="color:#6272a4">/</span>nolog
<span style="color:#ff79c6">CONNECT</span> sys<span style="color:#6272a4">/</span>system <span style="color:#ff79c6">AS</span> SYSDBA
</code></span></span></span></span>

Step3:启用日志归档

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- 设置数据库恢复文件目标大小为10G</span>
<span style="color:#ff79c6">alter</span> system <span style="color:#ff79c6">set</span> db_recovery_file_dest_size <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">10</span>G<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 设置数据库恢复文件目标路径</span>
<span style="color:#ff79c6">alter</span> system <span style="color:#ff79c6">set</span> db_recovery_file_dest <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'/home/oracle/app/oracle/product/11.2.0'</span> scope<span style="color:#6272a4">=</span>spfile<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 立即关闭数据库</span>
<span style="color:#ff79c6">shutdown</span> immediate<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 以mount模式启动数据库</span>
startup mount<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 启用数据库归档日志模式</span>
<span style="color:#ff79c6">alter</span> <span style="color:#ff79c6">database</span> archivelog<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 打开数据库,允许用户访问</span>
<span style="color:#ff79c6">alter</span> <span style="color:#ff79c6">database</span> <span style="color:#ff79c6">open</span><span style="color:#999999">;</span>
</code></span></span></span></span>

Step4:查看日志归档是否启用(如果显示“Archive Mode”表示已经启用)

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql">archive log list<span style="color:#999999">;</span>
</code></span></span></span></span>
  • 1

Step5:创建表空间

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- 以DBA的权限登录数据库</span>
sqlplus <span style="color:#6272a4">/</span>nolog
<span style="color:#ff79c6">CONNECT</span> sys<span style="color:#6272a4">/</span>system <span style="color:#ff79c6">AS</span> SYSDBA
<span style="color:#6272a4">-- 创建一个名为"logminer_tbs"的表空间</span>
<span style="color:#6272a4">-- 指定表空间的数据文件路径为"/home/oracle/app/oracle/product/11.2.0/logminer_tbs.dbf",其中"/home/oracle/app/oracle/product/11.2.0"是数据文件存储的目录,"logminer_tbs.dbf"是数据文件的文件名</span>
<span style="color:#6272a4">-- 设置表空间的初始大小为25MB</span>
<span style="color:#6272a4">-- 如果数据文件已经存在且可重用,将其重用,否则创建一个新的数据文件</span>
<span style="color:#6272a4">-- 启用表空间的自动扩展功能,即当表空间空间不足时,自动增加数据文件的大小</span>
<span style="color:#6272a4">-- 设置表空间的最大允许大小为无限,即表空间可以无限制地自动扩展</span>
<span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">TABLESPACE</span> logminer_tbs DATAFILE <span style="color:#f1fa8c">'/home/oracle/app/oracle/product/11.2.0/logminer_tbs.dbf'</span> SIZE <span style="color:#f1fa8c">25</span>M REUSE AUTOEXTEND <span style="color:#ff79c6">ON</span> MAXSIZE UNLIMITED<span style="color:#999999">;</span>
</code></span></span></span></span>

Step6:创建用户并赋权

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- 创建一个名为"flinkuser"的用户,密码为"flinkpw",将其默认表空间设置为"LOGMINER_TBS",并在该表空间上设置无限配额。</span>
<span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">USER</span> flinkuser IDENTIFIED <span style="color:#ff79c6">BY</span> flinkpw <span style="color:#ff79c6">DEFAULT</span> <span style="color:#ff79c6">TABLESPACE</span> LOGMINER_TBS QUOTA UNLIMITED <span style="color:#ff79c6">ON</span> LOGMINER_TBS<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户创建会话,即允许该用户连接到数据库。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">SESSION</span> <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- (不支持Oracle 11g)允许"flinkuser"用户在多租户数据库(CDB)中设置容器。</span>
<span style="color:#6272a4">-- GRANT SET CONTAINER TO flinkuser;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询V_$DATABASE视图,该视图包含有关数据库实例的信息。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ON</span> V_$<span style="color:#ff79c6">DATABASE</span> <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户执行任何表的闪回操作。</span>
<span style="color:#ff79c6">GRANT</span> FLASHBACK <span style="color:#ff79c6">ANY</span> <span style="color:#ff79c6">TABLE</span> <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询任何表的数据。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ANY</span> <span style="color:#ff79c6">TABLE</span> <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户拥有SELECT_CATALOG_ROLE角色,该角色允许查询数据字典和元数据。</span>
<span style="color:#ff79c6">GRANT</span> SELECT_CATALOG_ROLE <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户拥有EXECUTE_CATALOG_ROLE角色,该角色允许执行一些数据字典中的过程和函数。</span>
<span style="color:#ff79c6">GRANT</span> EXECUTE_CATALOG_ROLE <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询任何事务。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ANY</span> <span style="color:#ff79c6">TRANSACTION</span> <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- (不支持Oracle 11g)允许"flinkuser"用户进行数据变更追踪(LogMiner)。</span>
<span style="color:#6272a4">-- GRANT LOGMINING TO flinkuser;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户创建表。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">TABLE</span> <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户锁定任何表。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">LOCK</span> <span style="color:#ff79c6">ANY</span> <span style="color:#ff79c6">TABLE</span> <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户修改任何表。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">ALTER</span> <span style="color:#ff79c6">ANY</span> <span style="color:#ff79c6">TABLE</span> <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户创建序列。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">CREATE</span> SEQUENCE <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户执行DBMS_LOGMNR包中的过程。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">EXECUTE</span> <span style="color:#ff79c6">ON</span> DBMS_LOGMNR <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户执行DBMS_LOGMNR_D包中的过程。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">EXECUTE</span> <span style="color:#ff79c6">ON</span> DBMS_LOGMNR_D <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询V_$LOG视图,该视图包含有关数据库日志文件的信息。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ON</span> V_$LOG <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询V_$LOG_HISTORY视图,该视图包含有关数据库历史日志文件的信息。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ON</span> V_$LOG_HISTORY <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询V_$LOGMNR_LOGS视图,该视图包含有关LogMiner日志文件的信息。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ON</span> V_$LOGMNR_LOGS <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询V_$LOGMNR_CONTENTS视图,该视图包含LogMiner日志文件的内容。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ON</span> V_$LOGMNR_CONTENTS <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询V_$LOGMNR_PARAMETERS视图,该视图包含有关LogMiner的参数信息。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ON</span> V_$LOGMNR_PARAMETERS <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询V_$LOGFILE视图,该视图包含有关数据库日志文件的信息。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ON</span> V_$LOGFILE <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询V_$ARCHIVED_LOG视图,该视图包含已归档的数据库日志文件的信息。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ON</span> V_$ARCHIVED_LOG <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 允许"flinkuser"用户查询V_$ARCHIVE_DEST_STATUS视图,该视图包含有关归档目标状态的信息。</span>
<span style="color:#ff79c6">GRANT</span> <span style="color:#ff79c6">SELECT</span> <span style="color:#ff79c6">ON</span> V_$ARCHIVE_DEST_STATUS <span style="color:#ff79c6">TO</span> flinkuser<span style="color:#999999">;</span>
</code></span></span></span></span>

Step7:数据库和表启用增量日志

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- 切换至flinkuser用户</span>
sqlplus <span style="color:#6272a4">/</span>nolog
<span style="color:#ff79c6">CONNECT</span> flinkuser<span style="color:#6272a4">/</span>flinkpw

<span style="color:#6272a4">-- 创建customers表</span>
<span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">TABLE</span> customers <span style="color:#999999">(</span>
    customer_id NUMBER <span style="color:#ff79c6">PRIMARY</span> <span style="color:#ff79c6">KEY</span><span style="color:#999999">,</span>
    customer_name VARCHAR2<span style="color:#999999">(</span><span style="color:#f1fa8c">50</span><span style="color:#999999">)</span><span style="color:#999999">,</span>
    email VARCHAR2<span style="color:#999999">(</span><span style="color:#f1fa8c">100</span><span style="color:#999999">)</span><span style="color:#999999">,</span>
    phone VARCHAR2<span style="color:#999999">(</span><span style="color:#f1fa8c">20</span><span style="color:#999999">)</span>
<span style="color:#999999">)</span> <span style="color:#ff79c6">TABLESPACE</span> LOGMINER_TBS<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 查看LOGMINER_TBS表空间下的所有表</span>
<span style="color:#ff79c6">select</span> tablespace_name<span style="color:#999999">,</span> table_name <span style="color:#ff79c6">from</span> user_tables
<span style="color:#ff79c6">where</span> tablespace_name <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'LOGMINER_TBS'</span><span style="color:#999999">;</span>

<span style="color:#6272a4">-- 以DBA的权限登录数据库</span>
sqlplus <span style="color:#6272a4">/</span>nolog
<span style="color:#ff79c6">CONNECT</span> sys<span style="color:#6272a4">/</span>system <span style="color:#ff79c6">AS</span> SYSDBA

<span style="color:#6272a4">-- 为LOGMINER_TBS表空间下的customers表启用增强日志记录</span>
<span style="color:#ff79c6">ALTER</span> <span style="color:#ff79c6">TABLE</span> FLINKUSER<span style="color:#999999">.</span>CUSTOMERS <span style="color:#ff79c6">ADD</span> SUPPLEMENTAL LOG <span style="color:#ff79c6">DATA</span> <span style="color:#999999">(</span><span style="color:#ff79c6">ALL</span><span style="color:#999999">)</span> <span style="color:#ff79c6">COLUMNS</span>

<span style="color:#6272a4">-- 为数据库启用增强日志记录:</span>
<span style="color:#ff79c6">ALTER</span> <span style="color:#ff79c6">DATABASE</span> <span style="color:#ff79c6">ADD</span> SUPPLEMENTAL LOG <span style="color:#ff79c6">DATA</span><span style="color:#999999">;</span>
</code></span></span></span></span>

2.4 SQLServer

版本:Microsoft SQL Server 2019 (RTM-CU21) (KB5025808) - 15.0.4316.3 (X64)

2.4.1 安装

Step1:拉取SQL Server 2019 镜像

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> pull mcr.microsoft.com/mssql/server:2019-latest
</code></span></span></span></span>

Step2:运行 SQL Server 容器(密码必须是8个字符,并包含字母、数字和特殊字符,如:abc@123456 ,下面映射主机端口为30027)

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#8be9fd">docker</span> run <span style="color:#ee9900">-e</span> <span style="color:#f1fa8c">'ACCEPT_EULA=Y'</span> <span style="color:#ee9900">-e</span> <span style="color:#f1fa8c">'SA_PASSWORD=abc@123456'</span> <span style="color:#ee9900">-p</span> <span style="color:#f1fa8c">30027</span>:1433 <span style="color:#ee9900">--name</span> sql_server_2019 <span style="color:#ee9900">-d</span> mcr.microsoft.com/mssql/server:2019-latest
</code></span></span></span></span>

Step3:验证 SQL Server 容器是否正在运行

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code>docker ps -a|grep sql_server_2019
</code></span></span></span></span>

2.4.2 CDC 配置

Step1:开启SQLServer代理

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell"><span style="color:#6272a4">## 使用root用户登录容器</span>
<span style="color:#8be9fd">docker</span> <span style="color:#f1fa8c">exec</span> <span style="color:#ee9900">-it</span> <span style="color:#ee9900">--user</span> root sql_server_2019 <span style="color:#8be9fd">bash</span>

<span style="color:#6272a4">## 进入容器后,执行命令启用Agent</span>
/opt/mssql/bin/mssql-conf <span style="color:#f1fa8c">set</span> sqlagent.enabled <span style="color:#8be9fd">true</span>

<span style="color:#6272a4">## 退出,重启容器</span>
<span style="color:#f1fa8c">exit</span>
<span style="color:#8be9fd">docker</span> restart sql_server_2019
</code></span></span></span></span>

Step2:创建’cdc_test’测试数据库,并使用连接工具登录该数据库,使用以下 SQL 命令启用 CDC 功能

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- 创建数据库</span>
<span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">DATABASE</span> cdc_test<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 启用CDC功能</span>
<span style="color:#ff79c6">EXEC</span> sys<span style="color:#999999">.</span>sp_cdc_enable_db<span style="color:#999999">;</span>

<span style="color:#6272a4">-- 判断当前数据库是否启用了CDC(如果返回1,表示已启用)</span>
<span style="color:#ff79c6">SELECT</span> is_cdc_enabled <span style="color:#ff79c6">FROM</span> sys<span style="color:#999999">.</span><span style="color:#ff79c6">databases</span> <span style="color:#ff79c6">WHERE</span> name <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'cdc_test'</span><span style="color:#999999">;</span>
</code></span></span></span></span>

Step3:选择要进行 CDC 跟踪的表(这里使用orders表作为演示

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#6272a4">-- 创建示例表(orders)</span>
<span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">TABLE</span> orders <span style="color:#999999">(</span>
     id <span style="color:#ff79c6">int</span><span style="color:#999999">,</span>
     order_date <span style="color:#ff79c6">date</span><span style="color:#999999">,</span>
     purchaser <span style="color:#ff79c6">int</span><span style="color:#999999">,</span>
     quantity <span style="color:#ff79c6">int</span><span style="color:#999999">,</span>
     product_id <span style="color:#ff79c6">int</span><span style="color:#999999">,</span>
     <span style="color:#ff79c6">PRIMARY</span> <span style="color:#ff79c6">KEY</span> <span style="color:#999999">(</span><span style="color:#999999">[</span>id<span style="color:#999999">]</span><span style="color:#999999">)</span>
<span style="color:#999999">)</span><span style="color:#999999">;</span>

<span style="color:#6272a4">-- schema_name 是表所属的架构(schema)的名称。</span>
<span style="color:#6272a4">-- table_name 是要启用 CDC 跟踪的表的名称。</span>
<span style="color:#6272a4">-- cdc_role 是 CDC 使用的角色的名称。如果没有指定角色名称,系统将创建一个默认角色。</span>
<span style="color:#ff79c6">EXEC</span> sys<span style="color:#999999">.</span>sp_cdc_enable_table
  <span style="color:#ee9900">@source_schema</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'dbo'</span><span style="color:#999999">,</span>
  <span style="color:#ee9900">@source_name</span>   <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'orders'</span><span style="color:#999999">,</span>
  <span style="color:#ee9900">@role_name</span>     <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'cdc_role'</span><span style="color:#999999">;</span>
</code></span></span></span></span>

3. 验证

如果要验证flink cdc的功能,需要先下载flink的安装包,然后下载相应的cdc jar包并依赖,最后使用安装包里面的sql-client写相关的flink sql即可验证。

3.1 Flink版本与CDC版本的对应关系

下载Flink安装包以及jar包前,必须确定Flink CDC与Flink版本关系:

Flink CDC 版本 Flink 版本
1.0.0 1.11.*
1.1.0 1.11.*
1.2.0 1.12.*
1.3.0 1.12.*
1.4.0 1.13.*
2.0.* 1.13.*
2.1.* 1.13.*
2.2.* 1.13.*1.14.*
2.3.* 1.13.*1.14.*1.15.*1.16.0
2.4.* 1.13.*1.14.*1.15.*1.16.*1.17.0

本文以 Flink1.13.6 + Flink CDC 2.2.0 版本为例子演示。

3.2 下载相关包

flink 安装包下载,下载地址:Downloads | Apache Flink
flink cdc多种数据源安装、配置与验证,Flink-cdc,flink,大数据

下载cdc相关的jar,根据自己的需求,下载相关的cdc jar:Central Repository: com/ververica
flink cdc多种数据源安装、配置与验证,Flink-cdc,flink,大数据

3.3 添加cdc jar 至lib目录

把需要验证的cdc jar放到flink安装包解压之后的lib目录(<FLINK_HOME>/lib/):
flink cdc多种数据源安装、配置与验证,Flink-cdc,flink,大数据

3.4 验证

使用下面的命令启动 Flink 集群:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell">./bin/start-cluster.sh
</code></span></span></span></span>

启动成功,可以访问 http://localhost:8081 访问到 Flink Web UI:
flink cdc多种数据源安装、配置与验证,Flink-cdc,flink,大数据
使用下面的命令启动 Flink SQL CLI :

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell">./bin/sql-client.sh
</code></span></span></span></span>

展示如下页面,表示启动flink客户端成功:
flink cdc多种数据源安装、配置与验证,Flink-cdc,flink,大数据
执行如下FlinkSQL:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#ff79c6">CREATE</span> <span style="color:#ff79c6">TABLE</span> t_source_sqlserver <span style="color:#999999">(</span>
   id <span style="color:#ff79c6">INT</span><span style="color:#999999">,</span>
    order_date <span style="color:#ff79c6">DATE</span><span style="color:#999999">,</span>
    purchaser <span style="color:#ff79c6">INT</span><span style="color:#999999">,</span>
    quantity <span style="color:#ff79c6">INT</span><span style="color:#999999">,</span>
    product_id <span style="color:#ff79c6">INT</span><span style="color:#999999">,</span>
    <span style="color:#ff79c6">PRIMARY</span> <span style="color:#ff79c6">KEY</span> <span style="color:#999999">(</span>id<span style="color:#999999">)</span> <span style="color:#6272a4">NOT</span> ENFORCED
<span style="color:#999999">)</span> <span style="color:#ff79c6">WITH</span> <span style="color:#999999">(</span>
  <span style="color:#f1fa8c">'connector'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'sqlserver-cdc'</span><span style="color:#999999">,</span>
  <span style="color:#f1fa8c">'hostname'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'10.194.183.120'</span><span style="color:#999999">,</span>
  <span style="color:#f1fa8c">'port'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'30027'</span><span style="color:#999999">,</span>
  <span style="color:#f1fa8c">'username'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'sa'</span><span style="color:#999999">,</span>
  <span style="color:#f1fa8c">'password'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'abc@123456'</span><span style="color:#999999">,</span>
  <span style="color:#f1fa8c">'database-name'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'cdc_test'</span><span style="color:#999999">,</span>
  <span style="color:#f1fa8c">'schema-name'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'dbo'</span><span style="color:#999999">,</span>
  <span style="color:#f1fa8c">'table-name'</span> <span style="color:#6272a4">=</span> <span style="color:#f1fa8c">'orders'</span>
<span style="color:#999999">)</span><span style="color:#999999">;</span>
</code></span></span></span></span>

可以看到执行成功了:
flink cdc多种数据源安装、配置与验证,Flink-cdc,flink,大数据
执行select 语句,以便实时查看该表的数据变动:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-sql"><span style="color:#ff79c6">select</span> <span style="color:#6272a4">*</span> <span style="color:#ff79c6">from</span> t_source_sqlserver<span style="color:#999999">;</span>
</code></span></span></span></span>

从下图,可以看出,只要修改左边的数据,会在控制台实时显示新增删除的数据。
flink cdc多种数据源安装、配置与验证,Flink-cdc,flink,大数据
同时,也能在Flink web页面看到任务正在运行:
flink cdc多种数据源安装、配置与验证,Flink-cdc,flink,大数据

最后,可以通过如下命令关闭掉Flink启动的集群:

<span style="color:rgba(0, 0, 0, 0.75)"><span style="background-color:#ffffff"><span style="color:#000000"><span style="background-color:#282a36"><code class="language-shell">./stop-cluster.sh
</code></span></span></span></span>

到了这里,关于flink cdc多种数据源安装、配置与验证 flink cdc多种数据源安装、配置与验证的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Doris1.1.1多种异构数据源数据导入方案

            Apache Doris 是一个基于 MPP 架构的高性能、实时的分析型数据库,以极速易用的特点被人们所熟知,仅需亚秒级响应时间即可返回海量数据下的查询结果,不仅可以支持高并发的点查询场景,也能支持高吞吐的复杂分析场景。基于此,Apache Doris 能够较好的满足报表

    2024年02月03日
    浏览(37)
  • Flink 定时加载数据源

    一、简介 flink 自定义实时数据源使用流处理比较简单,比如 Kafka、MQ 等,如果使用 MySQL、redis 批处理也比较简单 如果需要定时加载数据作为 flink 数据源使用流处理,比如定时从 mysql 或者 redis 获取一批数据,传入 flink 做处理,如下简单实现 二、pom.xml 文件 注意 flink 好多包

    2024年02月05日
    浏览(27)
  • Flink学习之旅:(三)Flink源算子(数据源)

            Flink可以从各种数据源获取数据,然后构建DataStream 进行处理转换。source就是整个数据处理程序的输入端。 数据集合 数据文件 Socket数据 kafka数据 自定义Source         创建 FlinkSource_List 类,再创建个 Student 类(姓名、年龄、性别三个属性就行,反正测试用) 运行结果

    2024年02月06日
    浏览(38)
  • 美国Linux服务器安装Grafana和配置zabbix数据源的教程

    美国Linux服务器的Grafana工具是跨平台、开源、时序和可视化面板Dashboard监控平台工具,是在日常管理中帮忙提高效率的实用工具,可以通过将采集的美国Linux服务器系统数据查询后,进行可视化的展示及通知,本文小编就来介绍下美国Linux服务器安装Grafana工具和配置zabbix数据

    2024年02月13日
    浏览(39)
  • docker安装nacos配置外部数据源mysql,解决no DataSource set 问题

    1.之前一直看的别人的nacos配置教程,都感觉不全面。同时启动时莫名会出现no datasource set问题,前两天为了看了各方面的教程,也写了 nacos docker v2.1.2启动报错数据源未设置no datasource set文章。 2.昨天nacos突然宕机了,试了各种方法,还是no datasource set;现在综合各方面的教程

    2024年02月02日
    浏览(59)
  • flink重温笔记(二):Flink 流批一体 API 开发——Source 数据源操作

    前言:今天是第二天啦!开始学习 Flink 流批一体化开发知识点,重点学习了各类数据源的导入操作,我发现学习编程需要分类记忆,一次一次地猜想 api 作用,然后通过敲代码印证自己的想法,以此理解知识点,加深对api的理解和应用。 Tips:我觉得学习 Flink 还是挺有意思的

    2024年02月19日
    浏览(32)
  • flink如何初始化kafka数据源的消费偏移

    我们知道在日常非flink场景中消费kafka主题时,我们只要指定了消费者组,下次程序重新消费时是可以从上次消费停止时的消费偏移开始继续消费的,这得益于kafka的_offset_主题保存的关于消费者组和topic偏移位置的具体偏移信息,那么flink应用中重启flink应用时,flink是从topic的什

    2024年02月16日
    浏览(39)
  • flink执行环境和读取kafka以及自定义数据源操作

    目录 创建执行环境 1. getExecutionEnvironment 2. createLocalEnvironment 3. createRemoteEnvironment  执行模式(Execution Mode) 1. BATCH 模式的配置方法 2. 什么时候选择 BATCH 模式 触发程序执行 数据源操作 读取kafka数据源操作  自定义Source           编 写 Flink 程 序 的 第 一 步 , 就 是 创 建 执

    2023年04月10日
    浏览(29)
  • flink cep数据源keyby union后 keybe失效

    问题背景:cep模板 对数据源设置分组条件后,告警的数据,和分组条件对不上, 掺杂了,其他的不同组的数据,产生了告警 策略条件: 选择了两个kafka的的topic的数据作为数据源, 对A 数据源 test-topic1, 进行条件过滤, 过滤条件为:login_type  = 1 对B 数据源 test-topic2,进行

    2024年02月01日
    浏览(27)
  • Flink读取数据的5种方式(文件,Socket,Kafka,MySQL,自定义数据源)

    这是最简单的数据读取方式。当需要进行功能测试时,可以将数据保存在文件中,读取后验证流处理的逻辑是否符合预期。 程序代码: 输出结果 用于验证一些通过Socket传输数据的场景非常方便。 程序代码: 测试时,需要先在 172.16.3.6 的服务器上启动 nc ,然后再启动Flink读

    2024年02月16日
    浏览(34)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包