spring boot +springboot集成es7.9.1+canal同步到es

这篇具有很好参考价值的文章主要介绍了spring boot +springboot集成es7.9.1+canal同步到es。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


未经许可,请勿转载。

前言

  1. 其实大部分的代码是来源于参考资料来源主要代码实现,我只是在他的基础上增加自定义注解,自定义分词器等。需要看详细源码的可以去看主要代码实现,结合我的来使用。
  2. 有人会问为什么需要自定义注解,因为elasticsearch7.6 索引将去除type 没有类型的概念了。所以我自己自定义数据类型,有需要的可以自己拓展自己需要的类型。
  3. 我这里主要写的是代码实现,没有涉及到中间件的搭建,因为真的没有时间,哈哈。

参考资料来源

主要实现代码:https://gitee.com/gz-yami/mall4cloud?_from=gitee_search
自定义注解:https://cloud.tencent.com/developer/article/1911164
自定义分词器:https://blog.csdn.net/m0_57302315/article/details/121103241
Canal胶水层:https://gitee.com/throwableDoge/canal-glue

rocketmq

  1. mq maven依赖
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>2.2.0</version>
            </dependency>
  1. mq 适配器
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

/**
 * @author gideon
 */
@Configuration
@RequiredArgsConstructor
public class RocketMqAdapter {

    private final RocketMQMessageConverter rocketMqMessageConverter;

    @Value("${rocketmq.name-server:}")
    private String nameServer;

    public RocketMQTemplate getTemplateByTopicName(String topic){
        RocketMQTemplate mqTemplate = new RocketMQTemplate();
        DefaultMQProducer producer = new DefaultMQProducer(topic);
        producer.setNamesrvAddr(nameServer);
        producer.setRetryTimesWhenSendFailed(2);
        producer.setSendMsgTimeout((int) RocketMqConstant.TIMEOUT);
        mqTemplate.setProducer(producer);
        mqTemplate.setMessageConverter(rocketMqMessageConverter.getMessageConverter());
        return mqTemplate;
    }

}
  1. mq的一些常量信息RocketMqConstant
/**
 * nameserver用;分割
 * 同步消息,如果两次
 */
public class RocketMqConstant {

    // 延迟消息 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (1-18)

    /**
     * 自动收货时间,实际上7天
     */
    public static final int ORDER_AUTO_RECEIPT_DELAY_LEVEL = 60 * 24 * 7;

    /**
     * 默认发送消息超时时间
     */
    public static final long TIMEOUT = 3000;


    /**
     * 订单取消退款
     */
    public static final String ORDER_REFUND_TOPIC = "order-refund-topic";

    /**
     * 订单自动收货
     */
    public static final String AUTO_RECEIPT_TOPIC = "auto-receipt-topic";

    /**
     * 服务订单订单支付成功
     */
    public static final String ORDER_NOTIFY_TOPIC = "order-notify-topic";

    /**
     * canal-topic
     */
    public static final String CANAL_TOPIC = "canal-topic";

}

  1. mq的配置类
import com.onecode.dtg.basic.RocketMqAdapter;
import com.onecode.dtg.basic.RocketMqConstant;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

/**
 * @author gideon
 */
@Configuration
@RequiredArgsConstructor
public class RocketMqConfig {

    private final RocketMqAdapter rocketMqAdapter;

    @Lazy
    @Bean(destroyMethod = "destroy")
    public RocketMQTemplate autoReceiptTemplate() {
        return rocketMqAdapter.getTemplateByTopicName(RocketMqConstant.AUTO_RECEIPT_TOPIC);
    }
}

  1. mq 的配置文件信息
rocketmq:
  name-server: 127.0.0.1:9876

elasticsearch

elasticsearch的搭建我在这里就不不多bb了,你们自行百度,下面是资料。

  1. maven所需依赖
 	</properties>
        <elasticsearch.version>7.9.1</elasticsearch.version>
    </properties>
      <dependencies>
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                <version>${elasticsearch.version}</version>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>${elasticsearch.version}</version>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-client</artifactId>
                <version>${elasticsearch.version}</version>
                <exclusions>
                    <exclusion>
                        <artifactId>commons-logging</artifactId>
                        <groupId>commons-logging</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
          </dependencies>
  1. 分词所需资料 elasticsearch搭建的资料,点击这里。
  2. elasticsearch的yml
# elastic的地址
elastic:
  hostname: 127.0.0.1
  port: 9200
  1. elasticsearch 启动配置类
import lombok.RequiredArgsConstructor;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author gideon
 */
@Configuration
@RequiredArgsConstructor
public class ElasticConfig {


    @Value("${elastic.hostname}")
    private String hostname;

    @Value("${elastic.port}")
    private int port;

    @Bean
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(
                RestClient.builder(
                        new HttpHost(hostname, port)));
    }
}
  1. elasticsearch 自定义注解,AnalyzerType在下面。
import java.lang.annotation.*;

/**
 * @author gideon
 * @date 2022/9/8
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Documented
@Inherited
public @interface EsField {

    FieldType type() default FieldType.TEXT;

    /**
     * 指定分词器
     *
     * @return AnalyzerType
     */
    AnalyzerType analyzer() default AnalyzerType.STANDARD;
}
  1. elasticsearch 自定义AnalyzerType,因为我自己的业务需要所以我加了一个自定义分词器comma
import lombok.Getter;

/**
 * @author gideon
 * @date 2022/9/8
 */
@Getter
public enum AnalyzerType {

    /**
     * 不使用分词
     */
    NO("不使用分词"),
    /**
     * 标准分词,默认分词器
     */
    STANDARD("standard"),

    /**
     * ik_smart:会做最粗粒度的拆分;已被分出的词语将不会再次被其它词语占有
     */
    IK_SMART("ik_smart"),

    /**
     * ik_max_word :会将文本做最细粒度的拆分;尽可能多的拆分出词语
     */
    IK_MAX_WORD("ik_max_word"),

    /**
     * ik_max_word :会将文本做逗号分词
     */
    COMMA("comma"),
    ;

    private final String type;

    AnalyzerType(String type) {
        this.type = type;
    }

}
  1. elasticsearch 自定义FieldType
import lombok.Getter;

/**
 * @author gideon
 * @date 2022/9/8
 */
@Getter
public enum FieldType {
    /**
     *
     */
    TEXT("text"),

    KEYWORD("keyword"),

    INTEGER("integer"),

    DOUBLE("double"),

    DATE("date"),

    LONG("long"),

    /**
     * 单条数据
     */
    OBJECT("object"),

    /**
     * 嵌套数组
     */
    NESTED("nested"),
    ;

    FieldType(String type){
        this.type = type;
    }

    private final String type;
}
  1. elasticsearch索引名称枚举
/**
 * es当中的index
 *
 * @author gideon
 */
public enum EsIndexEnum {

    /**
     * 护理员
     */
    SERVER("server"),

    ;

    private final String value;

    public String value() {
        return value;
    }

    EsIndexEnum(String value) {
        this.value = value;
    }
}
  1. elasticsearch 创建索引代码EsIndexCreateService,CommonBizException是我自定义的异常,你们可以使用自己自定义的异常类。
import com.onecode.dtg.basic.common.enums.ResultCode;
import com.onecode.dtg.basic.common.es.annotation.EsField;
import com.onecode.dtg.basic.common.es.enums.FieldType;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.lang.reflect.Field;

