RabbitMQ实现数据库与ElasticSearch的数据同步和分享文件过期处理

这篇具有很好参考价值的文章主要介绍了RabbitMQ实现数据库与ElasticSearch的数据同步和分享文件过期处理。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

🎈 1 参考文档

RabbitMQ实现数据库与ElasticSearch的数据同步 | Hannya。-CSDN

企业级开发项目实战——基于RabbitMQ实现数据库、elasticsearch的数据同步 | 波总说先赚它一个小目标-CSDN

SPringBoot集成RabbitMQ实现30秒过期删除功能 | 军大君-CSDN


🔍 2 个人需求

  1. 当进行文件上传、文件创建、文件重命名等操作时:

    通过RabbitMQ:

    • 生产者:文件服务,执行上传、创建、重命名等文件操作,将用户文件信息(例如文件名、文件ID等)发送到RabbitMQ新增队列。
    • 消费者:查询服务,监听RabbitMQ新增队列,一旦收到消息,将用户文件信息新增或更新到Elasticsearch中。
  2. 文件删除时:

    通过RabbitMQ:

    • 生产者:文件服务,执行文件删除操作,将用户文件ID发送到RabbitMQ删除队列。
    • 消费者:查询服务,监听 RabbitMQ 队列,一旦收到消息,通过用户文件ID从Elasticsearch中删除相应的用户文件信息。
  3. 根据文件名进行文件模糊查询:

    通过OpenFeign:

    • 生产者:文件服务,查询服务调用文件服务提供的OpenFeign接口,通过用户文件ID从查询该用户文件是否存在。
    • 消费者:查询服务,如果不存在,将数据根据用户文件ID从Elasticsearch中删除。
  4. 分享文件时间到期处理:

    通过RabbitMQ的TTL(生存时间) + 死信队列:

    • 生产者:文件服务, 使用TTL模拟一个“延时队列”,在文件分享时间到期后,将消息传递到死信队列。
    • 消费者:文件服务,死信监听器监听到之后,将分享文件的分享状态改为已过期状态。

🔈3 声明

只是提供思路,代码不是很完整,直接复制运行不了。

最后面有完整网盘项目代码。


🚀4 OpenFeign相关部分(查询服务)

4.1 引入依赖

<!-- nacos -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- openfeign -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!-- loadbalancer -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>

4.2 application.yml

spring:
  # nacos注册的服务名
  application:
    name: netdisk-search
  cloud:
    nacos:
      discovery:
        # 配置注册服务的IP地址
        server-addr: (IP地址):8848
        username: nacos
        password: nacos

4.3 FileFeignService 接口

@FeignClient(name = "netdisk-file", configuration = FeignInterceptor.class)
public interface FileFeignService {
    @RequestMapping("/file/getUserFile/{userFileId}")
    ResultResponse<Boolean> getUserFile(@PathVariable Long userFileId);
}

4.4 @EnableFeignClients 注解

@ComponentScan(value = "com.cauli.search.*")
@EnableFeignClients(basePackages = "com.cauli.search")
@SpringBootApplication
public class NetdiskSearchApplication {
    public static void main(String[] args) {
        SpringApplication.run(NetdiskSearchApplication.class, args);
    }
}

🚀5 Elasticsearch相关部分(查询服务)

5.1 引入依赖

<!-- elasticsearch -->
<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.0.1</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.12.3</version>
</dependency>
<dependency>
    <groupId>jakarta.json</groupId>
    <artifactId>jakarta.json-api</artifactId>
    <version>2.0.1</version>
</dependency>

5.2 application.yml

# elasticsearch相关的配置
elasticsearch:
  # ES网关地址
  hostname: (IP地址)
  # ES网关端口
  port: 9200
  # ES网官方方案
  scheme: http

5.3 ElasticSearchConfig 配置类

@Configuration
public class ElasticSearchConfig {
    @Value("${elasticsearch.hostname}")
    String hostname;

    @Value("${elasticsearch.port}")
    int port;

    @Value("${elasticsearch.scheme}")
    String scheme;

