监听BinLog

这篇具有很好参考价值的文章主要介绍了监听BinLog。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

监听BinLog

引入pom依赖

<dependency>
	<groupId>com.zendesk</groupId>
	<artifactId>mysql-binlog-connector-java</artifactId>
	<version>0.25.1</version>
</dependency>

代码实现

import com.alibaba.nacos.api.config.annotation.NacosValue;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

@Slf4j
@Component
public class BinLogListener implements ApplicationRunner {

    @NacosValue("${spring.datasource.dynamic.datasource.master.url}")
    private String url;

    @NacosValue("${spring.datasource.dynamic.datasource.master.username}")
    private String username;

    @NacosValue("${spring.datasource.dynamic.datasource.master.password}")
    private String password;

    @NacosValue("${database.binlog.active}")
    private boolean isActive;

    @Autowired
    @Qualifier("threadPoolTaskExecutor")
    private ThreadPoolTaskExecutor executor;

    @Autowired
    private AssembleOneEvent assembleOneEvent;

    @Autowired
    private AssembleTwoEvent assembleTwoEvent;

    private final HashMap<String, String> tableInfo = Maps.newHashMap();

    @Override
    public void run(ApplicationArguments args) {
        if (!isActive) return;
        assembleOneEvent.doUpdate(executor);
        assembleTwoEvent.doUpdate(executor);
        CompletableFuture.runAsync(this::connectMysqlBinLog, executor);
    }

    /**
     * 连接mysqlBinLog
     */
    public void connectMysqlBinLog() {
        log.info("监控BinLog服务已启动");
        Map<String, String> jdbcInfo = FPAStringUtils.getJdbcInfo(url);
        BinaryLogClient client = new BinaryLogClient(jdbcInfo.get("host"), Integer.parseInt(jdbcInfo.get("port")), username, password);
        client.setServerId(1);
        client.setKeepAlive(true);
        client.registerEventListener(event -> event(jdbcInfo, event));
        try {
            client.connect();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    private void event(Map<String, String> jdbcInfo, Event event) {
        EventData data = event.getData();
        TableMapEventData tableMapEventData;
        EventType eventType = event.getHeader().getEventType();
        if (eventType == EventType.TABLE_MAP) {
            tableMapEventData = (TableMapEventData) data;
            String database = tableMapEventData.getDatabase();
            String table = tableMapEventData.getTable();
            tableInfo.put("database", database);
            tableInfo.put("table", table);
        }
        assembleOneEvent.assembleOne(jdbcInfo, tableInfo, event, executor);
        assembleTwoEvent.assembleTwo(jdbcInfo, tableInfo, event, executor);
    }

}
}

处理业务1

@Slf4j
@Component
public class AssembleOneEvent{
    private final List<String> TABLE_NAME = Lists.newArrayList("TABLE1", "TABLE2");
    
    @Autowired
    private AssembleOneService assembleOneService;

    public void assembleOne(Map<String, String> jdbcInfo, HashMap<String, String> map, Event event, Executor executor) {
        if (StringUtils.equals(jdbcInfo.get("database"), map.get("database")) && TABLE_NAME.contains(map.get("table"))) {
            if (event.getHeader().getEventType() == EventType.XID) {
                log.info("AssembleOne updating !");
                try {
                    this.doUpdate(executor);
                } finally {
                    map.clear();
                }
            }
        }
    }

    public void doUpdate(Executor executor) {
        //业务处理逻辑
        //CompletableFuture.runAsync(() -> assembleOneService.updateAssembleOne(), executor);
    }

}

处理业务2文章来源地址https://www.toymoban.com/news/detail-806024.html

@Slf4j
@Component
public class AssembleTwoEvent {
    private final List<String> TABLE_NAME = Lists.newArrayList("TABLE3", "TABLE4");

    @Autowired
    private AssembleTwoService assembleTwoService;

    public void AssembleTwo(Map<String, String> jdbcInfo, HashMap<String, String> map, Event event, Executor executor) {
        if (StringUtils.equals(jdbcInfo.get("database"), map.get("database")) && TABLE_NAME.contains(map.get("table"))) {
            if (event.getHeader().getEventType() == EventType.XID) {
                log.info("AssembleTwo updating !");
                try {
                    this.doUpdate(executor);
                } finally {
                    map.clear();
                }
            }
        }
    }

    public void doUpdate(Executor executor) {
        //业务处理逻辑
        //CompletableFuture.runAsync(() -> assembleTwoService.updateAssembleTwo(), executor);
    }

}

到了这里,关于监听BinLog的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • mysql8关闭binlog并清空Binlog

