【Flink-Kafka-To-Mongo】Using Flink to Write Kafka Data into MongoDB (insert, update, or delete according to the operation type, with separate handling of time-type fields on write)

Requirements:

1. Data is written from Kafka into MongoDB.

2. The related configuration is stored in MySQL and read dynamically from MySQL.

3. The Kafka cluster in this example uses Kerberos authentication; remove that part if you do not need it.

4. The Kafka messages are JSON; each record is inserted, updated, or deleted according to its operation-type field (a sketch of the assumed message shape follows this list).

5. A custom Source is used for reading and a custom Sink for writing.

6. A custom deserialization schema is used when consuming from Kafka.

7. MongoDB operations are wrapped with Document.

8. In this example the db.collection pairs are passed in as program arguments.
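
The sink in section 4 assumes each Kafka record is a JSON change event carrying an operationType, a documentKey, and a fullDocument field. Below is a minimal sketch of that assumed shape; the class name and all values are made up for illustration, only the field names come from the sink code.

import org.bson.Document;

public class MessageShapeDemo {
    public static void main(String[] args) {
        // Hypothetical payload: only the top-level field names match what the sink reads.
        String json = "{\"operationType\": \"insert\","
                + " \"documentKey\": \"64f0c0ffee\","
                + " \"fullDocument\": {\"name\": \"demo\", \"systemTime\": \"2023-10-13 11:37:43.238\"}}";
        Document doc = Document.parse(json);
        System.out.println(doc.getString("operationType")); // insert / update / replace / delete
        System.out.println(doc.getString("documentKey"));   // used as the "id" key when writing to Mongo
        System.out.println(doc.get("fullDocument"));        // the document body that gets written
    }
}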

1) Dependencies

The dependency list here is fairly redundant; trim or keep entries according to your own needs.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>gaei.cn.x5l</groupId>
    <artifactId>x8vbusiness</artifactId>
    <version>1.0.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <target.java.version>1.8</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>

        <scala.binary.version>2.12</scala.binary.version>
        <scala.version>2.12.10</scala.version>
        <flink.version>1.14.0</flink.version>
        <log4j.version>2.17.2</log4j.version>
        <hadoop.version>3.1.2</hadoop.version>
        <hive.version>3.1.2</hive.version>

        <mongo.driver.version>3.12.6</mongo.driver.version>
        <mongo.driver.core.version>4.3.1</mongo.driver.core.version>

    </properties>
    <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
            <!--            <exclusions>-->
            <!--                <exclusion>-->
            <!--                    <groupId>mysql</groupId>-->
            <!--                    <artifactId>mysql-connector-java</artifactId>-->
            <!--                </exclusion>-->
            <!--            </exclusions>-->
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

        <!-- Basic dependencies - start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Basic dependencies - end -->
        <!-- TABLE - start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Comment this out when using Hive SQL; keep it otherwise -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- TABLE - end -->
        <!-- SQL - start -->
        <!-- SQL parsing - start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- SQL parsing - end -->
        <!-- SQL connector for Kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- SQL - end -->
        <!-- Checkpointing -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-state-processor-api_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.5</version>
            <scope>compile</scope>
        </dependency>

        <!-- Local web UI for monitoring - start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Local web UI for monitoring - end -->
        <!-- DataStream / logging - start -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <!-- hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


        <!-- Important: an easily overlooked jar -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hadoop.version}</version>


        </dependency>
        <!-- rocksdb_2 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Miscellaneous -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.23</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.jyaml</groupId>
            <artifactId>jyaml</artifactId>
            <version>1.3</version>
        </dependency>


        <!-- TABLE (blink planner) - start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <!--            <version>${flink.version}</version>-->
            <version>1.13.5</version>
            <scope>provided</scope>
        </dependency>


        <!-- TABLE (blink planner) - end -->


        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.3</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mongodb-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <!--            <version>5.1.44</version>-->
            <version>8.0.27</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.8</version>
        </dependency>



        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>bson</artifactId>
            <version>${mongo.driver.core.version}</version>
        </dependency>


        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver-core</artifactId>
            <version>${mongo.driver.core.version}</version>
        </dependency>

        <!-- custom-mongo-core repackaged from mongodb-driver -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver</artifactId>
            <version>3.12.6</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                    <exclude>org.apache.flink:flink-runtime-web_2.11</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass>
                                </transformer>
                                <!-- Required for Flink SQL -->
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <!-- ... -->
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>

            </plugins>
        </pluginManagement>

    </build>

</project>
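
Note that the <mainClass> configured for the maven-shade-plugin above (com.owp.flink.kafka.KafkaSourceDemo) looks like a leftover from a demo project; either point it at your actual entry class, i.e. the Kafka2MongoApp from section 4, or override it with -c when submitting the job via flink run.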

2)resources

2.1.appconfig.yml

mysql.url: "jdbc:mysql://1.1.1.1:3306/test?useSSL=false"
mysql.username: "test"
mysql.password: "123456"
mysql.driver: "com.mysql.jdbc.Driver"

2.2.application.properties

url=mongodb://test:test123456@10.1.1.1:34516/?authSource=admin
#database=diagnosis
#collection=diagnosisEntiry
maxConnectionIdleTime=1000000
batchSize=1

# flink
checkpoint.interval=300000
checkpoint.minPauseBetweenCheckpoints=10000
checkpoint.checkpointTimeout=400000
maxConcurrentCheckpoints=1
restartInterval=120
restartStrategy=3
checkpointDataUri=hdfs://nameserver/user/flink/rocksdbcheckpoint_mongo

mysql.url=jdbc:mysql://1.1.1.1:3306/test?useSSL=false
mysql.username=test
mysql.password=123456

#envType=PRE
envType=PRD

# MySQL Druid connection pool settings (production)
druid.driverClassName=com.mysql.jdbc.Driver
# production
druid.url=jdbc:mysql://1.1.1.1:3306/test
druid.username=test
druid.password=123456
# initial number of connections
druid.initialSize=1
# maximum number of connections
druid.maxActive=5
# maximum wait time
druid.maxWait=3000

2.3.log4j.properties

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

2.4.log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="5">
    <Properties>
        <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
        <property name="LOG_LEVEL" value="ERROR" />
    </Properties>

    <appenders>
        <console name="console" target="SYSTEM_OUT">
            <PatternLayout pattern="${LOG_PATTERN}"/>
            <ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/>
        </console>
        <File name="log" fileName="tmp/log/job.log" append="false">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
        </File>
    </appenders>

    <loggers>
        <root level="${LOG_LEVEL}">
            <appender-ref ref="console"/>
            <appender-ref ref="log"/>
        </root>
    </loggers>
</configuration>

3)util

3.1.KafkaUtils

public class KafkaUtils {
    public static FlinkKafkaConsumer<ConsumerRecord<String, String>> getKafkaConsumerForMongo(List<String> topic) throws IOException {
        Properties prop1 = confFromYaml();
        // authentication environment
        String envType = prop1.getProperty("envType");


        Properties prop = new Properties();

        System.setProperty("java.security.krb5.conf", "/opt/conf/krb5.conf");
        prop.put("security.protocol", "SASL_PLAINTEXT");
        prop.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "
                + "useTicketCache=false  "
                + "serviceName=\"" + "kafka" + "\" "
                + "useKeyTab=true "
                + "keyTab=\"" + "/opt/conf/test.keytab" + "\" "
                + "principal=\"" + getKafkaKerberos(envType).get("principal") + "\";");

//        prop.put("bootstrap.servers", "kfk01.pre.x8v.com:9092");
        prop.put("bootstrap.servers", getKafkaKerberos(envType).get("bootstrap.servers"));
        prop.put("group.id", "Kafka2Mongo_all");
        prop.put("auto.offset.reset", "earliest");
        prop.put("enable.auto.commit", "false");
        prop.put("max.poll.interval.ms", "60000");
        prop.put("max.poll.records", "3000");
        prop.put("session.timeout.ms", "600000");

//        List<String> topics = Stream.of(prop.getProperty("topics").split(",", -1))
//                .collect(Collectors.toList());

        prop.put("key.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
        prop.put("value.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");


        FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new FlinkKafkaConsumer<ConsumerRecord<String, String>>(topic, new CustomDeSerializationSchema(), prop);

        consumer.setStartFromGroupOffsets();
        consumer.setCommitOffsetsOnCheckpoints(true);

        return consumer;
    }

    public static void main(String[] args) throws Exception {
        Properties druidConf = KafkaUtils.getDruidConf();
        if (druidConf == null) {
            throw new RuntimeException("缺少druid相关配置信息,请检查");
        }

        DataSource dataSource = DruidDataSourceFactory.createDataSource(druidConf);
        Connection connection = dataSource.getConnection();
        PreparedStatement showDatabases = connection.prepareStatement("select count(*) from tab_factory");
        ResultSet resultSet = showDatabases.executeQuery();
        while (resultSet.next()) {
            String string = resultSet.getString(1);
            System.out.println(string);
        }
        resultSet.close();
        showDatabases.close();

        connection.close();


    }

    public static Properties getDruidConf() {
        try {
            Properties prop = confFromYaml();
            String driverClassName = prop.get("druid.driverClassName").toString();
            String url = prop.get("druid.url").toString();
            String username = prop.get("druid.username").toString();
            String password = prop.get("druid.password").toString();
            String initialSize = prop.get("druid.initialSize").toString();
            String maxActive = prop.get("druid.maxActive").toString();
            String maxWait = prop.get("druid.maxWait").toString();

            Properties p = new Properties();
            p.put("driverClassName", driverClassName);
            p.put("url", url);
            p.put("username", username);
            p.put("password", password);
            p.put("initialSize", initialSize);
            p.put("maxActive", maxActive);
            p.put("maxWait", maxWait);
//            p.forEach((k, v) -> System.out.println("pool property " + k + "=" + v));
            return p;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }


    // envType     PRE  PRD
    public static Map<String, String> getKafkaKerberos(String envType) {
        Map<String, String> map = new HashMap<>();
        if ("PRD".equalsIgnoreCase(envType)) {
            map.put("principal", "prd@PRD.PRD.COM");
            map.put("bootstrap.servers", "kfk01.prd:9092,kfk02.prd:9092,kfk03.prd:9092,kfk04.prd:9092,kfk05.prd:9092,kfk06.prd:9092");
        } else if ("PRE".equalsIgnoreCase(envType)) {
            map.put("principal", "pre@PRE.PRE.COM");
            map.put("bootstrap.servers", "kfk01.pre:9092,kfk02.pre:9092,kfk03.pre:9092");
        } /*else if ("TEST".equalsIgnoreCase(envType)) {
            map.put("principal","test@TEST.TEST.COM");
            map.put("bootstrap.servers","test@TEST.TEST.COM");
        } */ else {
            System.out.println("没有该" + envType + "环境");
            throw new RuntimeException("没有该" + envType + "环境");
        }

        return map;
    }

    public static StreamExecutionEnvironment setupFlinkEnv(StreamExecutionEnvironment env) throws IOException {
        Properties prop = confFromYaml();
        env.enableCheckpointing(Long.valueOf(prop.getProperty("checkpoint.interval")), CheckpointingMode.EXACTLY_ONCE); // note: committing offsets on checkpoints delays the offset commit
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(Long.valueOf(prop.getProperty("checkpoint.minPauseBetweenCheckpoints")));
        env.getCheckpointConfig().setCheckpointTimeout(Long.valueOf(prop.getProperty("checkpoint.checkpointTimeout")));
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(Integer.valueOf(prop.getProperty("maxConcurrentCheckpoints")));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                Integer.valueOf(prop.getProperty("restartStrategy")), // 尝试重启的次数,不宜过小,分布式任务很容易出问题(正常情况),建议3-5次
                Time.of(Integer.valueOf(prop.getProperty("restartInterval")), TimeUnit.SECONDS) // 延时
        ));
        // configure the state backend
//        env.setStateBackend(new RocksDBStateBackend((String) prop.getProperty("checkPointPath"), true));
//        env.setStateBackend(new MemoryStateBackend());
        env.setStateBackend(new RocksDBStateBackend(String.valueOf(prop.getProperty("checkpointDataUri")), true));
        return env;

    }

    public static Properties confFromYaml() {
        Properties prop = new Properties();
        InputStream resourceStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("application.properties");
        try {
            prop.load(resourceStream);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (resourceStream != null) {
                    resourceStream.close();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
        return prop;
    }
}
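
As mentioned in requirement 3, the Kerberos-related settings can simply be dropped if your cluster does not use authentication. A minimal non-Kerberos variant of the consumer setup might look like the sketch below; getPlainKafkaConsumer is a hypothetical helper to add to KafkaUtils, the broker addresses are placeholders, and everything else mirrors getKafkaConsumerForMongo:

    public static FlinkKafkaConsumer<ConsumerRecord<String, String>> getPlainKafkaConsumer(List<String> topics) {
        Properties prop = new Properties();
        // Placeholder brokers - replace with your own cluster.
        prop.put("bootstrap.servers", "broker01:9092,broker02:9092");
        prop.put("group.id", "Kafka2Mongo_all");
        prop.put("auto.offset.reset", "earliest");
        prop.put("enable.auto.commit", "false");

        FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer =
                new FlinkKafkaConsumer<>(topics, new CustomDeSerializationSchema(), prop);
        consumer.setStartFromGroupOffsets();
        consumer.setCommitOffsetsOnCheckpoints(true);
        return consumer;
    }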

3.2.CustomDeSerializationSchema

public class CustomDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {
    private static String encoding = "UTF8";

    // whether this element marks the end of the stream; returning false means data keeps arriving indefinitely
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }

    // returns a ConsumerRecord<String, String> that, besides the payload, carries the topic, partition, offset, and other metadata
    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        byte[] key = (record.key() == null ? "".getBytes() : record.key());
        return new ConsumerRecord<String, String>(
                record.topic(),
                record.partition(),
                record.offset(),
                record.timestamp(),
                record.timestampType(),
                record.checksum(),
                record.serializedKeySize(),
                record.serializedValueSize(),
                /* no null check on the value here - be sure to handle nulls in production (see the sketch after this class) */
                new String(key, encoding),
                new String(record.value(), encoding));
    }

    // specifies the type of the data produced by this schema
    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {
        });
    }
}
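
The comment inside deserialize notes that record.value() is not null-checked. One way to make the decoding null-safe, assuming mapping a null key or value to an empty string is acceptable, is a small hypothetical helper like the one below, called as decodeOrEmpty(record.key()) and decodeOrEmpty(record.value()):

    // Hypothetical helper for CustomDeSerializationSchema; maps null byte arrays to "".
    // Needs: import java.io.UnsupportedEncodingException;
    private static String decodeOrEmpty(byte[] bytes) throws UnsupportedEncodingException {
        return bytes == null ? "" : new String(bytes, encoding);
    }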

4)kafkacdc2mongo

4.1.Kafka2MongoApp

public class Kafka2MongoApp {

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        String[] split = args[0].split(",");
        // String topic = "mongo_" + database + "_" + collection;
        List<String> topicList = new ArrayList<>();
        Map<String, String> dbAndCol = new HashMap<>();
        for (String s : split) {
            String[] t = s.split("\\.");
            String e = "mongo_" + t[0] + "_" + t[1];
            topicList.add(e);
            dbAndCol.put(e, s);
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().disableOperatorChaining();
        KafkaUtils.setupFlinkEnv(env);
        RichSinkFunction<ConsumerRecord<String, String>> sinkFunction = new RichSinkFunction<ConsumerRecord<String, String>>() {

            private Stack<MongoClient> connectionPool = new Stack<>();
            private String url = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                initPool();
            }

            /**
             * Initialize the connection pool and set its parameters.
             */
            private void initPool() {
                Properties prop = KafkaUtils.confFromYaml();
                url = prop.getProperty("url");

                try {
                    for (int i = 0; i < 5; i++) {
                        MongoClient client = new MongoClient(new MongoClientURI(url));
                        connectionPool.push(client);
                    }
                } catch (MongoException e) {
                    e.printStackTrace();
                }
            }


            @Override
            public void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {
                MongoClient mongoClient = null;
                try {
                    String topic = record.topic();
                    String dbAndColstr = dbAndCol.get(topic);
                    String[] t = dbAndColstr.split("\\.");
                    String databaseStr = t[0];
                    String collectionStr = t[1];
                    Document doc = Document.parse(record.value());
                    String operationType = doc.getString("operationType");
                    String documentKey = doc.getString("documentKey");
                    Object id = doc.get("documentKey");

                    // take a connection from the pool
                    mongoClient = connectionPool.pop();

                    MongoCollection<Document> collection = null;
                    try {
                        collection = mongoClient.getDatabase(databaseStr).getCollection(collectionStr);
                    } catch (Exception e) {
                        try {
                            mongoClient.close();
                        } catch (Exception ignore) {
                        }
                        // the connection has expired; recreate it
                        mongoClient = new MongoClient(new MongoClientURI(url));
                        collection = mongoClient.getDatabase(databaseStr).getCollection(collectionStr);
                    }

                    if ("delete".equalsIgnoreCase(operationType)) {
                        collection.deleteOne(new Document("id", id));
                    }

                    if (documentKey != null && !documentKey.isEmpty() && !"delete".equals(operationType)) {
                        Document outputDoc = (Document) doc.get("fullDocument");
                        outputDoc.put("id", id);
                        try {
                            collection.deleteOne(new Document("id", id));
                        } catch (Exception e) {
                            System.out.println("添加更新前先删除:异常信息====>>>" + e.getMessage() + "插入的数据是\n" + outputDoc);
                        }
                        if ("insert".equalsIgnoreCase(operationType) || "update".equalsIgnoreCase(operationType) || "replace".equalsIgnoreCase(operationType)) {
                            insertOne(collection, outputDoc);
                        }
                    }
                } catch (Exception e) {
                    System.out.printf("mongodb 同步异常,原因是%s,topic是%s,value值是\n%s%n", e.getMessage(), record.topic(), record.value());
                } finally {
                    if (mongoClient != null) {
                        // return the connection to the pool
                        connectionPool.push(mongoClient);
                    }
                }
            }

            @Override
            public void close() throws Exception {
                for (MongoClient mongoClient : connectionPool) {
                    try {
                        mongoClient.close();
                    } catch (Exception ignore) {
                    }
                }
            }

            private void insertOne(MongoCollection<Document> collection, Document doc) {
                String collectionName = collection.getNamespace().getCollectionName();
                // handle special fields
                handle(collectionName, doc);
                collection.insertOne(doc);
            }
			
            // example of how to handle time fields, if any
            private void handle(String collectionName, Document doc) {
                if (collectionName.equals("test1")) {
                    // systemTime should end up as a Date (e.g. 2023-10-13 11:37:43.238), not a String
                    formatStringTime(doc, "systemTime");
                    return;
                }
                if (collectionName.equals("test2")) {
                    formatStringTime(doc, "time");
                    return;
                }
                if (collectionName.equals("test3") || collectionName.equals("timer_record")) {
                    formatStringTime(doc, "createTime");
                    formatStringTime(doc, "updateTime");
                    return;
                }
            }

            // convert a String timestamp into a Date
            private void formatStringTime(Document doc, String key) {
                try {
                    String time = doc.getString(key);
                    if (time == null) {
                        return;
                    }
                    Date parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").parse(time);
                    doc.put(key, parse);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        env.addSource(KafkaUtils.getKafkaConsumerForMongo(topicList))
                .keyBy(e -> {
                    Document doc = Document.parse(e.value());
                    return doc.getString("documentKey");
                })
                .addSink(sinkFunction);

        env.execute("kafka2mongo synchronization " + topicList);
    }
}
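
For completeness, this is how the db.collection argument from requirement 8 maps to Kafka topic names in the main method above; the argument value here is made up, only the mapping logic is copied from the code:

public class ArgMappingDemo {
    public static void main(String[] args) {
        // Hypothetical args[0]: comma-separated db.collection pairs.
        String arg = "testdb.test1,testdb.test2";
        for (String s : arg.split(",")) {
            String[] t = s.split("\\.");
            // Prints e.g. "testdb.test1 -> topic mongo_testdb_test1"
            System.out.println(s + " -> topic mongo_" + t[0] + "_" + t[1]);
        }
    }
}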

This concludes the article on using Flink to write Kafka data into MongoDB, performing insert, update, and delete operations according to the operation type and handling time-type fields separately on write.
