Springcloud Alibaba 使用Canal将MySql数据实时同步到Elasticsearch

这篇具有很好参考价值的文章主要介绍了Springcloud Alibaba 使用Canal将MySql数据实时同步到Elasticsearch。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

本篇文章在Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性-CSDN博客

基础上使用canal将mysql数据实时同步到Elasticsearch。

1. 数据库准备

CREATE DATABASE /*!32312 IF NOT EXISTS*/`shop` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;

USE `shop`;

/*Table structure for table `sku` */

DROP TABLE IF EXISTS `sku`;

CREATE TABLE `sku` (
  `id` VARCHAR(60) NOT NULL COMMENT '商品id',
  `name` VARCHAR(200) NOT NULL COMMENT 'SKU名称',
  `price` INT NOT NULL DEFAULT '1' COMMENT '价格(分)',
  `num` INT DEFAULT '100' COMMENT '库存数量',
  `image` VARCHAR(200) DEFAULT NULL COMMENT '商品图片',
  `images` VARCHAR(2000) DEFAULT NULL COMMENT '商品图片列表',
  `create_time` DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `spu_id` VARCHAR(60) DEFAULT NULL COMMENT 'SPUID',
  `category_id` INT DEFAULT NULL COMMENT '类目ID',
  `category_name` VARCHAR(200) DEFAULT NULL COMMENT '类目名称',
  `brand_id` INT DEFAULT NULL COMMENT '品牌id',
  `brand_name` VARCHAR(100) DEFAULT NULL COMMENT '品牌名称',
  `sku_attribute` VARCHAR(200) DEFAULT NULL COMMENT '规格',
  `status` INT DEFAULT '1' COMMENT '商品状态 1-正常,2-下架,3-删除',
  PRIMARY KEY (`id`),
  KEY `cid` (`category_id`),
  KEY `status` (`status`),
  KEY `updated` (`update_time`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb3 COMMENT='商品表';

/*Data for the table `sku` */

INSERT  INTO `sku`(`id`,`name`,`price`,`num`,`image`,`images`,`create_time`,`update_time`,`spu_id`,`category_id`,`category_name`,`brand_id`,`brand_name`,`sku_attribute`,`status`) VALUES 
('1318594982227025922','华为Mate40 Pro 32G',114,1228,'https://sklll.oss-cn-beijing.aliyuncs.com/secby/af1faf56-b10a-4700-9896-3143a2d1c40f.jpg','https://sklll.oss-cn-beijing.aliyuncs.com/secby/a65bfbe4-21b7-42b2-b5cf-47a9730e0a16.jpg,https://sklll.oss-cn-beijing.aliyuncs.com/secby/fa52ef66-7724-4d6e-bece-15eba0f8f903.jpg,https://sklll.oss-cn-beijing.aliyuncs.com/secby/734f0f17-ac73-45d3-a6bf-83e1569ce887.jpg','2020-10-20 16:48:37','2023-12-29 19:02:16','1318594982147334146',11159,'软件研发',11,'华为','{\"就业薪资\":\"10K起\",\"学习费用\":\"2万\"}',1),
('1318596430360813570','华为Mate40 Pro 32G 1800万像素',112,1227,'https://sklll.oss-cn-beijing.aliyuncs.com/secby/9247d041-e940-426c-8e50-06084b631063.jpg','https://sklll.oss-cn-beijing.aliyuncs.com/secby/5f5b7435-6cf2-4797-8f65-d4abff181390.jpg','2020-10-20 16:54:22','2023-12-29 19:07:47','1318596430293704706',11159,'软件研发',11,'华为','{\"就业薪资\":\"10K起\",\"学习费用\":\"2万\"}',1),
('1318596430398562305','华为Mate40 Pro 128G',111,1226,'https://sklll.oss-cn-beijing.aliyuncs.com/secby/900a3618-9884-4778-bad9-c6c31eaf3eab.jpg','https://sklll.oss-cn-beijing.aliyuncs.com/secby/5f5b7435-6cf2-4797-8f65-d4abff181390.jpg','2020-10-20 16:54:22','2023-12-29 19:11:28','1318596430293704706',11159,'软件研发',11,'华为','{\"就业薪资\":\"10K起\",\"学习费用\":\"2万\"}',1),
('1318599511605563394','格力手机 5G手机',100,1225,'https://sklll.oss-cn-beijing.aliyuncs.com/secby/2b233c6a-5acc-449e-ba3a-70a506100948.jpg','https://sklll.oss-cn-beijing.aliyuncs.com/secby/ffc66a17-edfc-43bb-8f66-431b1e9bf606.jpg','2020-10-20 17:06:37','2023-12-29 19:11:25','1318599511492317185',11159,'软件研发',11,'华为','{\"就业薪资\":\"10K起\",\"学习费用\":\"2万\"}',1),
('1318599511647506433','格力手机 5G手机 红色',789,1224,'https://sklll.oss-cn-beijing.aliyuncs.com/secby/1c1fbfea-af9f-49e7-b89b-35e751874399.jpg','https://sklll.oss-cn-beijing.aliyuncs.com/secby/ffc66a17-edfc-43bb-8f66-431b1e9bf606.jpg','2020-10-20 17:06:37','2020-10-20 17:06:37','1318599511492317185',11159,'软件研发',11,'华为','{\"就业薪资\":\"10K起\",\"学习费用\":\"2万\"}',1),


 2. 公共部分

公共包

       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--MyBatis Plus-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.3.2</version>
        </dependency>

        <!--MySQL-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!--Nacos-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>javax.persistence</groupId>
            <artifactId>persistence-api</artifactId>
            <version>1.0</version>
            <scope>compile</scope>
        </dependency>

实体类Sku

@Column注解

用来标识实体类中属性与数据表中字段的对应关系

name
定义了被标注字段在数据库表中所对应字段的名称;由于驼峰命名法,如果不使用@Column字段,canal在监控数据变化时,获得的实体类部分字段为null,比如create_time等等。

@TableName(value ="sku")
@Data
@Table
public class Sku implements Serializable {
    @TableId(type = IdType.ASSIGN_ID)
    private String id;

    private String name;

    private Integer price;

    private Integer num;

    private String image;

    private String images;
    @Column(name = "create_time")
    private Date createTime;
    @Column(name = "update_time")
    private Date updateTime;
    @Column(name = "spu_id")
    private String spuId;
    @Column(name = "category_id")
    private Integer categoryId;
    @Column(name = "category_name")
    private String categoryName;
    @Column(name = "brand_id")
    private Integer brandId;
    @Column(name = "brand_name")
    private String brandName;
    @Column(name = "sku_attribute")
    private String skuAttribute;

    private Integer status;

    private static final long serialVersionUID = 1L;
}

实体类SkuEs

@Data
@Document(indexName = "skusearch")//indexName一定全小写,不然出错
public class SkuEs {

    @Id
    private String id;
    @Field(type = FieldType.Text,analyzer = "ik_smart",searchAnalyzer = "ik_smart")
    private String name;
    private Integer price;
    private Integer num;
    private String image;
    private String images;
    private Date createTime;
    private Date updateTime;
    private String spuId;
    private Integer categoryId;
    //Keyword:不分词
    @Field(type= FieldType.Keyword)
    private String categoryName;
    private Integer brandId;
    @Field(type=FieldType.Keyword)
    private String brandName;
    @Field(type=FieldType.Keyword)
    private String skuAttribute;
    private Integer status;
}

3. mall-search-service

导入依赖:

       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
            <version>2.6.3</version>
        </dependency>

这里使用的springboot版本为2.7.12,springboot和spring-boot-starter-data-elasticsearch的版本问题会导致异常:

java.lang.NoSuchFieldError: INDEX_CONTENT_TYPE

bootstrap.yaml代码如下:

server:
  port: 8084
spring:
  application:
    name: mall-search
  cloud:
    nacos:
      config:
        file-extension: yaml
        server-addr: localhost:8848
      discovery:
        #Nacos的注册地址
        server-addr: localhost:8848
  #Elasticsearch服务配置 6.8.12
  elasticsearch:
      uris: http://localhost:9200

编写SkuSearchMapper,在主启动类上添加@EnableElasticsearchRepositories(basePackages = {"xx"}),basePackages指定mapper包路径

public interface SkuSearchMapper extends ElasticsearchRepository<SkuEs,String> {
}

Service层

public interface SkuSearchService {

    void add(SkuEs skuEs);

    void del(String id);
}


@Service
public class SkuSearchServiceImpl implements SkuSearchService {

    @Autowired
    SkuSearchMapper skuSearchMapper;

    @Autowired
    ElasticsearchRestTemplate elasticsearchRestTemplate;

    @Override
    public void add(SkuEs skuEs) {
        skuSearchMapper.save(skuEs);
    }

    @Override
    public void del(String id) {
       skuSearchMapper.deleteById(id);
    }
}

controller层

@RestController
@RequestMapping(value = "/search")
public class SkuSearchController {

    @Autowired
    private SkuSearchService skuSearchService;


    /*****
     * 增加索引
     */
    @PostMapping(value = "/add")
    public RespResult add(@RequestBody SkuEs skuEs){
        skuSearchService.add(skuEs);
        return RespResult.ok();
    }

    /***
     * 删除索引
     */
    @DeleteMapping(value = "/del/{id}")
    public RespResult del(@PathVariable(value = "id")String id){
        skuSearchService.del(id);
        return RespResult.ok();
    }
}

4. mall-canal-service

基于上一篇文章Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性-CSDN博客的canal服务。

添加以下代码:

feign接口

@FeignClient(value = "mall-search")

public interface SkuSearchFeign {
    @PostMapping(value = "/search/add")
    RespResult add(@RequestBody SkuEs skuEs);

    /***
     * 删除索引
     */
    @DeleteMapping(value = "/search/del/{id}")
    RespResult del(@PathVariable(value = "id")String id);
}

 canal设计代码如下:

@Component
@CanalTable(value = "sku")
public class SkuSearchHandler implements EntryHandler<Sku> {

    @Autowired
    SkuSearchFeign skuSearchFeign;

    @Override
    public void insert(Sku sku) {
        System.out.println(sku);
        String jsonString = JSON.toJSONString(sku);
        SkuEs skuEs = JSON.parseObject(jsonString, SkuEs.class);
         skuSearchFeign.add(skuEs);
    }

    @Override
    public void update(Sku before, Sku after) {
        System.out.println(after);
        String jsonString = JSON.toJSONString(after);
        SkuEs skuEs = JSON.parseObject(jsonString, SkuEs.class);
        System.out.println(skuEs);
        skuSearchFeign.add(skuEs);
    }

    @Override
    public void delete(Sku sku) {
        System.out.println(sku);
        skuSearchFeign.del(sku.getId());
    }
}

 文章来源地址https://www.toymoban.com/news/detail-769444.html

 

到了这里,关于Springcloud Alibaba 使用Canal将MySql数据实时同步到Elasticsearch的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • canal实时同步mysql数据到elasticsearch(部署,配置,测试)(一)

    canal基于MySQL数据库增量日志解析,提供增量数据订阅和消费,是阿里开源CDC工具,它可以获取MySQL binlog数据并解析,然后将数据变动传输给下游。基于canal,可以实现从MySQL到其他数据库的实时同步 MySQL主备复制原理 MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫

    2023年04月08日
    浏览(34)
  • 基于Canal实现Mysql数据实时同步到Elasticsearch(Docker版)

    1、Canal简介   Canal主要用途是对MySQL数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对MySQL的增量数据进行实时同步,支持同步到MySQL、Elasticsearch、HBase等数据存储中去。   Canal会模拟MySQL主库和从库的交互协议,从而伪装成MySQL的从库,然后向My

    2024年02月10日
    浏览(32)
  • MySQL如何实时同步数据到ES?试试阿里开源的Canal

    前几天在网上冲浪的时候发现了一个比较成熟的开源中间件——  Canal  。在了解了它的工作原理和使用场景后,顿时产生了浓厚的兴趣。今天,就让我们跟随我的脚步,一起来揭开它神秘的面纱吧。 目录 前言 简介  工作原理  MySQL主备复制原理 canal 工作原理 Canal架构  C

    2024年02月20日
    浏览(33)
  • 使用 Docker 部署 canal 服务实现MySQL和ES实时同步

    参考 ClientAdapter: Canal的Adapter配置项目 Sync ES:Canal的Adapter中ES同步的配置项 使用 Docker 部署 canal 服务 docker canal-server canal-adapter mysql Canal(基于Docker同步mysql数据到elasticsearch) Canal部署过程中的错误 Canal 1.1.4 Canal Adapter 1.1.4 Kibana: 6.8.8 ElasticSearch: 6.4.3 由于Canal 1.1.4只能适配 Ela

    2024年02月13日
    浏览(36)
  • 大数据Canal(三):使用Canal同步MySQL数据

    文章目录 ​​​​​​使用Canal同步MySQL数据 一、Canal架构原理

    2024年02月03日
    浏览(36)
  • Canal —— 一款 MySql 实时同步到 ES 的阿里开源神器

    目录 一. 前言 二. Canal 简介和使用场景 2.1. Canal 简介 2.2. Canal 使用场景 三. Canal Server 设计 3.1. 整体设计 3.2. EventParser 设计 3.3. CanalLogPositionManager 设计 3.4. CanalHAController 类图设计 3.5. EventSink 类图设计和扩展 3.6. EventStore 类图设计和扩展 3.7. MetaManager 类图设计和扩展 四. Can

    2024年01月25日
    浏览(40)
  • 实时同步ES技术选型:Mysql+Canal+Adapter+ES+Kibana

    基于之前的文章,精简操作而来 让ELK在同一个docker网络下通过名字直接访问 Ubuntu服务器ELK部署与实践 使用 Docker 部署 canal 服务实现MySQL和ES实时同步 Docker部署ES服务,canal全量同步的时候内存爆炸,ES/Canal Adapter自动关闭,CPU100% 2.1 新建mysql docker 首先新建数据库的docker镜像

    2024年02月11日
    浏览(32)
  • Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理

    关于Canal的介绍及原理不在此赘述,可自行查阅。笔者在使用Canal同步Mysql实时操作记录至RabbitMQ的过程中,也翻阅了一些大牛们的文章,可能是我使用的Canal版本与文中版本不一致,出现了一些问题,在此总结记录一下可行的方案。 注:本文使用的Canal为 v1.1.7 先查看目标数据

    2024年04月10日
    浏览(32)
  • 使用canal+rocketmq实现将mysql数据同步到es

    实际开发过程中,经常遇到数据库与缓存不一致的问题,造成这种问题的原因有很多,其中缓存数据没有及时更新、缓存中过期的数据没有及时更新,导致缓存中存在失效数据,导致数据库与缓存不一致。而这种问题的出现大部分都是因为同步延迟、缓存失效、过期和错误使

    2024年02月11日
    浏览(31)
  • 基于Canal与Flink实现数据实时增量同步(一)

    vi conf/application.yml server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: kms-1:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql:// s p r i n g . d a t a s o u r c e . a d d r e s s / {spring.datasource.address}/ s p r in g . d

    2024年04月13日
    浏览(37)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包