    编辑my.ini或者my.cnf文件 清空binlog信息 停用binlog功能 为啥要关闭binlog功能呢? 是因为反正是个测试服务器,磁盘比较小。无所谓数据丢不丢的。但是不建议生产环境关闭,这可能会造成非常严重的灾难。 在mysqld分组下面加skip-log-bin配置 然后重启数据库即可 重启后查询log_bin状

    2024年02月09日
    浏览(33)
  • MySQL 开启配置binlog以及通过binlog恢复数据

       binlog是MySQL sever层维护的一种二进制日志,binlog是记录所有数据库表结构变更(例如CREATE、ALTER TABLE、DROP等)以及表数据修改(INSERT、UPDATE、DELETE、TRUNCATE等)的二进制日志。不会记录SELECT和SHOW这类操作,因为这类操作对数据本身并没有修改。 作用主要有: 主从复制:在

    2024年02月03日
    浏览(38)
  • mysql关闭binlog日志,删除binlog数据(win和linux通用)

    打开 mysql 命令窗口,查询 binlog 是否开启   (ON)为开启状态 (OFF)为关闭状态 若开启状态则需要修改配置文件,反之不需要任何操作 在 C:ProgramDataMySQLMySQL Server 8.0 路径下打开 my.ini 并注释掉 bin-log 配置项然后在其后面加入skip-log-bin   重启mysql服务   打开 mysql 命令窗口,

    2024年02月07日
    浏览(30)
  • MySQL 如何查询binlog

    binlog开启成功之后,binlog文件的位置可以在my.inf配置文件中查看。也可以在MySQL的命令行中查看。命令行查看代码如下: 然后可以看到MySQL的binlog相关信息:  然后进入相关目录下  因为此文件为字节码文件,直接查看是不可读的,因此需要借助MySQL 的mysqlbinlog命令: 然后就

    2024年02月13日
    浏览(28)
  • 清理MySQL中的binlog

    Mysql的binlog开启后一直没清理,占用太大空间 expire_logs_days=0: 这里的值如果为0,表示所有binlog日志永久都不会失效,不会自动删除; 这里的值如果为30,表示只保留最近30天。 永久生效(重启后即生效) 修改配置文件my.cnf文件: vim /etc/my.cnf 如果binlog非常多,推荐使用purge命令

    2023年04月08日
    浏览(26)
  • mysql binlog 回滚

    mysqlbinlog 严格来说mysqlbinlog 不能算回滚,他只是将过去的数据修改记录 重新执行一遍,但是从结果上来看,他也算把数据恢复到任意时间点了,举例来说在昨天的某一刻误删除了一条数据,导致其他数据存储都是异常,今天才发现,现在我希望回滚到那一刻,那么我只要在

    2024年02月12日
    浏览(34)
  • mysql的binlog日志

    一、查看和配置binlog 1、log_bin 是否开启binlog,指定日志文件路径 2、log_bin_basename 和 log_bin_index 日志文件基础名和索引名(*好像不能用来设置只是展示作用,我设置时时会报错无法启动服务) 3、binlog_format 日志格式 4、binlog_error_action 设置当binlog日志数据一致性遭到破坏或者复

    2024年02月16日
    浏览(27)
  • 【mysql】binlog日志

    1.1 基本说明 1.全称binary log,二进制日志 2.记录了所有的DDL语句(Data Definition Language数据定义语言)和DML语句(Data Manipulation /məˌnɪpjuˈleɪʃn/ Language数据操作语言) 3.不包括数据查询语句(select、show) 4.作用:灾难时的数据恢复;mysql的主从复制 5.mysql8.0版本,默认二进制日

    2024年02月13日
    浏览(21)
  • mysql binlog

    二进制日志文件记录了数据库修改的事件,像表的修改,表数据的变更等。也包含潜在的可能修改数据的语句事件。如一些delete或update最后修改的数据行可能是0,也会被记录在binlog中(和日志格式也有一定关系,非row-based)。除此之外binlog还会记录语句的执行时间信息。 binlo

    2024年02月22日
    浏览(24)
  • MySQL:binlog启动与查看

    Mysql binlog,即二进制日志,是MySQL最重要的日志,它记录了所有的DDL和DML语句(除了数据查询语句select、show等),以数据形式记录,还包含语句执行所消耗的时间。 binlog的主要目的是复制和恢复。 如何查看MySQL是否开启了binlog? 登录MySQL后,输入: 显示off则未开启,显示o

    2024年02月12日
    浏览(23)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包