    @Bean
    public ElasticsearchClient elasticsearchClient(){
        // 创建低级客户端
        RestClient client = RestClient.builder(new HttpHost(hostname, port,scheme)).build();
		// 创建API客户端,使用Jackson映射器创建传输层
        ElasticsearchTransport transport = new RestClientTransport(client,new JacksonJsonpMapper());
        return new ElasticsearchClient(transport);
    }
}	

5.4 Elasticsearch 服务类和服务实现类

public interface ElasticsearchService {
    /**
     * 更新ES数据
     *
     * @param fileSearchDTO
     */
    void uploadES(FileSearchDTO fileSearchDTO);

    /**
     * 删除ES数据
     *
     * @param userFileId
     */
    void deleteES(Long userFileId);


    /**
     * 搜索ES数据
     *
     * @return
     */
    List<SearchFileVO> searchES(SearchFileQueryDTO searchFileVO);
}

@Slf4j
@Service
public class ElasticsearchServiceImpl implements ElasticsearchService {
    @Autowired
    private ElasticsearchClient elasticsearchClient;

    @Resource
    private FileFeignService feignService;

    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
            12, // 核心线程数
            20, // 最大线程数
            1, // 线程存活时间
            TimeUnit.SECONDS, // 存活时间单位
            new ArrayBlockingQueue<>(1000) // 任务队列
    );

    public void uploadES(FileSearchDTO fileSearchDTO) {
        executor.execute(() -> {
            try {
                elasticsearchClient.index(i -> i.index("file_search")
                        .id(fileSearchDTO.getUserFileId())
                        .document(fileSearchDTO));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    public void deleteES(Long userFileId) {
        executor.execute(() -> {
            try {
                elasticsearchClient.delete(d -> d
                        .index("file_search")
                        .id(String.valueOf(userFileId)));
            } catch (Exception e) {
                log.debug("ES删除操作失败,请检查配置");
            }
        });
    }

    @Override
    public List<SearchFileVO> searchES(SearchFileQueryDTO searchFileQueryDTO) {
        int pageNum = (int) searchFileQueryDTO.getPageNum() - 1;
        int pageSize = (int) (searchFileQueryDTO.getPageSize() == 0 ? 10 : searchFileQueryDTO.getPageSize());

        SearchResponse<FileSearchDTO> search = null;
        try {
            search = elasticsearchClient.search(s -> s
                    .index("file_search")
                    .query(_1 -> _1
                            .bool(_2 -> _2
                                    .must(_3 -> _3
                                            .bool(_4 -> _4
                                                    .should(_5 -> _5
                                                            .match(_6 -> _6
                                                                    .field("fileName")
                                                                    .query(searchFileQueryDTO.getFileName())))
                                                    .should(_5 -> _5
                                                            .wildcard(_6 -> _6
                                                                    .field("fileName")
                                                                    .wildcard("*" + searchFileQueryDTO.getFileName() + "*")))
                                            ))
                                    .must(_3 -> _3
                                            .term(_4 -> _4
                                                    .field("userId")
                                                    .value(StpUtil.getLoginIdAsLong())))
                            ))
                    .from(pageNum)
                    .size(pageSize)
                    .highlight(h -> h
                            .fields("fileName", f -> f.type("plain")
                                    .preTags("<span class='keyword'>").postTags("</span>"))
                            .encoder(HighlighterEncoder.Html)), FileSearchDTO.class);
        } catch (IOException e) {
            e.printStackTrace();
        }

        List<SearchFileVO> searchFileVOList = new ArrayList<>();
        if (search != null) {
            for (Hit<FileSearchDTO> hit : search.hits().hits()) {
                SearchFileVO searchFileVO = new SearchFileVO();
                BeanUtil.copyProperties(hit.source(), searchFileVO);
                searchFileVO.setHighLight(hit.highlight());
                searchFileVOList.add(searchFileVO);

                // 如果文件不存在,也从ES中删除
                if (!feignService.getUserFile(searchFileVO.getUserFileId()).getData()) {
                    executor.execute(() -> this.deleteES(searchFileVO.getUserFileId()));

                }
            }
        }
        return searchFileVOList;
    }
}

5.5 ElasticsearchController 前端控制器

@RestController
@RequestMapping("/search")
public class ElasticsearchController {
    @Autowired
    private ElasticsearchService elasticService;

    @GetMapping(value = "/searchFile")
    public RestResult<SearchFileVO> searchFile(SearchFileQueryDTO searchFileQueryDTO) {
        List<SearchFileVO> searchFileVOList = elasticService.searchES(searchFileQueryDTO);
        return RestResult.success().dataList(searchFileVOList, searchFileVOList.size());
    }
}

5.6 相关实体类

/**
 * 文件搜索VO
 */
@Data
public class SearchFileVO {
    @JsonSerialize(using = ToStringSerializer.class)
    private Long userFileId;

    private String fileName;

    private String filePath;

    private String extendName;

    private Long fileSize;

    private String fileUrl;

    private Map<String, List<String>> highLight;

    private Integer isDir;
}
/**
 * 文件搜索DTO
 */
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class FileSearchDTO {
    private String indexName;

    private String userFileId;

    private String fileId;

    private String fileName;

    private String content;

    private String fileUrl;

    private Long fileSize;

    private Integer storageType;

    private String identifier;

    private Long userId;

    private String filePath;

    private String extendName;

    private Integer isDir;

    private String deleteTime;

    private String deleteBatchNum;
}
/**
 * 文件查询条件DTO
 */
@Data
public class SearchFileQueryDTO {
    @ApiModelProperty("文件名")
    private String fileName;

    @ApiModelProperty("当前页")
    private long pageNum;

    @ApiModelProperty("每页数量")
    private long pageSize;
}

🚀6 RabbitMQ相关部分

6.1 生产者部分(文件服务)

6.1.1 引入依赖

<!-- nacos -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- RabbitMQ(我的SpringBoot是2.6.8的) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

6.1.2 完整application.yml

server:
  port: 8083
spring:
  # MySQL配置
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/(数据库名)?characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=true
    username: root
    password: (MySQL密码)
  # nacos注册的服务名
  application:
    name: netdisk-file
  cloud:
    nacos:
      discovery:
        # 配置注册服务的IP地址
        server-addr: (IP地址):8848
        username: nacos
        password: nacos
  # rabbitmq相关的配置
  rabbitmq:
    host: (IP地址)
    port: 5672
    virtual-host: (虚拟主机名,比如:/file)
    username: (用户名,默认:guest)
    password: (密码,默认:guest)

6.1.3 RabbitMQConfig 配置类

@Configuration
public class RabbitMQConfig {
    // 普通交换机
    public static final String FILE_EXCHANGE = "file.exchange";

    // 文件保存相关
    public static final String QUEUE_FILE_SAVE = "queue.file.save";
    public static final String KEY_FILE_SAVE = "key.file.save";

    // 文件删除相关
    public static final String QUEUE_FILE_REMOVE = "queue.file.remove";
    public static final String KEY_FILE_REMOVE = "key.file.remove";

    // 死信相关
    public static final String DEAD_LETTER_EXCHANGE = "deadLetter.exchange";
    public static final String DEAD_LETTER_QUEUE = "deadLetter.queue";
    public static final String KEY_FILE_DEAD_LETTER = "key.file.dead.letter";

    //延迟队列
    public static final String DELAY_QUEUE = "delay.queue";

    /**
     * 文件保存队列
     *
     * @return
     */
    @Bean
    public Queue queueFileSave() {
        return new Queue(QUEUE_FILE_SAVE);
    }

    /**
     * 文件删除队列
     *
     * @return
     */
    @Bean
    public Queue queueFileRemove() {
        return new Queue(QUEUE_FILE_REMOVE);
    }

    /**
     * 交换机
     *
     * @return
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(FILE_EXCHANGE);
    }

    /**
     * 绑定文件保存队列到交换机
     *
     * @return
     */
    @Bean
    public Binding bindFileSave() {
        return BindingBuilder.bind(queueFileSave()).to(topicExchange()).with(KEY_FILE_SAVE);
    }

    /**
     * 绑定文件删除队列到交换机
     *
     * @return
     */
    @Bean
    public Binding bindFileRemove() {
        return BindingBuilder.bind(queueFileRemove()).to(topicExchange()).with(KEY_FILE_REMOVE);
    }

    /**
     * 定义延时队列
     *
     * @return
     */
    @Bean
    public Queue delayQueue() {
        //设置死信交换机和路由key
        return QueueBuilder.durable(DELAY_QUEUE)
                //如果消息过时,则会被投递到当前对应的死信交换机
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
                //如果消息过时,死信交换机会根据routing-key投递消息到对应的队列
                .withArgument("x-dead-letter-routing-key", KEY_FILE_DEAD_LETTER)
                .build();
    }

    /**
     * 定义死信交换机
     *
     * @return
     */
    @Bean
    public TopicExchange deadLetterExchange() {
        return new TopicExchange(DEAD_LETTER_EXCHANGE);
    }

    /**
     * 定义死信队列
     *
     * @return
     */
    @Bean
    public Queue deadLetterQueue() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    /**
     * 绑定死信队列到死信交换机
     *
     * @return
     */
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(KEY_FILE_DEAD_LETTER);
    }
}

6.1.4 FileDealComp 文件逻辑处理组件伪代码

/**
 * 文件逻辑处理组件
 */
@Slf4j
@Component
public class FileDealComp {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
            12, // 核心线程数
            20, // 最大线程数
            1, // 线程存活时间
            TimeUnit.SECONDS, // 存活时间单位
            new ArrayBlockingQueue<>(1000) // 任务队列
    );

    /**
     * 更新ES数据
     *
     * @param userFileId
     */
    public void uploadES(Long userFileId) {
        executor.execute(() -> {
            FileSearchDTO fileSearchDTO = new FileSearchDTO();
            // 通过用户文件ID查询用户文件信息
            ...
            // 通过文件ID查询文件信息
            ...
            // 将用户文件信息和文件信息同步到fileSearchDTO对象
            ...  
           	// 消息队列更新ES
            rabbitTemplate.convertAndSend(RabbitMQConfig.FILE_EXCHANGE, RabbitMQConfig.KEY_FILE_SAVE, fileSearchDTO);
        });
    }

    /**
     * 删除ES数据
     *
     * @param userFileId
     */
    public void deleteES(Long userFileId) {
        // 消息队列删除ES
        rabbitTemplate.convertAndSend(RabbitMQConfig.FILE_EXCHANGE, RabbitMQConfig.KEY_FILE_REMOVE, userFileId);
    }
   
    /**
     * 分享文件过期处理
     *
     * @param shareBatchNum 分享批次号
     */
    public void expiredShareFile(String shareBatchNum) {
        Share share = new Share();
        // 根据分享批次号获取分享信息
        ...
        // 将分享信息同步到share对象
        ...
        // 定义日期格式
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long differenceInMillis = 0;
        try {
            // 解析日期字符串为日期对象
            Date shareDate = sdf.parse(share.getShareTime());
            Date endDate = sdf.parse(share.getEndTime());
            // 计算时间差(毫秒数)
            differenceInMillis = endDate.getTime() - shareDate.getTime();
        } catch (ParseException e) {
            e.printStackTrace();
        }

        // 存活时间
        String expiration = Long.toString(differenceInMillis);
        // 延时队列
        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_QUEUE, share, message -> {
            message.getMessageProperties().setExpiration(expiration);
            return message;
        });
    }
}

