Overall approach:
1. Use io.debezium.connector.mysql.MySqlConnector to automatically sync data changes into a Kafka message queue.
2. Use a listener to consume the queue and insert the data into ES from application code.
PS: There is actually a simpler way: build on this with ElasticsearchSinkConnector and ksql to transform the data and sync it to ES automatically, with no application code at all. I'll write that up once I have the flow running locally.
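At a glance, the pipeline covered here looks like this (the topic name follows from the topic.prefix configured in step 2 below):

MySQL binlog -> Debezium MySqlConnector (Kafka Connect) -> Kafka topic pro.store.product -> @KafkaListener -> Elasticsearch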
1. Download and configure the connector
Download the Debezium MySQL connector.
Create a connect directory under your Kafka installation and unpack the connector archive into it.
In kafka/config/connect-distributed.properties, point plugin.path at the directory holding the connector.
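For reference, assuming Kafka is installed under /opt/kafka and the connector was unpacked to /opt/kafka/connect (both paths are placeholders, adjust to your own layout), the relevant line looks like:

# connect-distributed.properties
# directory that contains the debezium-connector-mysql folder
plugin.path=/opt/kafka/connect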
Start Kafka Connect in distributed mode:
bin/connect-distributed.sh -daemon config/connect-distributed.properties
Use Postman to check whether the connector was picked up:
GET http://localhost:8083/connector-plugins
If the Debezium connector appears in the response, the configuration succeeded:
[
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "2.1.2.Final"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "3.3.2"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "3.3.2"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "3.3.2"
  }
]
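If you prefer the command line, the same check works with curl:

curl -s http://localhost:8083/connector-plugins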
2. Create the sync connector instance
Send a POST request to:
http://localhost:8083/connectors
Request body:
{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "127.0.0.1",  // database IP
    "database.port": "3306",
    "database.user": "root",           // database login user
    "database.password": "123456",     // login password
    "database.server.id": "2",
    "database.server.name": "hc",
    "database.include.list": "store",          // database to sync
    "table.include.list": "store.product",     // table to sync
    "database.history.kafka.bootstrap.servers": "localhost:9092",  // Kafka address
    "database.history.kafka.topic": "schema-changes-inventory",
    "topic.prefix": "pro",
    "include.schema.changes": "true",
    "transforms": "unwrap,Cast",
    "transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
    "transforms.Cast.spec": "amount:float64,unit_price:float64",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false"
  }
}
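Strip the // comments before sending, since they are not valid JSON. Saved as mysql-connector.json (the file name is just an example), the same request can be made with curl:

curl -s -X POST -H "Content-Type: application/json" \
  --data @mysql-connector.json \
  http://localhost:8083/connectors

One caveat: in Debezium 2.x, database.server.name was superseded by topic.prefix, and the database.history.kafka.* properties were renamed to schema.history.internal.kafka.*; if connector creation fails complaining about missing schema history settings, switch to the new property names.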
Check that it was created successfully with a GET request:
http://localhost:8083/connectors
Response:
[
"mysql-connector"
]
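To confirm that the connector and its task are actually running (not just registered), query the status endpoint:

curl -s http://localhost:8083/connectors/mysql-connector/status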
3. Listen for the messages in code
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class HcCustomerListener {

    private final EsSearchService esSearchService;
    private final IEsRepository esRepository;

    private static final String INDEX = "product";

    /**
     * Listen to the product table.
     * The topic name is topic.prefix + "." + database + "." + table,
     * so with "topic.prefix": "pro" it is pro.store.product.
     * @param record the Kafka record carrying the change event
     */
    @KafkaListener(topics = "pro.store.product")
    public void onMessage(ConsumerRecord<String, String> record) {
        String kafkaMessage = record.value();
        if (StrUtil.isBlank(kafkaMessage)) {
            return;
        }
        // Check whether the index exists; create it if not
        if (!esRepository.checkIndex(INDEX)) {
            if (!esRepository.createIndex(INDEX)) {
                log.error("Failed to create index! Index name: " + INDEX);
            }
        }
        // Convert the message into the object to be stored
        Product item = JSONObject.toJavaObject(JSONObject.parseObject(kafkaMessage), Product.class);
        // Sync the data
        if (!esRepository.dataSync(item, INDEX, QueryBuilders.termQuery("code", item.getCode()))) {
            log.error("Failed to sync product to ES! Product code: " + item.getCode());
        }
    }
}
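The ES storage details are promised for a separate article (see the PS below), but for context, here is a minimal sketch of what a dataSync implementation could look like, assuming the Elasticsearch RestHighLevelClient (7.x) and a delete-then-index strategy. The class, field names, and error handling are all illustrative, not the author's actual IEsRepository code:

import java.io.IOException;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus;

@Slf4j
public class EsRepositorySketch {

    private final RestHighLevelClient client;

    public EsRepositorySketch(RestHighLevelClient client) {
        this.client = client;
    }

    /** Delete any document matching the query (e.g. same product code), then index the new version. */
    public boolean dataSync(Object data, String index, QueryBuilder query) {
        try {
            // remove the stale copy, if any
            DeleteByQueryRequest delete = new DeleteByQueryRequest(index);
            delete.setQuery(query);
            client.deleteByQuery(delete, RequestOptions.DEFAULT);
            // index the new document as JSON
            IndexRequest request = new IndexRequest(index)
                    .source(JSON.toJSONString(data), XContentType.JSON);
            IndexResponse response = client.index(request, RequestOptions.DEFAULT);
            return response.status() == RestStatus.CREATED || response.status() == RestStatus.OK;
        } catch (IOException e) {
            log.error("ES sync failed", e);
            return false;
        }
    }
}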
PS: The concrete methods for storing data in ES and querying ES will get a dedicated article of their own.
A few pitfalls along the way: when creating the connector I kept getting an error about a missing value; switching from JDK 1.8 to JDK 17 fixed it (Debezium 2.x requires at least Java 11).
Data syncing also kept failing with conversion errors, until I realized that BigDecimal columns need an explicit cast when creating the connector: Debezium's default precise decimal handling serializes DECIMAL values as base64-encoded bytes, which the JSON parsing in the listener cannot read, so "transforms.Cast.spec": "unit_price:float64,amount:float64" does the trick.