大数据技术-FlinkCDC学习笔记

这篇具有很好参考价值的文章主要介绍了大数据技术-FlinkCDC学习笔记。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Flink-CDC简介

官网地址 https://ververica.github.io/flink-cdc-connectors/master/

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的 变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

CDC主要分为基于查询和基于Binlog两种方式

基于查询的CDC 基于Binlog的CDC
开源产品 Sqoop、Kafka JDBC Source Canal、MAxwell、Debezium
执行模式 Batch批处理 Streaming流处理
是否可以捕获所有数据变化
延迟性 高延迟 低延迟
是否增加数据库压力

MySQL开启binlog日志文件

修改 /etc/my.cnf

log-bin=mysql-bin
binlog_format=row
binlog-do-db=bigdata-realtime

大数据技术-FlinkCDC学习笔记

DataStream方式应用

引入依赖

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

打包方式:全量包

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

监控数据库变化

public static void main(String[] args) throws Exception {
    // 1. 获取执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Flink-cdc将读取binlog的位置信息以状态的方式保存在CK,如果想要做到断电续传,需要从CheckPoint或者SavePoint启动程序

    // 开启CK,每隔5秒钟做一次CK,CK超时时间10s,最大并发数2,CK之间间隔1,访问HDFS的用户名root,重启策略延时2秒重启3次
    env.enableCheckpointing(5000L);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
    env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-210325/ck"));
    System.setProperty("HADOOP_USER_NAME", "root");
    env.getCheckpointConfig().setCheckpointTimeout(10000L);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1L);

    // 2. 通过FlinkCDC构建SourceFunction并读取数据
    DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
            .hostname("hadoop102")
            .port(3306)
            .username("root")
            .password("root")
            .databaseList("gmall-210325-flink")
            .tableList("gmall-210325-flink.base_trademark") // 如果不添加该参数,则消费指定数据库中所有表的数据,如果指定,指定方式为db.table,必须带上库名
            .deserializer(new StringDebeziumDeserializationSchema())
            .startupOptions(StartupOptions.initial())
            .build();
    DataStreamSource<String> streamSource = env.addSource(sourceFunction);
    // 3. 打印数据
    streamSource.print();
    // 4. 启动任务
    env.execute("FlinkCDC");
}

启动任务

bin/flink run -m hadoop102:8081 -c com.atguigu.FlinkCDC ./gmall-flink-cdc.jar

大数据技术-FlinkCDC学习笔记

保存savepoint

# bin/flink savepoint 任务id hdfs地址
bin/flink savepoint eaebb93839f0c66014b34a9bf21b4cfa hdfs://hadoop102:8020/gmall-210325/sv

从savepoint处启动

bin/flink run -m hadoop102:8081 -s hdfs://hadoop102:8020/gmall-210325/sv/savepoint-eaebb9-0759b72fba40 -c com.atguigu.FlinkCDC ./gmall-flink-cdc.jar

大数据技术-FlinkCDC学习笔记

FlinkSQL方式应用

引入依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>

监控数据库变化

public static void main(String[] args) throws Exception {
    // 获取执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    // DDL方式建表
    tableEnv.executeSql("CREATE TABLE mysql_binlog ( " +
            " id STRING NOT NULL, " +
            " tm_name STRING, " +
            " logo_url STRING " +
            ") WITH ( " +
            " 'connector' = 'mysql-cdc', " +
            " 'hostname' = 'hadoop102', " +
            " 'port' = '3306', " +
            " 'username' = 'root', " +
            " 'password' = 'root', " +
            " 'database-name' = 'gmall-210325-flink', " +
            " 'table-name' = 'base_trademark' " +
            ")");
    // 查询数据
    Table table = tableEnv.sqlQuery("select * from mysql_binlog");
    // 将动态表转换为流
    DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
    retractStream.print();
    // 启动任务
    env.execute("FlinkCDCWithSQL");
}