6.1.5 ExpiredShareFileListener 过期的分享文件处理监听器

@Slf4j
@Component
@RabbitListener(queues = "my-dlx-queue")
public class ExpiredShareFileListener {
    @Autowired
    private ShareService shareService;

    // 死信相关
    public static final String DEAD_LETTER_EXCHANGE = "deadLetter.exchange";
    public static final String DEAD_LETTER_QUEUE = "deadLetter.queue";
    public static final String KEY_FILE_DEAD_LETTER = "key.file.dead.letter";

    @RabbitListener(bindings = {
            @QueueBinding(
                    key = KEY_FILE_DEAD_LETTER,
                    value = @Queue(value = DEAD_LETTER_QUEUE, durable = "true"),
                    exchange = @Exchange(value = DEAD_LETTER_EXCHANGE, type = ExchangeTypes.TOPIC, ignoreDeclarationExceptions = "true")
            )})
    public void receiveShareMessage(Share share) {
        log.info("监听到文件过期处理操作:{}", share);

        // 将share的分享状态改为已过期 → 将share的shareStatus由0改为1
        ...

        log.info("操作完成:{}", share);
    }
}

6.2 消费者部分(查询服务)

6.2.1 引入依赖

<!-- RabbitMQ (我的SpringBoot是2.6.8的) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

