Redis(十四)双写一致性工程案例

这篇具有很好参考价值的文章主要介绍了Redis(十四)双写一致性工程案例。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

问题概述

Redis(十四)双写一致性工程案例,Java,redis,数据库,缓存

canal

https://github.com/alibaba/canal

功能

  1. 数据库镜像
  2. 数据库实时备份
  3. 索引构建和实时维护(拆分异构索引、倒排索引等)
  4. 业务 cache 刷新
  5. 带业务逻辑的增量数据处理
    Redis(十四)双写一致性工程案例,Java,redis,数据库,缓存
    传统mysql主从复制原理
    MySQL的主从复制将经过如下步骤:
  6. 当 master 主服务器上的数据发生改变时,则将其改变写入二进制事件日志文件中;
  7. salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志进行探测,探测其是否发生过改变,
    如果探测到 master 主服务器的二进制事件日志发生了改变,则开始一个 I/O Thread 请求 master 二进制事件日志;
  8. 同时 master 主服务器为每个 I/O Thread 启动一个dump Thread,用于向其发送二进制事件日志;
  9. slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中;
    Redis(十四)双写一致性工程案例,Java,redis,数据库,缓存
    Canal原理’
    Redis(十四)双写一致性工程案例,Java,redis,数据库,缓存

Redis(十四)双写一致性工程案例,Java,redis,数据库,缓存

安装部署

https://github.com/alibaba/canal/releases/tag/canal-1.1.6

mysql配置

# 查看mysql版本
SELECT VERSION();
# 查看是否开启bin_log
SHOW VARIABLES LIKE 'log_bin';

mysql中my.ini配置
linux为my.cnf

# Linux 寻找my.cnf命令
find / -name my.cnf 
# my.ini
log-bin=mysql-bin #开启 binlog
binlog-format=ROW #选择 ROW 模式
server_id=1    #配置MySQL replaction需要定义,不要和canal的 slaveId重复
  • ROW模式 除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,但会占用较多的空间。
  • STATEMENT模式只记录了sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况;
  • MIX模式比较灵活的记录,理论上说当遇到了表结构变更的时候,就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式;
    Redis(十四)双写一致性工程案例,Java,redis,数据库,缓存
    重启mysql

授权canal连接mysql账号

# 查询现有账户
SELECT * FROM mysql.`user`

新建canal用户

# 如果存在名为 'canal' 的用户,且允许从任何主机 '%' 登录,则删除该用户。
DROP USER IF EXISTS 'canal'@'%';
# 创建一个名为 'canal' 的用户,允许从任何主机 '%' 登录,密码为 'canal'。
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';  
# 赋予 'canal' 用户在所有数据库中的所有表的全部权限,并使用密码 'canal'。
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
# 刷新 MySQL 的权限表,使修改后的权限立即生效。
FLUSH PRIVILEGES;
 
SELECT * FROM mysql.user;

canal服务端

下载
https://github.com/alibaba/canal/releases/tag/canal-1.1.6
解压.tar.gz配置
修改conf/example/instance.properties
配置mysql主机ip
Redis(十四)双写一致性工程案例,Java,redis,数据库,缓存
配置mysql账号密码
Redis(十四)双写一致性工程案例,Java,redis,数据库,缓存
启动
启动脚本bin/startup.sh
前提:安装好java8环境
./start.sh
查看日志
server日志:logs/canal.log
样例日志:logs/example.log

canal客户端(Java程序)

