监听BinLog
引入pom依赖
<!-- MySQL binlog client library. NOTE(review): the Java code below imports
     com.github.shyiko.mysql.binlog.* - presumably this artifact still ships
     that package name; confirm against the library's documentation. -->
<dependency>
<groupId>com.zendesk</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.25.1</version>
</dependency>
代码实现
import com.alibaba.nacos.api.config.annotation.NacosValue;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@Slf4j
@Component
public class BinLogListener implements ApplicationRunner {
@NacosValue("${spring.datasource.dynamic.datasource.master.url}")
private String url;
@NacosValue("${spring.datasource.dynamic.datasource.master.username}")
private String username;
@NacosValue("${spring.datasource.dynamic.datasource.master.password}")
private String password;
@NacosValue("${database.binlog.active}")
private boolean isActive;
@Autowired
@Qualifier("threadPoolTaskExecutor")
private ThreadPoolTaskExecutor executor;
@Autowired
private AssembleOneEvent assembleOneEvent;
@Autowired
private AssembleTwoEvent assembleTwoEvent;
private final HashMap<String, String> tableInfo = Maps.newHashMap();
@Override
public void run(ApplicationArguments args) {
if (!isActive) return;
assembleOneEvent.doUpdate(executor);
assembleTwoEvent.doUpdate(executor);
CompletableFuture.runAsync(this::connectMysqlBinLog, executor);
}
/**
* 连接mysqlBinLog
*/
public void connectMysqlBinLog() {
log.info("监控BinLog服务已启动");
Map<String, String> jdbcInfo = FPAStringUtils.getJdbcInfo(url);
BinaryLogClient client = new BinaryLogClient(jdbcInfo.get("host"), Integer.parseInt(jdbcInfo.get("port")), username, password);
client.setServerId(1);
client.setKeepAlive(true);
client.registerEventListener(event -> event(jdbcInfo, event));
try {
client.connect();
} catch (IOException e) {
e.printStackTrace();
}
}
private void event(Map<String, String> jdbcInfo, Event event) {
EventData data = event.getData();
TableMapEventData tableMapEventData;
EventType eventType = event.getHeader().getEventType();
if (eventType == EventType.TABLE_MAP) {
tableMapEventData = (TableMapEventData) data;
String database = tableMapEventData.getDatabase();
String table = tableMapEventData.getTable();
tableInfo.put("database", database);
tableInfo.put("table", table);
}
assembleOneEvent.assembleOne(jdbcInfo, tableInfo, event, executor);
assembleTwoEvent.assembleTwo(jdbcInfo, tableInfo, event, executor);
}
}
}
处理业务1
/**
 * Business handler 1: runs {@link #doUpdate(Executor)} when an XID event
 * arrives while the tracked database matches the configured one and the
 * tracked table is one of {@code TABLE1} / {@code TABLE2}.
 */
@Slf4j
@Component
public class AssembleOneEvent {

    /** Tables whose changes trigger this handler. */
    private final List<String> TABLE_NAME = Lists.newArrayList("TABLE1", "TABLE2");

    @Autowired
    private AssembleOneService assembleOneService;

    /**
     * Inspects one binlog event and triggers the update when it applies.
     *
     * @param jdbcInfo connection details; the {@code "database"} entry is compared
     * @param map      tracked {@code "database"} / {@code "table"} entries; cleared
     *                 after an update has been triggered
     * @param event    the binlog event being processed
     * @param executor executor passed on to {@link #doUpdate(Executor)}
     */
    public void assembleOne(Map<String, String> jdbcInfo, HashMap<String, String> map, Event event, Executor executor) {
        boolean sameDatabase = StringUtils.equals(jdbcInfo.get("database"), map.get("database"));
        if (!sameDatabase) {
            return;
        }
        if (!TABLE_NAME.contains(map.get("table"))) {
            return;
        }
        if (event.getHeader().getEventType() != EventType.XID) {
            return;
        }
        log.info("AssembleOne updating !");
        try {
            doUpdate(executor);
        } finally {
            // Reset the tracked table info even if the update throws.
            map.clear();
        }
    }

    /**
     * Placeholder for the business update; currently does nothing.
     *
     * @param executor executor intended for running the update asynchronously
     */
    public void doUpdate(Executor executor) {
        // Business logic goes here, for example:
        // CompletableFuture.runAsync(() -> assembleOneService.updateAssembleOne(), executor);
    }
}
处理业务2
/**
 * Business handler 2: runs {@link #doUpdate(Executor)} when an XID event
 * arrives while the tracked database matches the configured one and the
 * tracked table is one of {@code TABLE3} / {@code TABLE4}.
 */
@Slf4j
@Component
public class AssembleTwoEvent {

    /** Tables whose changes trigger this handler. */
    private final List<String> TABLE_NAME = Lists.newArrayList("TABLE3", "TABLE4");

    @Autowired
    private AssembleTwoService assembleTwoService;

    /**
     * Inspects one binlog event and triggers the update when it applies.
     *
     * <p>This is the lowerCamelCase entry point that the listener invokes;
     * the original capitalised name did not match that call.
     *
     * @param jdbcInfo connection details; the {@code "database"} entry is compared
     * @param map      tracked {@code "database"} / {@code "table"} entries; cleared
     *                 after an update has been triggered
     * @param event    the binlog event being processed
     * @param executor executor passed on to {@link #doUpdate(Executor)}
     */
    public void assembleTwo(Map<String, String> jdbcInfo, HashMap<String, String> map, Event event, Executor executor) {
        if (StringUtils.equals(jdbcInfo.get("database"), map.get("database")) && TABLE_NAME.contains(map.get("table"))) {
            if (event.getHeader().getEventType() == EventType.XID) {
                log.info("AssembleTwo updating !");
                try {
                    this.doUpdate(executor);
                } finally {
                    // Reset the tracked table info even if the update throws.
                    map.clear();
                }
            }
        }
    }

    /**
     * Legacy, mis-capitalised name kept so existing callers keep compiling.
     *
     * @deprecated use {@link #assembleTwo(Map, HashMap, Event, Executor)} instead
     */
    @Deprecated
    public void AssembleTwo(Map<String, String> jdbcInfo, HashMap<String, String> map, Event event, Executor executor) {
        assembleTwo(jdbcInfo, map, event, executor);
    }

    /**
     * Placeholder for the business update; currently does nothing.
     *
     * @param executor executor intended for running the update asynchronously
     */
    public void doUpdate(Executor executor) {
        // Business logic goes here, for example:
        // CompletableFuture.runAsync(() -> assembleTwoService.updateAssembleTwo(), executor);
    }
}
到了这里,关于监听BinLog的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!