6.2.2 完整application.yml

server:
  port: 8084
spring:
  # MySQL配置
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/(数据库名)?characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=true
    username: root
    password: (MySQL密码)
  # nacos注册的服务名
  application:
    name: netdisk-search
  cloud:
    nacos:
      discovery:
        # 配置注册服务的IP地址
        server-addr: (IP地址):8848
        username: nacos
        password: nacos
  mvc:
    path match:
      matching-strategy: ant_path_matcher
  servlet:
    multipart:
      enabled: true
      # 单个文件最大限制
      max-file-size: 1024MB
      # 多个文件最大限制
      max-request-size: 2048MB
  # rabbitmq相关的配置
  rabbitmq:
    host: (IP地址)
    port: 5672
    virtual-host: (虚拟主机名,比如:/file)
    username: (用户名,默认:guest)
    password: (密码,默认:guest)

# elasticsearch相关的配置
elasticsearch:
  # ES网关地址
  hostname: (IP地址)
  # ES网关端口
  port: 9200
  # ES网官方方案
  scheme: http

6.2.3 FileMQListener 文件处理消息队列监听

@Slf4j
@Component
public class FileMQListener {
    // 普通交换机
    public static final String FILE_EXCHANGE = "file.exchange";

    // 文件保存相关
    public static final String QUEUE_FILE_SAVE = "queue.file.save";
    public static final String KEY_FILE_SAVE = "key.file.save";