/**
 * @author gideon
 * @date 2022/9/8
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class EsIndexCreateService {

    private final RestHighLevelClient restHighLevelClient;

    /**
     * 不需要逗号分词器索引
     *
     * @param indexName 索引名称
     * @param clazz     同步到es的实体类
     * @return boolean
     */
    public boolean createIndex(String indexName, Class<?> clazz) {
        return createIndex(indexName, clazz, false);
    }

    /**
     * 建立索引
     *
     * @param indexName 索引名称
     * @param comma     是否需要逗号分词器
     * @return boolean
     */
    public boolean createIndex(String indexName, Class<?> clazz, Boolean comma) {
        try {
//            判断索引是否存在
            GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
            boolean exists = restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
            if (exists) {
                return true;
            }
            CreateIndexRequest request = new CreateIndexRequest(indexName);

            if (comma) {
                XContentBuilder settingsBuilder = XContentFactory.jsonBuilder()
                        .startObject()
                        .startObject("analysis")
                        .startObject("analyzer")
                        .startObject("comma")
                        .field("type", "pattern")
//                        将分词器规则定义为按照","进行分词
                        .field("pattern", ",")
                        .endObject()
                        .endObject()
                        .endObject()
                        .endObject();
                request.settings(settingsBuilder);
            }

//            这里创建索引结构
            request.mapping(generateBuilder(clazz));
            CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
//            指示是否所有节点都已确认请求
            boolean acknowledged = response.isAcknowledged();
//            指示是否在超时之前为索引中的每个分片启动了必需的分片副本数
            boolean shardsAcknowledged = response.isShardsAcknowledged();
            if (acknowledged || shardsAcknowledged) {
                log.info("创建索引成功!索引名称为{}", indexName);
                return true;
            }
            return false;
        } catch (IOException e) {
            throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引:" + indexName + "失败。");
        }
    }


    /**
     * 生成es索引
     *
     * @param clazz 对于的es实体
     * @return XContentBuilder
     */
    public static XContentBuilder generateBuilder(Class<?> clazz) {
        try {
//        获取索引名称及类型
            XContentBuilder builder = XContentFactory.jsonBuilder();
            builder.startObject();
            builder.startObject("properties");
            Field[] declaredFields = clazz.getDeclaredFields();
            for (Field declaredField : declaredFields) {
                if (declaredField.isAnnotationPresent(EsField.class)) {
//                获取注解
                    EsField declaredAnnotation = declaredField.getDeclaredAnnotation(EsField.class);
//                        如果嵌套对象
                    /**
                     * {
                     *   "mappings": {
                     *     "properties": {
                     *       "region": {
                     *         "type": "keyword"
                     *       },
                     *       "manager": {
                     *         "properties": {
                     *           "age":  { "type": "integer" },
                     *           "name": {
                     *             "properties": {
                     *               "first": { "type": "text" },
                     *               "last":  { "type": "text" }
                     *             }
                     *           }
                     *         }
                     *       }
                     *     }
                     *   }
                     * }
                     */
                    if (declaredAnnotation.type() == FieldType.OBJECT) {
//                    获取当前类的对象-- Action
                        Class<?> type = declaredField.getType();
                        Field[] typeDeclaredFields = type.getDeclaredFields();
                        builder.startObject(declaredField.getName());
                        builder.startObject("properties");
//                    遍历该对象中的所有属性
                        for (Field field : typeDeclaredFields) {
                            if (field.isAnnotationPresent(EsField.class)) {
//                            获取注解
                                EsField fieldDeclaredAnnotation = field.getDeclaredAnnotation(EsField.class);
                                builder.startObject(field.getName());
                                builder.field("type", fieldDeclaredAnnotation.type().getType());
//                            keyword不需要分词
                                if (fieldDeclaredAnnotation.type() == FieldType.TEXT) {
                                    builder.field("analyzer", fieldDeclaredAnnotation.analyzer().getType());
                                }
                                builder.endObject();
                            }
                        }
                        builder.endObject();
                        builder.endObject();

                    } else {
                        builder.startObject(declaredField.getName());
                        builder.field("type", declaredAnnotation.type().getType());
//                        keyword不需要分词
                        if (declaredAnnotation.type() == FieldType.TEXT) {
                            builder.field("analyzer", declaredAnnotation.analyzer().getType());
                        }
                        builder.endObject();
                    }
                }
            }
//            对应property
            builder.endObject();
            builder.endObject();
            return builder;
        } catch (IOException e) {
            throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引失败。");
        }
    }
}

canal

因为我这里使用的是第三方的canal jar包,也就是上面说到的Canal胶水层
获取地址:https://gitee.com/gz-yami/mall4cloud/tree/master/mall4cloud-common/mall4cloud-common-core/lib
引入的maven

		<dependency>
			<groupId>cn.throwx</groupId>
			<artifactId>canal-glue-core</artifactId>
			<version>1.0</version>
			<scope>system</scope>
			<systemPath>${pom.basedir}/lib/canal-glue-core.jar</systemPath>
		</dependency>

类似于下面这样放
spring boot +springboot集成es7.9.1+canal同步到es文章来源地址https://www.toymoban.com/news/detail-410033.html

  1. canal的canal.properties配置文件信息,这里主要看你使用什么信息队列就配置什么。我使用的是RocketMQ,然后需要创建一个topic去监听数据库的操作日志,配置topic在rocketmq.producer.group = canal-topic
#################################################
######### 		common argument		#############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
######### 		destinations		#############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 	      MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
######### 		     Kafka 		     #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
######### 		    RocketMQ	     #############
##################################################
rocketmq.producer.group = canal-topic
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 192.168.1.46:9876
rocketmq.retry.times.when.send.failed = 3
rocketmq.vip.channel.enabled = false
rocketmq.tag =

##################################################
######### 		    RabbitMQ	     #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =

  1. canal的instance.properties配置文件信息,canal.instance.filter.regex这个参数可以指定监听的数据库->表
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=192.168.1.46:3306
canal.instance.master.journal.name=mysql-binlog.000001
canal.instance.master.position=0
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=mp_biz_service.server:*,mp_biz_service.shop_service_server:*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=canal-topic
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

  1. canal自定义处理器NdCanalBinLogEventParser
import cn.throwx.canal.gule.common.BinLogEventType;
import cn.throwx.canal.gule.common.OperationType;
import cn.throwx.canal.gule.model.CanalBinLogEvent;
import cn.throwx.canal.gule.model.CanalBinLogResult;
import cn.throwx.canal.gule.support.parser.BaseCommonEntryFunction;
import cn.throwx.canal.gule.support.parser.BasePrimaryKeyTupleFunction;
import cn.throwx.canal.gule.support.parser.CanalBinLogEventParser;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;

import java.util.*;

/** 
 * @author gideon
 */
@Slf4j
public class NdCanalBinLogEventParser implements CanalBinLogEventParser {

