java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter解决

这篇具有很好参考价值的文章主要介绍了java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter解决。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

环境:Flink 1.15.0,cdc2.3.0

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.3.0</version>
</dependency>

目的:为了测试cdc2.3支持从"specific-offset"启动程序。

代码如下:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class MySqlSourceExample {

    public static void main(String[] args) throws Exception {

        String offsetFile = "binlog.000002";

        Long offsetPos = 160299739L;  //154 219 504
        Properties prop = new Properties();
        prop.setProperty("snapshot.locking.mode", "none");
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("地址")
                .port(端口)
                .databaseList("数据库名") // monitor all tables under inventory database
                .tableList("数据库名.表名") // set captured table
                .username("用户名")
                .password("密码")

                //设置读取位置 initial全量, latest增量,  specificOffset(binlog指定位置开始读,该功能cdc2.2版本不支持)
                .startupOptions(StartupOptions.specificOffset(offsetFile, Long.valueOf(offsetPos)))
//                .startupOptions(StartupOptions.initial())
//                .startupOptions(StartupOptions.latest())
                .debeziumProperties(prop)
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .print("====>")
                .setParallelism(1);

        env.execute();
    }
}

报错如下:

java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter解决,Flink,java,apache,flink

引入下面依赖,解决报错:文章来源地址https://www.toymoban.com/news/detail-732105.html

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.15.0</version>
</dependency>

到了这里,关于java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter解决的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包