# 选个数据库,以你自己为主,本例bigdata,按照下面建表
CREATE TABLE `t_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `userName` varchar(100) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4

pom引入canal

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.canal</groupId>
    <artifactId>canal_demo02</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.14</version>
        <relativePath/>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <junit.version>4.12</junit.version>
        <log4j.version>1.2.17</log4j.version>
        <lombok.version>1.16.18</lombok.version>
        <mysql.version>5.1.47</mysql.version>
        <druid.version>1.1.16</druid.version>
        <mapper.version>4.1.5</mapper.version>
        <mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version>
    </properties>

    <dependencies>
        <!--canal-->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.0</version>
        </dependency>
        <!--SpringBoot通用依赖模块-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--swagger2-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--SpringBoot与Redis整合依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <!--SpringBoot与AOP-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
        </dependency>
        <!--Mysql数据库驱动-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <!--SpringBoot集成druid连接池-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>${druid.version}</version>
        </dependency>
        <!--mybatis和springboot整合-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>${mybatis.spring.boot.version}</version>
        </dependency>
        <!--通用基础配置junit/devtools/test/log4j/lombok/hutool-->
        <!--hutool-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.2.3</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <optional>true</optional>
        </dependency>
        <!--persistence-->
        <dependency>
            <groupId>javax.persistence</groupId>
            <artifactId>persistence-api</artifactId>
            <version>1.0.2</version>
        </dependency>
        <!--通用Mapper-->
        <dependency>
            <groupId>tk.mybatis</groupId>
            <artifactId>mapper</artifactId>
            <version>${mapper.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.8.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

YML

# ========================alibaba.druid=====================
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/上面的数据库?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.druid.test-while-idle=false

主启动
业务类

public class RedisUtils
{
    public static final String  REDIS_IP_ADDR = "192.168.1.1";
    public static final String  REDIS_pwd = "";
    public static JedisPool jedisPool;

    static {
        JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(5);
        jedisPoolConfig.setMaxIdle(5);
        jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,30000,REDIS_pwd);
    }

    public static Jedis getJedis() throws Exception {
        if(null!=jedisPool){
            return jedisPool.getResource();
        }
        throw new Exception("Jedispool fail");
    }

}

https://github.com/alibaba/canal/wiki/ClientExample

CANAL_IP_ADDR:CANAL服务端ip地址
Canal默认端口11111

public class RedisCanalClientExample
{
    public static final Integer _60SECONDS = 60;
    public static final String  CANAL_IP_ADDR = "192.168.111.185";

    private static void redisInsert(List<Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns)
        {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }


    private static void redisDelete(List<Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns)
        {
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.del(columns.get(0).getValue());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    private static void redisUpdate(List<Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns)
        {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
                System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    public static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                //获取变更的row数据
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
            }
            //获取变动类型
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else if (eventType == EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else {//EventType.UPDATE
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }


    public static void main(String[] args)
    {
        System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");

        //=================================
        // 创建链接canal服务端
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(CANAL_IP_ADDR,
                11111), "example", "", "");
        int batchSize = 1000;
        //空闲空转计数器
        int emptyCount = 0;
        System.out.println("---------------------canal init OK,开始监听mysql变化------");
        try {
            connector.connect();
            //connector.subscribe(".*\\..*");
            connector.subscribe("bigdata.t_user");
            connector.rollback();
            int totalEmptyCount = 10 * _60SECONDS;
            while (emptyCount < totalEmptyCount) {
                System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                } else {
                    //计数器重新置零
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
        } finally {
            connector.disconnect();
        }
    }
}

connector.subscribe配置过滤
Redis(十四)双写一致性工程案例,Java,redis,数据库,缓存
try-with-resources
Redis(十四)双写一致性工程案例,Java,redis,数据库,缓存文章来源地址https://www.toymoban.com/news/detail-832833.html

到了这里,关于Redis(十四)双写一致性工程案例的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Redis---缓存双写一致性

    目录 一、什么是缓存双写一致性呢?  1.1 双检加锁机制  二、数据库和缓存一致性的更新策略 2.1、先更新数据库,后更新缓存  2.2 、先更新缓存,后更新数据库  2.3、先删除缓存,在更新数据库 延时双删的策略:  2.4.先更新数据库,在删除缓存(常用) 2.5、实际中是不可

    2024年02月15日
    浏览(33)
  • Redis缓存双写一致性

    如果redis中有数据:需要和数据库中的值相同 如果redis中无数据:数据库中的值要是最新值,且准备回写redis 缓存按照操作来分,可细分为两种: 只读缓存和读写缓存 只读缓存很简单:就是Redis只做查询,有就是有,没有就是没有,不会再进一步访问MySQL,不再需要会写机制

    2023年04月17日
    浏览(30)
  • Redis缓存(双写一致性问题)

    前言 : 什么是缓存? 缓存就像自行车,越野车的避震器 举个例子:越野车,山地自行车,都拥有\\\"避震器\\\", 防止 车体加速后因惯性,在酷似\\\"U\\\"字母的地形上飞跃,硬着陆导致的 损害 ,像个弹簧一样; 同样,实际开发中,系统也需要\\\"避震器\\\",防止过高的数据访问猛冲系统,导致其操作线程无法

    2024年02月02日
    浏览(44)
  • 【Redis】聊一下缓存双写一致性

    缓存虽然可以提高查询数据的的性能,但是在缓存和数据 进行更新的时候 其实会出现数据不一致现象,而这个不一致其实可能会给业务来带一定影响。无论是Redis 分布式缓存还是其他的缓存机制都面临这样的问题。 数据一致性 缓存中有数据,那么缓存的数据和数据库的数据

    2024年02月06日
    浏览(40)
  • Redis高级系列-缓存双写一致性

    Redis缓存双写一致性是指在更新数据库数据后,同时更新缓存数据以保持数据一致性的策略,总的来说,就是 写入redis 和 写入数据库 的数据要保持一致 2.1 Cache Aside Pattern(旁路缓存模式) 旁路缓存模式,字面意思理解:缓存是旁路,缓存相对与应用程序和数据库是旁路,应用

    2024年01月20日
    浏览(43)
  • Redis缓存双写一致性之更新策略

    你只要用缓存,就可能会涉及到redis缓存与数据库双存储双写,你只要是双写,就一定会有数据一致性的问题,那么你如何解决一致性问题? 双写一致性,你先动缓存redis还是数据库mysql哪一个?why? 延时双删你做过吗?会有哪些问题? 有这么一种情况,微服务查询redis无m

    2024年02月05日
    浏览(44)
  • Redis与MySQL双写一致性如何保证

    前言 在分布式系统中,数据一致性是一个重要的问题。当我们使用Redis和MySQL这两种不同的数据库时,如何保证它们之间的双写一致性是一个需要解决的难题。本文将探讨Redis与MySQL双写一致性的保证方法。 什么是双写一致性? 指的是当我们更新了数据库的数据之后redis中的数

    2024年02月09日
    浏览(36)
  • Redis缓存问题:穿透,击穿,雪崩,双写一致性等

    在高并发场景下,数据库往往是最薄弱的环节,我们通常选择使用 redis 来进行缓存,以起到缓冲作用,来降低数据库的压力,但是一旦缓存出现问题,也会导致数据库瞬间压力过大甚至崩溃,从而导致整个系统崩溃.今天就聊聊常见的 redis 缓存问题. 缓存击穿 缓存击穿一般指redis中的一

    2024年04月27日
    浏览(40)
  • 如何保证redis与db的双写一致性

        如何保证redis与db的双写一致性?这是一个十分热门的面试话题。 如何理解“一致性”这个概念?“事务”中“一致性”的定义是: 事务执行前后,数据从一个合法性状态变换到另外一个合法性状态。  比喻说,更新前:redis中记录的是100,db中记录的也是100。更新后: 

    2024年02月07日
    浏览(34)
  • [优雅的面试]MySQL与Redis双写一致性方案

    前言 由于缓存的高并发和高性能已经在各种项目中被广泛使用,在读取缓存这方面基本都是一致的,大概都是按照下图的流程进行操作: 但是在更新缓存方面,是更新完数据库再更新缓存还是直接删除缓存呢?又或者是先删除缓存再更新数据库?在这一点上就值得探讨了。

    2024年01月21日
    浏览(33)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包