    @Override
    public <T> List<CanalBinLogResult<T>> parse(CanalBinLogEvent event, Class<T> klass, BasePrimaryKeyTupleFunction primaryKeyFunction, BaseCommonEntryFunction<T> commonEntryFunction) {
        BinLogEventType eventType = BinLogEventType.fromType(event.getType());
        if (Objects.equals(BinLogEventType.CREATE, eventType) || Objects.equals(BinLogEventType.ALTER, eventType)) {
            if (log.isDebugEnabled()) {
                log.debug("监听到不需要处理或者未知的binlog事件类型[{}],将忽略解析过程返回空列表,binlog事件:{}", eventType, JSON.toJSONString(event));
            }
            return Collections.emptyList();
        }

        if (BinLogEventType.UNKNOWN != eventType && BinLogEventType.QUERY != eventType) {
            if (Boolean.TRUE.equals(event.getIsDdl())) {
                CanalBinLogResult<T> entry = new CanalBinLogResult<>();
                entry.setOperationType(OperationType.DDL);
                entry.setBinLogEventType(eventType);
                entry.setDatabaseName(event.getDatabase());
                entry.setTableName(event.getTable());
                entry.setSql(event.getSql());
                return Collections.singletonList(entry);
            } else {
                Optional.ofNullable(event.getPkNames()).filter((x) -> x.size() == 1).orElseThrow(() -> new IllegalArgumentException("DML类型binlog事件主键列数量不为1"));
                String primaryKeyName = event.getPkNames().get(0);
                List<CanalBinLogResult<T>> entryList = new LinkedList<>();
                List<Map<String, String>> data = event.getData();
                List<Map<String, String>> old = event.getOld();
                int dataSize = null != data ? data.size() : 0;
                int oldSize = null != old ? old.size() : 0;
                if (dataSize > 0) {
                    for(int index = 0; index < dataSize; ++index) {
                        CanalBinLogResult<T> entry = new CanalBinLogResult<>();
                        entryList.add(entry);
                        entry.setSql(event.getSql());
                        entry.setOperationType(OperationType.DML);
                        entry.setBinLogEventType(eventType);
                        entry.setTableName(event.getTable());
                        entry.setDatabaseName(event.getDatabase());
                        Map<String, String> item = data.get(index);
                        entry.setAfterData(commonEntryFunction.apply(item));
                        Map<String, String> oldItem = null;
                        if (oldSize > 0 && index <= oldSize) {
                            oldItem = old.get(index);
                            entry.setBeforeData(commonEntryFunction.apply(oldItem));
                        }

                        entry.setPrimaryKey(primaryKeyFunction.apply(oldItem, item, primaryKeyName));
                    }
                }

                return entryList;
            }
        } else {
            if (log.isDebugEnabled()) {
                log.debug("监听到不需要处理或者未知的binlog事件类型[{}],将忽略解析过程返回空列表,binlog事件:{}", eventType, JSON.toJSONString(event));
            }

            return Collections.emptyList();
        }
    }

    private NdCanalBinLogEventParser() {
    }

    public static NdCanalBinLogEventParser of() {
        return new NdCanalBinLogEventParser();
    }
}
  1. canal自定义处理器NdCanalBinlogEventProcessorFactory
import cn.throwx.canal.gule.model.ModelTable;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.CanalBinlogEventProcessorFactory;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * @author gideon
 */
public class NdCanalBinlogEventProcessorFactory implements CanalBinlogEventProcessorFactory {

    private final ConcurrentMap<ModelTable, List<BaseCanalBinlogEventProcessor<?>>> cache = new ConcurrentHashMap<>(16);

    @Override
    public void register(ModelTable modelTable, BaseCanalBinlogEventProcessor<?> processor) {
        synchronized(this.cache) {
            this.cache.putIfAbsent(modelTable, new LinkedList<>());
            this.cache.get(modelTable).add(processor);
        }
    }

    @Override
    public List<BaseCanalBinlogEventProcessor<?>> get(ModelTable modelTable) {
        return this.cache.get(modelTable);
    }

    private NdCanalBinlogEventProcessorFactory() {
    }

    public static NdCanalBinlogEventProcessorFactory of() {
        return new NdCanalBinlogEventProcessorFactory();
    }
}

  1. canal自定义处理器NdCanalGlue
import cn.throwx.canal.gule.CanalGlue;
import cn.throwx.canal.gule.model.CanalBinLogEvent;
import cn.throwx.canal.gule.model.ModelTable;
import cn.throwx.canal.gule.support.adapter.SourceAdapterFacade;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.CanalBinlogEventProcessorFactory;

import java.util.List;

/**
 * @author gideon
 */
public class NdCanalGlue implements CanalGlue {

    private final CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory;

    @Override
    public void process(String content) {
        CanalBinLogEvent event = SourceAdapterFacade.X.adapt(CanalBinLogEvent.class, content);
        ModelTable modelTable = ModelTable.of(event.getDatabase(), event.getTable());
        List<BaseCanalBinlogEventProcessor<?>> baseCanalBinlogEventProcessors = this.canalBinlogEventProcessorFactory.get(modelTable);
        if (baseCanalBinlogEventProcessors.isEmpty()) {
            return;
        }
        baseCanalBinlogEventProcessors.forEach((processor) -> processor.process(event));
    }