    // 文件删除相关
    public static final String QUEUE_FILE_REMOVE = "queue.file.remove";
    public static final String KEY_FILE_REMOVE = "key.file.remove";

    @Autowired
    private ElasticsearchService elasticsearchService;

    /**
     * 监听文件信息添加操作
     *
     * @param fileSearchDTO
     */
    @RabbitListener(bindings = {@QueueBinding(
            key = KEY_FILE_SAVE,
            value = @Queue(value = QUEUE_FILE_SAVE, durable = "true"),
            exchange = @Exchange(value = FILE_EXCHANGE, type = ExchangeTypes.TOPIC, ignoreDeclarationExceptions = "true"))})
    public void receiveFileSaveMessage(FileSearchDTO fileSearchDTO) {
        try {
            log.info("监听到文件信息添加操作:{}", fileSearchDTO);
            
            // 更新ES数据
            elasticsearchService.uploadES(fileSearchDTO);
            
            log.info("添加完成:{}", fileSearchDTO);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /**
     * 监听文件信息删除操作
     *
     * @param userFileId
     */
    @RabbitListener(bindings = {@QueueBinding(
            key = KEY_FILE_REMOVE,
            value = @Queue(value = QUEUE_FILE_REMOVE, durable = "true"),
            exchange = @Exchange(value = FILE_EXCHANGE, type = ExchangeTypes.TOPIC, ignoreDeclarationExceptions = "true"))})
    public void receiveFileDeleteMessage(Long userFileId) {
        try {
            log.info("监听到文件信息删除操作:{}", userFileId);
            
            // 删除ES数据
            elasticsearchService.deleteES(userFileId);
            
            log.info("文件信息删除完成:{}", userFileId);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

📫7 代码仓库

netdisk-cloud | Gitee文章来源地址https://www.toymoban.com/news/detail-707159.html

到了这里,关于RabbitMQ实现数据库与ElasticSearch的数据同步和分享文件过期处理的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • MySQL数据库实现主从同步

    安装MySQL数据库8.0.32 今天来学习数据库主从同步的原理及过程,数据库主要是用来存储WEB数据,在企业当中是极为重要的,下面一起来看下。 MySQL主从复制在中小企业,大型企业中广泛使用,MySQL主从复制的目的是实现数据库冗余备份,将master数据库数据定时同步到slave数据库

    2024年02月02日
    浏览(54)
  • 60、Flink CDC 入门介绍及Streaming ELT示例(同步Mysql数据库数据到Elasticsearch)-CDC Connector介绍及示例 (1)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年02月19日
    浏览(46)
  • C#实现SqlServer数据库同步

    实现效果: 设计思路: 1. 开启数据库及表的cdc,定时查询cdc表数据,封装sql语句(通过执行类型,主键;修改类型的cdc数据只取最后更新的记录),添加到离线数据表; 2. 线程定时查询离线数据表,更新远程库数据; 3. 远程库数据被更改又会产生cdc数据,对此数据进行拦截;

    2024年02月13日
    浏览(36)
  • Python实现两个主机的数据库同步

    实现两个主机间mysql数据同步 如果报 pymysql.err.OperationalError: (2003, “Can’t connect to MySQL server on ‘192.168.0.141’ (timed out)”) 那么很大可能是目标mysql不允许远程连接 执行如下命令即可

    2024年02月12日
    浏览(41)
  • 基于Canal实现MySQL 8.0 数据库数据同步

    主机名称 操作系统 说明 192.168.11.82 Ubuntu 22.04 主库所在服务器 192.168.11.28 Oracle Linux Server 8.7 从库所在服务器 1、Ubuntu系统下MySQL配置文件位置 2、CentOS系统下MySQL配置文件位置 3、添加如下配置,开启MySQL binlog功能 关于canal简介,这里就不再阐述,具体可以参看官方文档介绍,地

    2023年04月23日
    浏览(161)
  • [大数据 Flink,Java实现不同数据库实时数据同步过程]

    目录 🌮前言: 🌮实现Mysql同步Es的过程包括以下步骤: 🌮配置Mysql数据库连接 🌮在Flink的配置文件中,添加Mysql数据库的连接信息。可以在flink-conf.yaml文件中添加如下配置: 🌮在Flink程序中,使用JDBCInputFormat来连接Mysql数据库,并定义查询语句,获取需要同步的数据。具体代

    2024年02月10日
    浏览(40)
  • 实现↝Mysql数据库主从复制搭建与同步

    一般数据库都是读取压力大于写数据压力,主从复制即为了实现数据库的负载均衡和读写分离。通过将Mysql的某一台主机的数据复制到其它主机(slaves)上,主服务器只负责写,而从服务器只负责读。 如生产环境中,使用redis数据库作为缓存数据库,用户访问业务数据时,先

    2024年02月10日
    浏览(51)
  • Mysql数据库--实现主从复制搭建与同步

    一般数据库都是读取压力大于写数据压力,主从复制即为了实现数据库的负载均衡和读写分离。通过将Mysql的某一台主机的数据复制到其它主机(slaves)上,主服务器只负责写,而从服务器只负责读。 如生产环境中,使用redis数据库作为缓存数据库,用户访问业务数据时,先

    2024年02月08日
    浏览(57)
  • ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点)

    目录 一、数据同步 1.1、什么是数据同步 1.2、解决数据同步面临的问题 1.3、解决办法 1.3.1、同步调用 1.3.2、异步通知(推荐) 1.3.3、监听 binlog 1.3、基于 RabbitMQ 实现数据同步 1.3.1、需求 1.3.2、在“酒店搜索服务”中 声明 exchange、queue、routingKey,同时开启监听 1.3.3、在“酒店

    2024年02月08日
    浏览(49)
  • 根据源码,模拟实现 RabbitMQ - 通过 SQLite + MyBatis 设计数据库(2)

    目录 一、数据库设计 1.1、数据库选择 1.2、环境配置 1.3、建库建表接口实现 1.4、封装数据库操作 1.5、针对 DataBaseManager 进行单元测试 1.6、心得 MySQL 是我们最熟悉的数据库,但是这里我们选择使用 SQLite,原因如下: SQLite 比 MySQL 更轻量:一个完整的 SQLite 数据库,只有一个单

    2024年02月13日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包