自定义反序列化器

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    // 封装的数据格式 json
    /*
        {
            "database":"",
            "tableName":"",
            "type":"",
            "before":"json",
            "after":"json",
        }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // 创建JSON对线用于存储最终数据
        JSONObject result = new JSONObject();
        // 获取库名和表名
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct value = (Struct) sourceRecord.value();
        // 获取before数据
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (int i = 0; i < beforeFields.size(); i++) {
                Field field = beforeFields.get(i);
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }
        // 获取after数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }
        // 获取操作类型
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if("create".equals(type)){
            type="insert";
        }
        // 将字段写入JSON对线
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);
        // 输出数据
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

使用自定义序列化器

public static void main(String[] args) throws Exception {
    // 1. 获取执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Flink-cdc将读取binlog的位置信息以状态的方式保存在CK,如果想要做到断电续传,需要从CheckPoint或者SavePoint启动程序

    // 开启CK,每隔5秒钟做一次CK,CK超时时间10s,最大并发数2,CK之间间隔1,访问HDFS的用户名root,重启策略延时2秒重启3次
    env.enableCheckpointing(5000L);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
    env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-210325/ck"));
    System.setProperty("HADOOP_USER_NAME", "root");
    env.getCheckpointConfig().setCheckpointTimeout(10000L);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1L);

    // 2. 通过FlinkCDC构建SourceFunction并读取数据
    DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
            .hostname("hadoop102")
            .port(3306)
            .username("root")
            .password("root")
            .databaseList("gmall-210325-flink")
            .tableList("gmall-210325-flink.base_trademark") // 如果不添加该参数,则消费指定数据库中所有表的数据,如果指定,指定方式为db.table,必须带上库名
            .deserializer(new CustomerDeserialization())
            .startupOptions(StartupOptions.initial())
            .build();
    DataStreamSource<String> streamSource = env.addSource(sourceFunction);
    // 3. 打印数据
    streamSource.print();
    // 4. 启动任务
    env.execute("FjavalinkCDCWithCustomerDeserialization");
}

Flink-SQLClient方式应用

同步Kafka

所需依赖

flink-sql-connector-kafka_2.11-1.13.6.jar

客户端设置

# 设置CheckPoint时间
set execution.checkpointing.interval = 3s;
# 设置方便查询
set sql-client.execution.result-mode=tableau;

创建FlinkCDC表

CREATE TABLE cdc_source (
id bigint,
tm_name STRING,
logo_url STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'bigdata',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'mall',
'table-name' = 'base_trademark'
);

创建Kafka表

对于upsert-kafka,新增则是配置的json格式数据,删除则是null

CREATE TABLE kafka_sink (
`id` BIGINT,
`name` STRING,
`pwd` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'test1',
'properties.bootstrap.servers' = 'bigdata:9092',
'key.format' = 'json',
'value.format' = 'json'
);

对于kafka,则是debezium格式

CREATE TABLE kafka_sink (
`id` BIGINT,
`tm_name` STRING,
`logo_url` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'test1',
'properties.bootstrap.servers' = '192.168.80.100:9092',
'properties.group.id' = 'test1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-json',
'debezium-json.ignore-parse-errors'='true'
);

插入数据

insert into kafka_sink select * from cdc_source;

同步MySQL

所需依赖

flink-sql-connector-mysql-cdc-2.2.1.jar

客户端设置

# 设置CheckPoint时间
set execution.checkpointing.interval = 3s;
# 设置方便查询
set sql-client.execution.result-mode=tableau;

CDC表

CREATE TABLE db1 (
id INT,
name STRING,
pwd STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'bigdata',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'db1',
'table-name' = 'user');

同步Oracle

官网参考地址

cdc-oracle依赖官网下载地址:https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/

使用docker安装oracle:https://blog.csdn.net/qq_36039236/article/details/124224500?spm=1001.2014.3001.5502

cdc-oracle官网地址:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html

所需依赖

flink-sql-connector-oracle-cdc-2.2.1.jar

Oracle日志支持准备

# 修改STUDENT_INFO表让其支持增量日志,这句先在Oracle里创建user表再执行
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER TABLE CZS.STUDENT_INFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

客户端设置

# 设置CheckPoint时间
set execution.checkpointing.interval = 3s;
# 设置方便查询
set sql-client.execution.result-mode=tableau;

创建Oracle表

create table student_info 
( 
sid number(10) constraint pk_sid primary key, 
sname varchar2(10), 
sex varchar2(2)
);

CDC表

CREATE TABLE test (
SID INT NOT NULL,
SNAME STRING,
SEX STRING,
PRIMARY KEY(SID) NOT ENFORCED
) WITH (
'connector' = 'oracle-cdc',
'hostname' = 'bigdata',
'port' = '1521',
'username' = 'czs',
'password' = 'czs',
'database-name' = 'helowin',
'schema-name' = 'czs',
'table-name' = 'student_info',
# 或略表名大小写
'debezium.database.tablename.case.insensitive'='false'
);

同步PG

官网参考地址

postgres-cdc官网地址:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html

所需依赖

flink-sql-connector-postgres-cdc-2.2.1.jar

环境准备

dockers启动pg

# 拉取镜像
docker pull postgres

# 创建容器
docker run --name postgres -d -p 5432:5432 -v /mydata/postgresql/data:/var/lib/postgresql/data/ -e POSTGRES_DB=postgres -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres postgres

# 进入pg
docker exec -it postgres psql -U postgres -d postgres

# 进入psql
psql -U postgres -d postgres

所需权限

-- pg新建用户
CREATE USER mtcs WITH PASSWORD 'mtcs';

-- 给用户复制流权限
ALTER USER mtcs REPLICATION;

-- 给用户登录数据库权限
grant CONNECT ON DATABASE postgres to mtcs;

-- 把当前库public下所有表查询权限赋给用户
GRANT SELECT ON ALL TABLES IN SCHEMA public TO mtcs;

-- 设置发布为true
update pg_publication set puballtables=true where pubname is not null;

-- 把所有表进行发布
CREATE PUBLICATION dbz_publication FOR ALL TABLES;

-- 查询哪些表已经发布
select * from pg_publication_tables;

创建postgres-cdc

CREATE TABLE postgres (
id INT,
name STRING,
pwd STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'bigdata',
'port' = '5432',
'username' = 'czs',
'password' = 'czs',
'database-name' = 'hive',
'schema-name' = 'public',
'table-name' = 'user',
'debezium.plugin.name' = 'pgoutput'
);

同步MongoDB

mongodb快速入门:https://www.runoob.com/mongodb/

cdc官网说明地址:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html

基于docker创建mongodb副本集:https://blog.csdn.net/weixin_44975592/article/details/125112407

cdc-mongo依赖官网下载地址:https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/

所需依赖

flink-sql-connector-mongodb-cdc-2.2.1.jar

环境准备

docker启动mongodb

# docker启动mongo
# -p 主机端口号:docker端口号
# –n me mongodb名称
# -v 主机路径:docker路径
# -replSet rs(集群名称,1主2从便是一个集群)
docker run -p 27017:27017 --name mongo -v /root/data/db:/data/db -d mongo --replSet rs

# 进入mongo
docker exec -it mongo mongo

# 添加从节点并初始化,这是初始化,直接初始化可以直接将当前节点变为主节点,然后可以在通过rs.add()添加,但是要注意,添加成功后,查看时会看到展示的是一个数组,数组中是一个节点一个集合,集合中有元素,元素有_id,这个_id的值必须是顺序下来,且主节点应该是0,是不是必须并不清楚。
# 看到"ok":1为成功,0为失败,如果你为0,看看上面有无操作出错
rs.initiate()

# 十秒左右,按回车,可看到变为主节点
rs: primary>

# 查看状态
rs.status()


# 创建admin用户 需要use admin在admin数据库下创建
db.createUser({ user:'admin',pwd:'admin',roles:[ { role:'userAdminAnyDatabase', db: 'admin'},"readWriteAnyDatabase"]});

# 使用admin认证
db.auth('admin', 'admin')

# czs用户是官网说明的cdc同步权限,需要use admin在admin数据库下创建
db.createUser({
  user: "czs",
  pwd: "czs",
  roles: [
    { role: "read", db: "admin" }, 
    { role: "readAnyDatabase", db: "admin" }
  ]
});

创建CDC-MongoDB表

# 创建数据库
use runoob

# mongo中插入数据
db.runoob.insert({"name":"czs"})

# 查看所有数据库
show dbs

# mongo中查询数据
db.runoob.find()

# 创建mongo-flinkcdc表
CREATE TABLE runoob (
  _id STRING,
  name STRING,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc',
  'hosts' = 'bigdata:27017',
  'username' = 'czs',
  'password' = 'czs',
  'database' = 'runoob'
);

# 验证cdc同步
select * from runoob; 

操作Hive

所需依赖

flink-sql-connector-hive-3.1.2_2.11-1.13.6.jar

flink-shaded-hadoop3-uber-blink-3.3.0.jar、flink-shaded-hadoop-2-uber-2.8.3-10.0.jar(只需要一个即可)

使用Orc格式的Hive表

  • orc表:https://blog.csdn.net/qq_35260875/article/details/109196553
  • 官网hive依赖地址:https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hive-3.1.2_2.11/1.13.6
  • 官网flink hive依赖说明:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/hive/overview/
set sql-client.execution.result-mode=tableau;

# 创建目录
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/opt/module/hive-3.1.2/conf'
);

# 创建orc格式的hive表
create table test(
     a     int,
     b     int,
     c     int,
     d     int
)
row format delimited fields terminated by ',' 
stored as orc tblproperties ("orc.compress"="NONE");

# 写入数据
insert into test values(1,2,3,4);

写入Hive的分区数据

set sql-client.execution.result-mode=tableau;

SET table.sql-dialect=hive;

# 创建目录
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/opt/module/hive-3.1.2/conf'
);

CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',   
  'sink.partition-commit.trigger'='partition-time',  
  'sink.partition-commit.delay'='1 h',   
  'sink.partition-commit.policy.kind'='metastore,success-file'  
);

SET table.sql-dialect=default;

insert into hive_table values('czs1',1.0,'part1','hr1');

同步Hive维表


CREATE CATALOG myhive WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-conf-dir' = '/opt/module/hive-3.1.2/conf' );

SET table.sql-dialect=hive;
CREATE EXTERNAL TABLE hive_table_test (
  obuid STRING,
  ck_time STRING
) PARTITIONED BY (dt STRING) 
 STORED AS ORC 
 LOCATION  'hdfs://bigdata:8020/datas/hive_table_test'
  TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',   
  'sink.partition-commit.trigger'='partition-time',  
  'sink.partition-commit.delay'='1 h',   
  'sink.partition-commit.policy.kind'='metastore,success-file'  
);

# 查看表信息
describe formatted hive_table_test;
describe hive_table_test;
show partitions hive_table_test;


SET table.sql-dialect=default;
create table kafka_bus_yiqi_rt_ck
(  
obuid               string   ,  
ck_time             string   ,  
proc_time as proctime()
)
WITH(
'connector.type' = 'kafka',  
'connector.topic' = 'kafka_bus',  
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',   
'connector.properties.group.id' = 'testGroup',  
'connector.version'='universal',  
'format.type' = 'json',   
'connector.startup-mode'='earliest-offset' 
);

炸裂函数使用

json格式如下

{
    "id":1238123899121,
    "name":"asdlkjasjkdla998y1122",
    "date":"1990-10-14",
    "obj":{
        "time1":"12:12:43Z",
        "str":"sfasfafs",
        "lg":2324342345
    },
    "arr":[
        {
            "f1":"f1str11",
            "f2":134
        },
        {
            "f1":"f1str22",
            "f2":555
        }
    ],
    "time":"12:12:43Z",
    "timestamp":"1990-10-14T12:12:43Z",
    "map":{
        "flink":123
    },
    "mapinmap":{
        "inner_map":{
            "key":234
        }
    }
}

创建flink表

CREATE TABLE json_source (
    id            BIGINT,
    name          STRING,
    `date`        DATE,
    obj           ROW<time1 TIME,str STRING,lg BIGINT>,
    arr           ARRAY<ROW<f1 STRING,f2 INT>>,
    `time`        TIME,
    `timestamp`   TIMESTAMP(3),
    `map`         MAP<STRING,BIGINT>,
    mapinmap      MAP<STRING,MAP<STRING,INT>>,
    proctime as PROCTIME()
 ) WITH (
    'connector.type' = 'kafka',  
    'connector.topic' = 'test',  
    'connector.properties.zookeeper.connect' = 'localhost:2181',
    'connector.properties.bootstrap.servers' = 'localhost:9092',   
    'connector.properties.group.id' = 'testGroup',  
    'connector.version'='universal',  
    'format.type' = 'json',   
    'connector.startup-mode'='latest-offset' 
);

插入验证

insert into json_source select 111 as id,'name' as name,Row(CURRENT_TIME,'ss',123) as obj,Array[Row('f',1),Row('s',2)] as arr,Map['k1','v1','k2','v2'] as `map`,Map['inner_map',Map['k','v']] as mapinmap;

查询验证文章来源地址https://www.toymoban.com/news/detail-404194.html

select id,name,f1,f2 from json_source, UNNEST(arr) AS t (f1,f2);

到了这里,关于大数据技术-FlinkCDC学习笔记的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink-cdc同步mysql数据到elasticsearch

    CDC是(Change Data Capture 变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 cdc项目地址:https://github.com/ver

    2024年02月13日
    浏览(87)
  • 【开发问题】flink-cdc不用数据库之间的,不同类型的转化

    我一开始是flink-cdc,oracle2Mysql,sql 我一开始直接用的oracle【date】类型,mysql【date】类型,sql的校验通过了,但是真正操作数据的时候报错,告诉我oracle的数据格式的日期数据,不可以直接插入到mysql格式的日期数据,说白了就是数据格式不一致导致的 我想的是既然格式不对

    2024年02月12日
    浏览(47)
  • SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka

    最近做的一个项目,使用的是pg数据库,公司没有成熟的DCD组件,为了实现数据变更消息发布的功能,我使用SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka。 监听数据变化,进行异步通知,做系统内异步任务。 架构方案(懒得写了,看图吧): -- 创建pg 高线数据同步用

    2024年02月02日
    浏览(46)
  • Flink-CDC——MySQL、SqlSqlServer、Oracle、达梦等数据库开启日志方法

    目录 1. 前言 2. 数据源安装与配置 2.1 MySQL 2.1.1 安装 2.1.2 CDC 配置 2.2 Postgresql 2.2.1 安装 2.2.2 CDC 配置 2.3 Oracle 2.3.1 安装 2.3.2 CDC 配置 2.4 SQLServer 2.4.1 安装 2.4.2 CDC 配置 2.5达梦 2.4.1安装 2.4.2CDC配置 3. 验证 3.1 Flink版本与CDC版本的对应关系 3.2 下载相关包 3.3 添加cdc jar 至lib目录 3.4 验

    2024年02月05日
    浏览(59)
  • 【Flink-CDC】Flink CDC 介绍和原理概述

    CDC是( Change Data Capture 变更数据获取 )的简称。 核心思想是, 监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。 CDC 主要分为基于查询和基于

    2024年01月20日
    浏览(46)
  • 大数据技术-FlinkCDC学习笔记

    官网地址 https://ververica.github.io/flink-cdc-connectors/master/ CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的 变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消

    2023年04月08日
    浏览(20)
  • ApacheStreamPark2.1.0部署及执行flink-cdc任务同步mysql表的数据到es的实践

    ApacheStreamPark是流处理极速开发框架,流批一体 湖仓一体的云原生平台,一站式流处理计算平台。   特性中的简单易用和文档详尽这两点我也是深有体会的,部署一点都不简单,照着官方文档都不一定能搞出来,下面部署环节慢慢来吐槽吧。   之前我们写 Flink SQL 基本上

    2024年02月11日
    浏览(48)
  • flink-cdc,clickhouse写入,多路输出

    kafka日志数据从kafka读取 1、关联字典表:完善日志数据 2、判断日志内容级别:多路输出 低级:入clickhouse 高级:入clickhouse的同时推送到kafka供2次数据流程处理。

    2024年02月09日
    浏览(43)
  • Flinkx/Datax/Flink-CDC 优劣势对比

    Flinkx/Datax/Flink-CDC 优劣势对比_HiBoyljw的博客-CSDN博客        FlinkX是一款基于Flink的分布式离线/实时数据同步插件,可实现多种异构数据源高效的数据同步,其由袋鼠云于2016年初步研发完成,目前有稳定的研发团队持续维护,已在Github上开源(开源地址详见文章末尾),并维

    2024年02月07日
    浏览(50)
  • Flink-CDC Cannot instantiate the coordinator for operator Source

    在使用flink1.14.6版本cdc时出现报错: Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) ~[flink-dist_2.11-1.14.6.jar:1.14.6] at java.util.concurrent.CompletableFuture.uniWhenComp

    2024年02月12日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包