    private NdCanalGlue(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {
        this.canalBinlogEventProcessorFactory = canalBinlogEventProcessorFactory;
    }

    public static NdCanalGlue of(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {
        return new NdCanalGlue(canalBinlogEventProcessorFactory);
    }
}
  1. canal配置类
import cn.throwx.canal.gule.CanalGlue;
import cn.throwx.canal.gule.support.parser.*;
import cn.throwx.canal.gule.support.parser.converter.CanalFieldConverterFactory;
import cn.throwx.canal.gule.support.parser.converter.InMemoryCanalFieldConverterFactory;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.CanalBinlogEventProcessorFactory;
import com.onecode.middle.search.service.canal.NdCanalBinLogEventParser;
import com.onecode.middle.search.service.canal.NdCanalBinlogEventProcessorFactory;
import com.onecode.middle.search.service.canal.NdCanalGlue;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import java.util.Map;

/**
 * @author gideon
 */
@Configuration
public class CanalGlueAutoConfiguration implements SmartInitializingSingleton, BeanFactoryAware {

    private ConfigurableListableBeanFactory configurableListableBeanFactory;

    @Bean
    @ConditionalOnMissingBean
    public CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory() {
        return NdCanalBinlogEventProcessorFactory.of();
    }

    @Bean
    @ConditionalOnMissingBean
    public ModelTableMetadataManager modelTableMetadataManager(CanalFieldConverterFactory canalFieldConverterFactory) {
        return InMemoryModelTableMetadataManager.of(canalFieldConverterFactory);
    }

    @Bean
    @ConditionalOnMissingBean
    public CanalFieldConverterFactory canalFieldConverterFactory() {
        return InMemoryCanalFieldConverterFactory.of();
    }

    @Bean
    @ConditionalOnMissingBean
    public CanalBinLogEventParser canalBinLogEventParser() {
        return NdCanalBinLogEventParser.of();
    }

    @Bean
    @ConditionalOnMissingBean
    public ParseResultInterceptorManager parseResultInterceptorManager(ModelTableMetadataManager modelTableMetadataManager) {
        return InMemoryParseResultInterceptorManager.of(modelTableMetadataManager);
    }

    @Bean
    @Primary
    public CanalGlue canalGlue(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {
        return NdCanalGlue.of(canalBinlogEventProcessorFactory);
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.configurableListableBeanFactory = (ConfigurableListableBeanFactory) beanFactory;
    }

    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public void afterSingletonsInstantiated() {
        ParseResultInterceptorManager parseResultInterceptorManager
                = configurableListableBeanFactory.getBean(ParseResultInterceptorManager.class);
        ModelTableMetadataManager modelTableMetadataManager
                = configurableListableBeanFactory.getBean(ModelTableMetadataManager.class);
        CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory
                = configurableListableBeanFactory.getBean(CanalBinlogEventProcessorFactory.class);
        CanalBinLogEventParser canalBinLogEventParser
                = configurableListableBeanFactory.getBean(CanalBinLogEventParser.class);
        Map<String, BaseParseResultInterceptor> interceptors
                = configurableListableBeanFactory.getBeansOfType(BaseParseResultInterceptor.class);
        interceptors.forEach((k, interceptor) -> parseResultInterceptorManager.registerParseResultInterceptor(interceptor));
        Map<String, BaseCanalBinlogEventProcessor> processors
                = configurableListableBeanFactory.getBeansOfType(BaseCanalBinlogEventProcessor.class);
        processors.forEach((k, processor) -> processor.init(canalBinLogEventParser, modelTableMetadataManager,
                canalBinlogEventProcessorFactory, parseResultInterceptorManager));
    }
}

  1. ServerBO canal转换的实体类,@CanalModel 的参数database是对应的数据库,table是对应数据库下的数据表,因为这个数据是需要同步到es的所以设置了类型。
import cn.throwx.canal.gule.annotation.CanalModel;
import cn.throwx.canal.gule.common.FieldNamingPolicy;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.onecode.dtg.basic.common.es.annotation.EsField;
import com.onecode.dtg.basic.common.es.enums.AnalyzerType;
import com.onecode.dtg.basic.common.es.enums.FieldType;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.time.LocalDate;
import java.time.LocalDateTime;

/**
 * @author gideon
 */
@Data
@CanalModel(database = "mp_biz_service", table = "server", fieldNamingPolicy = FieldNamingPolicy.LOWER_UNDERSCORE)
public class ServerBO {

    @ApiModelProperty("id")
    @EsField(type = FieldType.LONG)
    private Long id;

    @ApiModelProperty("用户标识")
    @EsField(type = FieldType.LONG)
    private Long userId;

    @ApiModelProperty("类型")
    @EsField(type = FieldType.KEYWORD)
    private String type;

    @ApiModelProperty("姓名")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String name;

    @ApiModelProperty("值为1时是男性,值为2时是女性,值为0时是未知")
    @EsField(type = FieldType.KEYWORD)
    private String gender;

    @ApiModelProperty("出生年月日")
    @JsonFormat(pattern = "yyyy-MM-dd")
    @EsField(type = FieldType.DATE)
    private LocalDate birthday;

    @ApiModelProperty("学历")
    @EsField(type = FieldType.KEYWORD)
    private String education;

    @ApiModelProperty("从业时间")
    @JsonFormat(pattern = "yyyy-MM-dd")
    @EsField(type = FieldType.DATE)
    private LocalDate practiceDate;

    @ApiModelProperty("评级")
    @EsField(type = FieldType.KEYWORD)
    private String level;

    @ApiModelProperty("认证标签")
    @EsField(type = FieldType.TEXT)
    private String authLabel;

    @ApiModelProperty("勋章(逗号隔开)")
    @EsField(type = FieldType.TEXT)
    private String medal;

    @ApiModelProperty("服务评分")
    @EsField(type = FieldType.INTEGER)
    private Integer serviceScore;

    @ApiModelProperty("已实名认证")
    @EsField(type = FieldType.INTEGER)
    private Integer realNameAuth;

    @ApiModelProperty("身份证号")
    @EsField(type = FieldType.TEXT)
    private String idCardNo;

    @ApiModelProperty("户籍-省")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String idCardProvince;

    @ApiModelProperty("户籍-市")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String idCardCity;

    @ApiModelProperty("户籍-区")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String idCardRegion;

    @ApiModelProperty("手机号")
    @EsField(type = FieldType.TEXT)
    private String phone;

    @ApiModelProperty("现住-省")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String presentProvince;

    @ApiModelProperty("现住-市")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String presentCity;

    @ApiModelProperty("现住-区")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String presentRegion;

    @ApiModelProperty("现住-地址")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String presentAddress;

    @ApiModelProperty("头像")
    @EsField(type = FieldType.TEXT)
    private String head;

    @ApiModelProperty("使用状态(初始化:init;正常:normal,禁用:ban)")
    @EsField(type = FieldType.KEYWORD)
    private String useStatus;

    @ApiModelProperty("审核状态 (待审核:await;通过:pass,驳回:reject)")
    @EsField(type = FieldType.KEYWORD)
    private String auditStatus;

    @ApiModelProperty("驳回理由")
    @EsField(type = FieldType.TEXT)
    private String rejectReason;

    @ApiModelProperty("注册时间")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @EsField(type = FieldType.DATE)
    private LocalDateTime regDate;

    @ApiModelProperty("介绍-内容")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String introContent;

    @ApiModelProperty("介绍-视频")
    @EsField(type = FieldType.TEXT)
    private String introVideo;

    @ApiModelProperty("介绍-标签")
    @EsField(type = FieldType.TEXT)
    private String introLabel;

    @ApiModelProperty("商家标识")
    @EsField(type = FieldType.LONG)
    private Long merchantId;

    @ApiModelProperty("组织标识")
    @EsField(type = FieldType.LONG)
    private Long orgId;

    @ApiModelProperty("护理员申请sku,多个逗号隔开")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.COMMA)
    private String skuId;

    @ApiModelProperty("护理员排班数据,多个逗号隔开")
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.COMMA)
    private String schedule;

    /**
     * 逻辑删除
     */
    @EsField(type = FieldType.INTEGER)
    private Integer del;

    /**
     * 创建人
     */
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String createBy;

    /**
     * 创建时间
     */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @EsField(type = FieldType.DATE)
    private LocalDateTime createTime;

    /**
     * 更新者
     */
    @EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String updateBy;

    /**
     * 更新时间
     */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @EsField(type = FieldType.DATE)
    private LocalDateTime updateTime;

}

  1. ShopServiceServer canal转换的实体类,@CanalModel 的参数database是对应的数据库,table是对应数据库下的数据表
import cn.throwx.canal.gule.annotation.CanalModel;
import cn.throwx.canal.gule.common.FieldNamingPolicy;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.time.LocalDateTime;

/**
 * @author gideon
 */
@Data
@CanalModel(database = "mp_biz_service", table = "shop_service_server", fieldNamingPolicy = FieldNamingPolicy.LOWER_UNDERSCORE)
public class ShopServiceServerBO {
    @ApiModelProperty("ID")
    private Long id;

    @ApiModelProperty("产品标识")
    private Long productId;

    @ApiModelProperty("服务者用户标识")
    private Long serverUserId;

    @ApiModelProperty("产品sku标识")
    private Long productSkuId;

    @ApiModelProperty(value = "盈利")
    private Integer profit;

    @ApiModelProperty("商家标识")
    private Long merchantId;

    @ApiModelProperty("组织标识")
    private Long orgId;

    /**
     * 逻辑删除
     */
    private Integer del;

    /**
     * 创建人
     */
    private String createBy;

    /**
     * 创建时间
     */
    private LocalDateTime createTime;

    /**
     * 更新者
     */
    private String updateBy;

    /**
     * 更新时间
     */
    private LocalDateTime updateTime;
}

  1. 至此canal的基础代码就完成。

消费MQ订阅的canal信息,进行elasticsearch的同步以及搜索

  1. 监听我们上面canal-topic订阅的消息然后进行同步数据CanalListener
import cn.throwx.canal.gule.CanalGlue;
import com.onecode.dtg.basic.RocketMqConstant;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * canal消费数据库操作日志mq
 *
 * @author gideon
 */
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(topic = RocketMqConstant.CANAL_TOPIC, consumerGroup = RocketMqConstant.CANAL_TOPIC)
public class CanalListener implements RocketMQListener<String> {

    private final CanalGlue canalGlue;

