环境:Flink 1.15.0,cdc2.3.0
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
目的:测试 cdc 2.3 新支持的从 "specific-offset"(指定 binlog 文件及位点)启动程序的功能。
代码如下:
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
/**
 * Minimal Flink job that reads MySQL change events through the Flink CDC MySQL connector,
 * starting from an explicit binlog file and position, and prints every event.
 *
 * <p>The host, port, database, table and credential values below are placeholders and must
 * be replaced with real values before this example can be compiled and run.
 *
 * <p>As described in the accompanying article, running this with Flink 1.15.0 and
 * flink-connector-mysql-cdc 2.3.0 also requires {@code org.apache.flink:flink-connector-base}
 * on the classpath; without it the job fails with
 * {@code ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter}.
 */
public class MySqlSourceExample {

    /**
     * Builds the MySQL CDC source, attaches it to a streaming environment and prints the
     * captured change events.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Binlog coordinates to start reading from.
        String offsetFile = "binlog.000002";
        // Other values noted by the original author: 154, 219, 504.
        // Primitive long: the value is handed straight to specificOffset, no boxing needed.
        long offsetPos = 160299739L;

        // Extra properties handed to the connector via debeziumProperties(...).
        Properties prop = new Properties();
        prop.setProperty("snapshot.locking.mode", "none");

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("地址")
                .port(端口)
                .databaseList("数据库名") // database(s) to capture
                .tableList("数据库名.表名") // table(s) to capture, written as database.table
                .username("用户名")
                .password("密码")
                // Startup position. Alternatives are StartupOptions.initial() (full snapshot)
                // and StartupOptions.latest() (incremental only). specificOffset(...) starts
                // from the given binlog file/position; per the original note this mode is
                // not supported in CDC 2.2.
                .startupOptions(StartupOptions.specificOffset(offsetFile, offsetPos))
                .debeziumProperties(prop)
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .print("====>")
                .setParallelism(1);
        env.execute();
    }
}
报错如下:
java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter
文章来源:https://www.toymoban.com/news/detail-732105.html
引入下面的依赖即可解决该报错:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.15.0</version>
</dependency>
到了这里,关于java.lang.ClassNotFoundException: org.apache.flink.connector.base.source.reader.RecordEmitter解决的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!