    @Override
    public void onMessage(String message) {
        canalGlue.process(message);
    }
}
  1. 对我们需要监听的表进行处理ServerCanalListener,这里面的hutool是一个工具类有需要的可以自行引入
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.6</version>
</dependency>
import cn.hutool.json.JSONUtil;
import cn.throwx.canal.gule.model.CanalBinLogEvent;
import cn.throwx.canal.gule.model.CanalBinLogResult;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.ExceptionHandler;
import com.onecode.dtg.basic.common.core.exception.CommonBizException;
import com.onecode.dtg.basic.common.enums.ResultCode;
import com.onecode.middle.search.service.bo.ServerBO;
import com.onecode.middle.search.service.constant.EsIndexEnum;
import com.onecode.middle.search.service.util.EsIndexCreateService;
import com.onecode.service.feign.ServerFeignClient;
import com.onecode.service.feign.bo.EsServerBO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author gideon
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class ServerCanalListener extends BaseCanalBinlogEventProcessor<ServerBO> {

    private final EsIndexCreateService esIndexCreateService;

    private final ServerFeignClient serverFeignClient;

    private final RestHighLevelClient restHighLevelClient;

    /**
     * 插入护理员,此时插入es
     */
    @Override
    protected void processInsertInternal(CanalBinLogResult<ServerBO> result) {
        Long serverId = result.getPrimaryKey();
        EsServerBO esServerBO = serverFeignClient.loadEsServerBO(serverId);
        if (esServerBO == null) {
            throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引异常");
        }

//        创建索引
        boolean index = esIndexCreateService.createIndex(EsIndexEnum.SERVER.value(), ServerBO.class, true);
        if (!index) {
            throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引:" + EsIndexEnum.SERVER.value() + "失败。");
        }

        IndexRequest request = new IndexRequest(EsIndexEnum.SERVER.value());
        request.id(String.valueOf(serverId));
        request.source(JSONUtil.toJsonStr(esServerBO), XContentType.JSON);
        try {
            IndexResponse indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);
            log.info(indexResponse.toString());

        } catch (IOException e) {
            log.error(e.toString());
            throw new CommonBizException(ResultCode.FAIL.getModel(), "保存es信息异常:" + e);
        }
    }

    /**
     * 更新护理员,删除护理员索引,再重新构建一个
     */
    @Override
    protected void processUpdateInternal(CanalBinLogResult<ServerBO> result) {
        Long spuId = result.getPrimaryKey();
        EsServerBO esServerBO = serverFeignClient.loadEsServerBO(spuId);
        String source = JSONUtil.toJsonStr(esServerBO);

//        创建索引
        boolean index = esIndexCreateService.createIndex(EsIndexEnum.SERVER.value(), ServerBO.class, true);
        if (!index) {
            throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引:" + EsIndexEnum.SERVER.value() + "失败。");
        }
        UpdateRequest request = new UpdateRequest(EsIndexEnum.SERVER.value(), String.valueOf(spuId));
        request.doc(source, XContentType.JSON);
        request.docAsUpsert(true);
        try {
            UpdateResponse updateResponse = restHighLevelClient.update(request, RequestOptions.DEFAULT);
            log.info(updateResponse.toString());
        } catch (IOException e) {
            log.error(e.toString());
            throw new CommonBizException(ResultCode.FAIL.getModel(), "删除es信息异常:" + e);
        }
    }

    @Override
    protected ExceptionHandler exceptionHandler() {
        return (CanalBinLogEvent event, Throwable throwable) -> {
            throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引异常:" + throwable);
        };
    }

}

  1. 这个表的监听,是因为我的业务需求,shop_service_server表增加或者删除的时候需要将skuId加到server表的skuId字段里面去,所以需要监听修改。ShopServiceServerCanalListener
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.throwx.canal.gule.model.CanalBinLogResult;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import com.onecode.dtg.basic.common.core.exception.CommonBizException;
import com.onecode.dtg.basic.common.enums.ResultCode;
import com.onecode.middle.search.service.bo.ShopServiceServerBO;
import com.onecode.middle.search.service.manager.ServerUpdateManager;
import com.onecode.service.feign.ServerFeignClient;
import com.onecode.service.feign.bo.EsServerBO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @author gideon
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class ShopServiceServerCanalListener extends BaseCanalBinlogEventProcessor<ShopServiceServerBO> {

    private final ServerFeignClient serverFeignClient;
    private final ServerUpdateManager serverUpdateManager;

    /**
     * 新增商品服务者数据
     *
     * @param result result
     */
    @Override
    protected void processInsertInternal(CanalBinLogResult<ShopServiceServerBO> result) {
    //数据库操作后的数据
        ShopServiceServerBO afterData = result.getAfterData();
        EsServerBO loadServerBO = loadServerBO(afterData.getServerUserId());
        List<String> skuIdList = StrUtil.split(loadServerBO.getSkuId(), ",");
        skuIdList.add(afterData.getProductSkuId().toString());
        EsServerBO esServerBO = new EsServerBO();
        esServerBO.setSkuId(StrUtil.join(",", skuIdList));
        serverUpdateManager.esUpdateServerByServerId(loadServerBO.getId(), esServerBO);
    }

    /**
     * 更新商品服务者数据
     *
     * @param result result
     */
    @Override
    protected void processUpdateInternal(CanalBinLogResult<ShopServiceServerBO> result) {
    	//数据库执行操作后的数据
        ShopServiceServerBO afterData = result.getAfterData();
        //del字段是我的表是否逻辑删除的判断,大家根据自己需要去掉
        if ("1".equals(afterData.getDel())) {
            return;
        }
        //微服务项目调用接口查询数据
        EsServerBO loadEsServerBO = loadServerBO(afterData.getServerUserId());
        //处理修改后的数据
        EsServerBO esServerBO = dealWithData(afterData, loadEsServerBO);
        serverUpdateManager.esUpdateServerByServerId(loadEsServerBO.getId(), esServerBO);
    }

    /**
     * 删除商品服务者数据
     *
     * @param result result
     */
    @Override
    protected void processDeleteInternal(CanalBinLogResult<ShopServiceServerBO> result) {
    //数据库操作前的数据
        ShopServiceServerBO beforeData = result.getBeforeData();
        EsServerBO loadServerBO = loadServerBO(beforeData.getServerUserId());
        EsServerBO esServerBO = dealWithData(beforeData, loadServerBO);
        serverUpdateManager.esUpdateServerByServerId(loadServerBO.getId(), esServerBO);
    }

    /**
     * 处理数据
     *
     * @param data 数据库操作数据
     * @return EsServerBO
     */
    private EsServerBO dealWithData(ShopServiceServerBO data, EsServerBO loadEsServerBO) {

        List<String> skuIdList = StrUtil.split(loadEsServerBO.getSkuId(), ",");
        CollUtil.removeAny(skuIdList, data.getProductSkuId().toString());
        EsServerBO esServerBO = new EsServerBO();
        esServerBO.setSkuId(StrUtil.join(",", skuIdList));
        return esServerBO;

    }

    /**
     * 获取护理员书信息
     *
     * @param serverUserId 护理员用户标识
     * @return EsServerBO
     */
    private EsServerBO loadServerBO(Long serverUserId) {
        EsServerBO loadEsServerBO = serverFeignClient.loadEsServerBoByServerUserId(serverUserId);
        if (loadEsServerBO == null) {
            throw new CommonBizException(ResultCode.FAIL.getModel(),
                    "es数据同步失败:无法通过护工用户标识:" + serverUserId + "找到护理员信息。");
        }
        return loadEsServerBO;
    }
}

  1. ServerUpdateManager,这个是ShopServiceServerListener的处理实现类
import cn.hutool.json.JSONUtil;
import com.onecode.dtg.basic.common.core.exception.CommonBizException;
import com.onecode.dtg.basic.common.enums.ResultCode;
import com.onecode.middle.search.service.constant.EsIndexEnum;
import com.onecode.service.feign.bo.EsServerBO;
import lombok.RequiredArgsConstructor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.stereotype.Component;

/**
 * @author gideon
 */
@Component
@RequiredArgsConstructor
public class ServerUpdateManager {

    private final RestHighLevelClient restHighLevelClient;

    /**
     * 批量更新es中的商品信息
     *
     * @param serverId      护理员标识
     * @param esServerBO 更新的数据
     */
    public void esUpdateServerByServerId(Long serverId, EsServerBO esServerBO) {
        String source = JSONUtil.toJsonStr(esServerBO);
        try {
            BulkRequest request = new BulkRequest();
            // 准备更新的数据
            request.add(new UpdateRequest(EsIndexEnum.SERVER.value(), String.valueOf(serverId)).doc(source, XContentType.JSON));
            //更新
            BulkResponse bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
            if (bulkResponse.hasFailures()) {
                throw new CommonBizException(ResultCode.FAIL.getModel(), bulkResponse.buildFailureMessage());
            }
        } catch (Exception e) {
            throw new CommonBizException(ResultCode.FAIL.getModel(), e.getMessage());
        }
    }
}
  1. ServerSearchManager是搜索接口实现
import cn.hutool.json.JSONUtil;
import com.onecode.dtg.basic.common.core.exception.CommonBizException;
import com.onecode.dtg.basic.common.enums.ResultCode;
import com.onecode.dtg.basic.common.util.ColumnUtil;
import com.onecode.dtg.basic.common.util.LocalDateUtil;
import com.onecode.middle.search.service.bo.ServerBO;
import com.onecode.middle.search.service.constant.EsIndexEnum;
import com.onecode.middle.search.service.dto.ServerSearchDTO;
import com.onecode.middle.search.service.vo.EsPageVO;
import com.onecode.middle.search.service.vo.search.EsServerVO;
import com.onecode.service.feign.constant.AuditStatus;
import com.onecode.service.feign.constant.UseStatus;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
 * @author gideon
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class ServerSearchManager {

    private final RestHighLevelClient restHighLevelClient;

    /**
     * 通过搜索信息分页搜索es数据的信息
     *
     * @param serverSearchDTO 护理员搜索条件
     * @return 搜索结果
     */
    public EsPageVO<EsServerVO> pageSearchResult(ServerSearchDTO serverSearchDTO) {
        //1、动态构建出查询需要的DSL语句
        EsPageVO<EsServerVO> result;

        //1、准备检索请求
        SearchRequest searchRequest = buildSearchRequest(serverSearchDTO);

        try {
            //2、执行检索请求
            SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

            log.info("搜索返回结果:" + response.toString());

            //3、分析响应数据,封装成我们需要的格式
            result = buildSearchResult(serverSearchDTO, response);
        } catch (IOException e) {
            log.error(e.toString());
            throw new CommonBizException(ResultCode.FAIL.getModel(), "搜索服务出了点小差,请稍后再试:" + e);
        }
        return result;
    }

    /**
     * 构建结果数据
     */
    private EsPageVO<EsServerVO> buildSearchResult(ServerSearchDTO dto, SearchResponse response) {
        EsPageVO<EsServerVO> esPageVO = new EsPageVO<>();

        //1、返回的所有查询到的商品
        SearchHits hits = response.getHits();
        List<EsServerVO> productSearchs = getEsOrderBOList(response);
        esPageVO.setList(productSearchs);


        //===============分页信息====================//
        //总记录数
        long total = hits.getTotalHits().value;
        esPageVO.setTotal(total);
        // 总页码
        int totalPages = (int) total % dto.getPageSize() == 0 ?
                (int) total / dto.getPageSize() : ((int) total / dto.getPageSize() + 1);
        esPageVO.setPages(totalPages);
        return esPageVO;
    }

    private List<EsServerVO> getEsOrderBOList(SearchResponse response) {

        return getOrderListByResponse(response.getHits().getHits());
    }

    /**
     * 从es返回的数据中获取spu列表
     *
     * @param hits es返回的数据
     * @return
     */
    private List<EsServerVO> getOrderListByResponse(SearchHit[] hits) {
        List<EsServerVO> esOrders = new ArrayList<>();
        for (SearchHit hit : hits) {
            EsServerVO esOrder = JSONUtil.toBean(hit.getSourceAsString(), EsServerVO.class);
            esOrders.add(esOrder);
        }
        return esOrders;
    }


    /**
     * 准备检索请求
     *
     * @param param 搜索参数
     * @return
     */
    private SearchRequest buildSearchRequest(ServerSearchDTO param) {

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

        // 构建bool-query
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();

        // 过滤
        filterQueryIfNecessary(param, boolQueryBuilder);

        // 关键字搜索
        keywordSearch(param, boolQueryBuilder);

        // 排序
        sort(searchSourceBuilder, boolQueryBuilder);

        //分页
        searchSourceBuilder.from((param.getPageNum() - 1) * param.getPageSize());
        searchSourceBuilder.size(param.getPageSize());

        log.info("构建的DSL语句 {}", searchSourceBuilder);

        return new SearchRequest(new String[]{EsIndexEnum.SERVER.value()}, searchSourceBuilder);
    }


    /**
     * 关键字搜索
     */
    private void keywordSearch(ServerSearchDTO param, BoolQueryBuilder boolQueryBuilder) {

        BoolQueryBuilder keywordShouldQuery = QueryBuilders.boolQuery();
//        现住-省
        if (Objects.nonNull(param.getPresentProvince())) {
            keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getPresentProvince), param.getPresentProvince()).operator(Operator.AND));
        }
//        现住-市
        if (Objects.nonNull(param.getPresentCity())) {
            keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getPresentCity), param.getPresentCity()).operator(Operator.AND));
        }
//        现住-区
        if (Objects.nonNull(param.getPresentRegion())) {
            keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getPresentRegion), param.getPresentRegion()).operator(Operator.AND));
        }
//        户籍-省
        if (Objects.nonNull(param.getIdCardProvince())) {
            keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIdCardProvince), param.getIdCardProvince()).operator(Operator.AND));
        }
//        户籍-市
        if (Objects.nonNull(param.getIdCardCity())) {
            keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIdCardCity), param.getIdCardCity()).operator(Operator.AND));
        }
//        户籍-区
        if (Objects.nonNull(param.getIdCardRegion())) {
            keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIdCardRegion), param.getIdCardRegion()).operator(Operator.AND));
        }

//        标签
        if (Objects.nonNull(param.getIntroLabels())) {
            keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIntroLabel), param.getIntroLabels()).operator(Operator.AND));
        }
//        排班,使用了我的自定义逗号分词器,所以是根据逗号分隔后进行匹配的,但是需要多个匹配,我就使用了for循环,应该是有优化的地方,暂时没处理
        if (param.getServiceStartDate() != null && param.getServiceEndDate() != null) {
            List<String> scheduleList = LocalDateUtil.getContinuousTime(param.getServiceStartDate(), param.getServiceEndDate(), DateTimeFormatter.ofPattern("yyyyMMdd"));
            for (String schedule : scheduleList) {
                keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getSchedule), schedule).operator(Operator.AND));
            }
        }
//        skuId,使用了我的自定义逗号分词器,所以是根据逗号分隔后进行匹配的
        if (Objects.nonNull(param.getSkuId())) {
            keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getSkuId), param.getSkuId()).operator(Operator.AND));
        }
        boolQueryBuilder.must(keywordShouldQuery);
    }

    /**
     * 进行排序
     */
    private void sort(SearchSourceBuilder searchSourceBuilder, BoolQueryBuilder boolQueryBuilder) {
        searchSourceBuilder.sort(ColumnUtil.getName(ServerBO::getCreateTime), SortOrder.DESC);
        searchSourceBuilder.query(boolQueryBuilder);
    }

    /**
     * 过滤查询条件,如果有必要的话
     *
     * @param param            查询条件
     * @param boolQueryBuilder 组合进boolQueryBuilder
     */
    private void filterQueryIfNecessary(ServerSearchDTO param, BoolQueryBuilder boolQueryBuilder) {
//        类型
        if (Objects.nonNull(param.getType())) {
            boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getType), param.getType()));
        }
//        性别
        if (Objects.nonNull(param.getGender())) {
            boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getGender), param.getGender()));
        }
//        学历
        if (Objects.nonNull(param.getEducation())) {
            boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getEducation), param.getEducation()));
        }
        boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getAuditStatus), AuditStatus.PASS.getStatus()));
        boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getUseStatus), UseStatus.NORMAL.getStatus()));
        boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getDel), 0));
    }
}
  1. ServerSearchController
import com.onecode.dtg.basic.common.model.ResultBean;
import com.onecode.middle.search.service.dto.ServerSearchDTO;
import com.onecode.middle.search.service.manager.ServerSearchManager;
import com.onecode.middle.search.service.vo.EsPageVO;
import com.onecode.middle.search.service.vo.search.EsServerVO;
import io.swagger.annotations.Api;
import lombok.AllArgsConstructor;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;

/**
 * @author gideon
 * @date 2022/9/6
 */
@Validated
@AllArgsConstructor
@RestController
@RequestMapping("/search/server/")
@Api(tags = "api-服务者搜索接口")
public class ServerSearchController {

    private final ServerSearchManager serverSearchManager;

    @PostMapping("/page")
    public ResultBean<EsPageVO<EsServerVO>> page(@RequestBody ServerSearchDTO dto) {
        return new ResultBean<>(serverSearchManager.pageSearchResult(dto));
    }

}

  1. 分页参数EsPageDTO实体类
import com.onecode.dtg.basic.common.util.PrincipalUtil;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import javax.validation.constraints.NotNull;
import java.util.Arrays;

/**
 * @author gideon
 */
@Data
public class EsPageDTO {

    public static final String ASC = "ASC";

    public static final String DESC = "DESC";

    /**
     * 最大分页大小,如果分页大小大于500,则用500作为分页的大小。防止有人直接传入一个较大的数,导致服务器内存溢出宕机
     */
    public static final Integer MAX_PAGE_SIZE = 500;

    /**
     * 当前页
     */
    @NotNull(message = "pageNum 不能为空")
    @ApiModelProperty(value = "当前页", required = true)
    private Integer pageNum;

    @NotNull(message = "pageSize 不能为空")
    @ApiModelProperty(value = "每页大小", required = true)
    private Integer pageSize;

    @ApiModelProperty(value = "排序字段数组,用逗号分割")
    private String[] columns;

    @ApiModelProperty(value = "排序字段方式,用逗号分割,ASC正序,DESC倒序")
    private String[] orders;

    public Integer getPageNum() {
        return pageNum;
    }

    public void setPageNum(Integer pageNum) {
        this.pageNum = pageNum;
    }

    public Integer getPageSize() {
        return pageSize;
    }

    public void setPageSize(Integer pageSize) {
        if (pageSize > MAX_PAGE_SIZE) {
            this.pageSize = MAX_PAGE_SIZE;
            return;
        }
        this.pageSize = pageSize;
    }

    public String getOrderBy() {
        return order(this.columns, this.orders);
    }

    public String[] getColumns() {
        return columns;
    }

    public void setColumns(String[] columns) {
        this.columns = columns;
    }

    public String[] getOrders() {
        return orders;
    }

    public void setOrders(String[] orders) {
        this.orders = orders;
    }

    public static String order(String[] columns, String[] orders) {

        if (columns == null || columns.length == 0) {
            return "";
        }

        StringBuilder stringBuilder = new StringBuilder();

        for (int x = 0; x < columns.length; x++) {

            String column = columns[x];
            String order;

            if (orders != null && orders.length > x) {
                order = orders[x].toUpperCase();
                if (!(order.equals(ASC) || order.equals(DESC))) {
                    throw new IllegalArgumentException("非法的排序策略:" + column);
                }
            } else {
                order = ASC;
            }

            // 判断列名称的合法性,防止SQL注入。只能是【字母,数字,下划线】
            if (PrincipalUtil.isField(column)) {
                throw new IllegalArgumentException("非法的排序字段名称:" + column);
            }

            // 驼峰转换为下划线
            column = humpConversionUnderscore(column);

            if (x != 0) {
                stringBuilder.append(", ");
            }
            stringBuilder.append("`").append(column).append("` ").append(order);
        }
        return stringBuilder.toString();
    }

    public static String humpConversionUnderscore(String value) {
        StringBuilder stringBuilder = new StringBuilder();
        char[] chars = value.toCharArray();
        for (char character : chars) {
            if (Character.isUpperCase(character)) {
                stringBuilder.append("_");
                character = Character.toLowerCase(character);
            }
            stringBuilder.append(character);
        }
        return stringBuilder.toString();
    }


    @Override
    public String toString() {
        return "EsPageDTO{" +
                "pageNum=" + pageNum +
                ", pageSize=" + pageSize +
                ", columns=" + Arrays.toString(columns) +
                ", orders=" + Arrays.toString(orders) +
                '}';
    }
}

  1. 查询参数实体类ServerSearchDTO
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;

import javax.validation.constraints.NotNull;
import java.time.LocalDate;

/**
 * @author gideon
 */
@EqualsAndHashCode(callSuper = true)
@Data
public class ServerSearchDTO extends EsPageDTO{

    @ApiModelProperty("类型")
    private String type;

    @ApiModelProperty("值为1时是男性,值为2时是女性,值为0时是未知")
    private String gender;

    @ApiModelProperty("学历")
    private String education;

    @ApiModelProperty("户籍-省")
    private String idCardProvince;

    @ApiModelProperty("户籍-市")
    private String idCardCity;

    @ApiModelProperty("户籍-区")
    private String idCardRegion;

    @ApiModelProperty("现住-省")
    private String presentProvince;

    @ApiModelProperty("现住-市")
    private String presentCity;

    @ApiModelProperty("现住-区")
    private String presentRegion;

    @ApiModelProperty("介绍-标签(多个值需要使用逗号分割)")
    private String introLabels;

    @ApiModelProperty("服务开始时间")
    @NotNull(message = "服务开始时间不能为空")
    private LocalDate serviceStartDate;

    @ApiModelProperty("服务结束时间")
    @NotNull(message = "服务结束时间不能为空")
    private LocalDate serviceEndDate;

    @ApiModelProperty("skuId")
    @NotNull(message = "skuId不能为空。")
    private Long skuId;

}

  1. 返回值EsServerVO参数
/**
 * @author gideon
 * @date 2022/9/5
 */
@Data
public class EsServerVO {

    @ApiModelProperty("id")
    private Long id;

    @ApiModelProperty("用户标识")
    private Long userId;

    @ApiModelProperty("类型")
    private String type;

    @ApiModelProperty("姓名")
    private String name;

    @ApiModelProperty("值为1时是男性,值为2时是女性,值为0时是未知")
    private String gender;

    @ApiModelProperty("出生年月日")
    @JsonFormat(pattern = "yyyy-MM-dd")
    private LocalDate birthday;

    @ApiModelProperty("学历")
    private String education;

    @ApiModelProperty("从业时间")
    @JsonFormat(pattern = "yyyy-MM-dd")
    private LocalDate practiceDate;

    @ApiModelProperty("评级")
    private String level;

    @ApiModelProperty("认证标签")
    private String authLabel;

    @ApiModelProperty("勋章(逗号隔开)")
    private String medal;

    @ApiModelProperty("服务评分")
    private Integer serviceScore;

    @ApiModelProperty("已实名认证")
    private Integer realNameAuth;

    @ApiModelProperty("身份证号")
    private String idCardNo;

    @ApiModelProperty("户籍-省")
    private String idCardProvince;

    @ApiModelProperty("户籍-市")
    private String idCardCity;

    @ApiModelProperty("户籍-区")
    private String idCardRegion;

    @ApiModelProperty("手机号")
    private String phone;

    @ApiModelProperty("现住-省")
    private String presentProvince;

    @ApiModelProperty("现住-市")
    private String presentCity;

    @ApiModelProperty("现住-区")
    private String presentRegion;

    @ApiModelProperty("现住-地址")
    private String presentAddress;

    @ApiModelProperty("头像")
    private String head;

    @ApiModelProperty("使用状态(初始化:init;正常:normal,禁用:ban)")
    private String useStatus;

    @ApiModelProperty("审核状态 (待审核:await;通过:pass,驳回:reject)")
    private String auditStatus;

    @ApiModelProperty("驳回理由")
    private String rejectReason;

    @ApiModelProperty("注册时间")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime regDate;

    @ApiModelProperty("介绍-内容")
    private String introContent;

    @ApiModelProperty("介绍-视频")
    private String introVideo;

    @ApiModelProperty("介绍-标签")
    private String introLabel;

    @ApiModelProperty("商家标识")
    private Long merchantId;

    @ApiModelProperty("组织标识")
    private Long orgId;

    @ApiModelProperty("护理员申请sku,多个逗号隔开")
    private String skuId;

    @ApiModelProperty("护理员排班数据,多个逗号隔开")
    private String schedule;

    /**
     * 逻辑删除
     */
    private Integer del;

    /**
     * 创建人
     */
    private String createBy;

    /**
     * 创建时间
     */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime createTime;

    /**
     * 更新者
     */
    private String updateBy;

    /**
     * 更新时间
     */
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime updateTime;
}

  1. 分页返回值EsPageVO
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.util.List;

/**
 * @author gideon
 * @date 2022/9/5
 */
@Data
public class EsPageVO<T> {

    @ApiModelProperty("总页数")
    private Integer pages;

    @ApiModelProperty("总条目数")
    private Long total;

    @ApiModelProperty("结果集")
    private List<T> list;
}

到了这里,关于spring boot +springboot集成es7.9.1+canal同步到es的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Canal实时同步MySQL数据到ES

    canal主要用途是对MySQL数据库增量日志进行解析,提供增量数据的订阅和消费,简单说就是可以对MySQL的增量数据进行实时同步,支持同步到MySQL、Elasticsearch、HBase等数据存储中去。 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业

    2024年02月04日
    浏览(51)
  • canal同步mysql数据到es中

    项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表和读写分离,每张表数据控制在500W一下,但是效率还是达不到要求,为了提高查询效率,我们使用ES查询。 而将mysql实时同步到es中保证数据一致性就成了我们的工作之下。 jdk1.8(依赖jdk环境,需要先

    2023年04月08日
    浏览(38)
  • canal实现mysql数据实时同步到es

    最近有一个需求:原有一些mysql数据,这些数据量很大,且包含文本信息,需要对其进行搜索,这时如果使用mysql的like来匹配,效率会很低,且很可能影响整个系统的运行,经过和同事的讨论,最终决定使用es来做搜索。 但是源数据有很多关联关系,搜索的时候也会带上这些

    2024年02月16日
    浏览(44)
  • 利用Canal把MySQL数据同步到ES

    Canal是阿里巴巴开源的一个数据库变更数据同步工具,主要用于 MySQL 数据库的增量数据到下游的同步,例如同步到 Elasticsearch、HBase、Hive 等。下面是一个基本的步骤来导入 MySQL 数据库到 Elasticsearch。 安装和配置 Canal 首先,需要在你的机器上安装并配置Canal。具体步骤可在 C

    2024年02月16日
    浏览(34)
  • 实时同步ES技术选型:Mysql+Canal+Adapter+ES+Kibana

    基于之前的文章,精简操作而来 让ELK在同一个docker网络下通过名字直接访问 Ubuntu服务器ELK部署与实践 使用 Docker 部署 canal 服务实现MySQL和ES实时同步 Docker部署ES服务,canal全量同步的时候内存爆炸,ES/Canal Adapter自动关闭,CPU100% 2.1 新建mysql docker 首先新建数据库的docker镜像

    2024年02月11日
    浏览(32)
  • 使用canal+rocketmq实现将mysql数据同步到es

    实际开发过程中,经常遇到数据库与缓存不一致的问题,造成这种问题的原因有很多,其中缓存数据没有及时更新、缓存中过期的数据没有及时更新,导致缓存中存在失效数据,导致数据库与缓存不一致。而这种问题的出现大部分都是因为同步延迟、缓存失效、过期和错误使

    2024年02月11日
    浏览(31)
  • Canal —— 一款 MySql 实时同步到 ES 的阿里开源神器

    目录 一. 前言 二. Canal 简介和使用场景 2.1. Canal 简介 2.2. Canal 使用场景 三. Canal Server 设计 3.1. 整体设计 3.2. EventParser 设计 3.3. CanalLogPositionManager 设计 3.4. CanalHAController 类图设计 3.5. EventSink 类图设计和扩展 3.6. EventStore 类图设计和扩展 3.7. MetaManager 类图设计和扩展 四. Can

    2024年01月25日
    浏览(39)
  • MySQL如何实时同步数据到ES?试试阿里开源的Canal

    前几天在网上冲浪的时候发现了一个比较成熟的开源中间件——  Canal  。在了解了它的工作原理和使用场景后,顿时产生了浓厚的兴趣。今天,就让我们跟随我的脚步,一起来揭开它神秘的面纱吧。 目录 前言 简介  工作原理  MySQL主备复制原理 canal 工作原理 Canal架构  C

    2024年02月20日
    浏览(33)
  • 使用 Docker 部署 canal 服务实现MySQL和ES实时同步

    参考 ClientAdapter: Canal的Adapter配置项目 Sync ES:Canal的Adapter中ES同步的配置项 使用 Docker 部署 canal 服务 docker canal-server canal-adapter mysql Canal(基于Docker同步mysql数据到elasticsearch) Canal部署过程中的错误 Canal 1.1.4 Canal Adapter 1.1.4 Kibana: 6.8.8 ElasticSearch: 6.4.3 由于Canal 1.1.4只能适配 Ela

    2024年02月13日
    浏览(36)
  • 使用canal同步MySQL5.7到ES中小白配置教程

    在本篇博客中,我们将深入探讨如何使用Canal进行MySQL到Elasticsearch (ES) 的数据同步。本文将涵盖Canal的基本概念、安装过程、配置步骤以及具体的同步操作,旨在帮助开发者和数据工程师理解并实现实时数据处理。包括:Canal介绍、MySQL同步、Elasticsearch配置、实时数据同

    2024年04月24日
    浏览(35)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包