未经许可,请勿转载。
前言
- 其实大部分的代码是来源于参考资料来源 的主要代码实现,我只是在他的基础上增加自定义注解,自定义分词器等。需要看详细源码的可以去看主要代码实现,结合我的来使用。
- 有人会问为什么需要自定义注解,因为elasticsearch7.6 索引将去除type 没有类型的概念了。所以我自己自定义数据类型,有需要的可以自己拓展自己需要的类型。
- 我这里主要写的是代码实现,没有涉及到中间件的搭建,因为真的没有时间,哈哈。
参考资料来源
主要实现代码:https://gitee.com/gz-yami/mall4cloud?_from=gitee_search
自定义注解:https://cloud.tencent.com/developer/article/1911164
自定义分词器:https://blog.csdn.net/m0_57302315/article/details/121103241
Canal胶水层:https://gitee.com/throwableDoge/canal-glue
rocketmq
- mq maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
- mq 适配器
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
* @author gideon
*/
@Configuration
@RequiredArgsConstructor
public class RocketMqAdapter {
private final RocketMQMessageConverter rocketMqMessageConverter;
@Value("${rocketmq.name-server:}")
private String nameServer;
public RocketMQTemplate getTemplateByTopicName(String topic){
RocketMQTemplate mqTemplate = new RocketMQTemplate();
DefaultMQProducer producer = new DefaultMQProducer(topic);
producer.setNamesrvAddr(nameServer);
producer.setRetryTimesWhenSendFailed(2);
producer.setSendMsgTimeout((int) RocketMqConstant.TIMEOUT);
mqTemplate.setProducer(producer);
mqTemplate.setMessageConverter(rocketMqMessageConverter.getMessageConverter());
return mqTemplate;
}
}
- mq的一些常量信息RocketMqConstant
/**
* nameserver用;分割
* 同步消息,如果两次
*/
public class RocketMqConstant {
// 延迟消息 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (1-18)
/**
* 自动收货时间,实际上7天
*/
public static final int ORDER_AUTO_RECEIPT_DELAY_LEVEL = 60 * 24 * 7;
/**
* 默认发送消息超时时间
*/
public static final long TIMEOUT = 3000;
/**
* 订单取消退款
*/
public static final String ORDER_REFUND_TOPIC = "order-refund-topic";
/**
* 订单自动收货
*/
public static final String AUTO_RECEIPT_TOPIC = "auto-receipt-topic";
/**
* 服务订单订单支付成功
*/
public static final String ORDER_NOTIFY_TOPIC = "order-notify-topic";
/**
* canal-topic
*/
public static final String CANAL_TOPIC = "canal-topic";
}
- mq的配置类
import com.onecode.dtg.basic.RocketMqAdapter;
import com.onecode.dtg.basic.RocketMqConstant;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
/**
* @author gideon
*/
@Configuration
@RequiredArgsConstructor
public class RocketMqConfig {
private final RocketMqAdapter rocketMqAdapter;
@Lazy
@Bean(destroyMethod = "destroy")
public RocketMQTemplate autoReceiptTemplate() {
return rocketMqAdapter.getTemplateByTopicName(RocketMqConstant.AUTO_RECEIPT_TOPIC);
}
}
- mq 的配置文件信息
rocketmq:
name-server: 127.0.0.1:9876
elasticsearch
elasticsearch的搭建我在这里就不不多bb了,你们自行百度,下面是资料。
- maven所需依赖
</properties>
<elasticsearch.version>7.9.1</elasticsearch.version>
</properties>
<dependencies>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch.version}</version>
<exclusions>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
- 分词所需资料 elasticsearch搭建的资料,点击这里。
- elasticsearch的yml
# elastic的地址
elastic:
hostname: 127.0.0.1
port: 9200
- elasticsearch 启动配置类
import lombok.RequiredArgsConstructor;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author gideon
*/
@Configuration
@RequiredArgsConstructor
public class ElasticConfig {
@Value("${elastic.hostname}")
private String hostname;
@Value("${elastic.port}")
private int port;
@Bean
public RestHighLevelClient restHighLevelClient() {
return new RestHighLevelClient(
RestClient.builder(
new HttpHost(hostname, port)));
}
}
- elasticsearch 自定义注解,AnalyzerType在下面。
import java.lang.annotation.*;
/**
* @author gideon
* @date 2022/9/8
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Documented
@Inherited
public @interface EsField {
FieldType type() default FieldType.TEXT;
/**
* 指定分词器
*
* @return AnalyzerType
*/
AnalyzerType analyzer() default AnalyzerType.STANDARD;
}
- elasticsearch 自定义AnalyzerType,因为我自己的业务需要所以我加了一个自定义分词器comma
import lombok.Getter;
/**
* @author gideon
* @date 2022/9/8
*/
@Getter
public enum AnalyzerType {
/**
* 不使用分词
*/
NO("不使用分词"),
/**
* 标准分词,默认分词器
*/
STANDARD("standard"),
/**
* ik_smart:会做最粗粒度的拆分;已被分出的词语将不会再次被其它词语占有
*/
IK_SMART("ik_smart"),
/**
* ik_max_word :会将文本做最细粒度的拆分;尽可能多的拆分出词语
*/
IK_MAX_WORD("ik_max_word"),
/**
* ik_max_word :会将文本做逗号分词
*/
COMMA("comma"),
;
private final String type;
AnalyzerType(String type) {
this.type = type;
}
}
- elasticsearch 自定义FieldType
import lombok.Getter;
/**
* @author gideon
* @date 2022/9/8
*/
@Getter
public enum FieldType {
/**
*
*/
TEXT("text"),
KEYWORD("keyword"),
INTEGER("integer"),
DOUBLE("double"),
DATE("date"),
LONG("long"),
/**
* 单条数据
*/
OBJECT("object"),
/**
* 嵌套数组
*/
NESTED("nested"),
;
FieldType(String type){
this.type = type;
}
private final String type;
}
- elasticsearch索引名称枚举
/**
* es当中的index
*
* @author gideon
*/
public enum EsIndexEnum {
/**
* 护理员
*/
SERVER("server"),
;
private final String value;
public String value() {
return value;
}
EsIndexEnum(String value) {
this.value = value;
}
}
- elasticsearch 创建索引代码EsIndexCreateService,CommonBizException是我自定义的异常,你们可以使用自己自定义的异常类。
import com.onecode.dtg.basic.common.enums.ResultCode;
import com.onecode.dtg.basic.common.es.annotation.EsField;
import com.onecode.dtg.basic.common.es.enums.FieldType;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.lang.reflect.Field;
/**
* @author gideon
* @date 2022/9/8
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class EsIndexCreateService {
private final RestHighLevelClient restHighLevelClient;
/**
* 不需要逗号分词器索引
*
* @param indexName 索引名称
* @param clazz 同步到es的实体类
* @return boolean
*/
public boolean createIndex(String indexName, Class<?> clazz) {
return createIndex(indexName, clazz, false);
}
/**
* 建立索引
*
* @param indexName 索引名称
* @param comma 是否需要逗号分词器
* @return boolean
*/
public boolean createIndex(String indexName, Class<?> clazz, Boolean comma) {
try {
// 判断索引是否存在
GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
boolean exists = restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
if (exists) {
return true;
}
CreateIndexRequest request = new CreateIndexRequest(indexName);
if (comma) {
XContentBuilder settingsBuilder = XContentFactory.jsonBuilder()
.startObject()
.startObject("analysis")
.startObject("analyzer")
.startObject("comma")
.field("type", "pattern")
// 将分词器规则定义为按照","进行分词
.field("pattern", ",")
.endObject()
.endObject()
.endObject()
.endObject();
request.settings(settingsBuilder);
}
// 这里创建索引结构
request.mapping(generateBuilder(clazz));
CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
// 指示是否所有节点都已确认请求
boolean acknowledged = response.isAcknowledged();
// 指示是否在超时之前为索引中的每个分片启动了必需的分片副本数
boolean shardsAcknowledged = response.isShardsAcknowledged();
if (acknowledged || shardsAcknowledged) {
log.info("创建索引成功!索引名称为{}", indexName);
return true;
}
return false;
} catch (IOException e) {
throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引:" + indexName + "失败。");
}
}
/**
* 生成es索引
*
* @param clazz 对于的es实体
* @return XContentBuilder
*/
public static XContentBuilder generateBuilder(Class<?> clazz) {
try {
// 获取索引名称及类型
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
builder.startObject("properties");
Field[] declaredFields = clazz.getDeclaredFields();
for (Field declaredField : declaredFields) {
if (declaredField.isAnnotationPresent(EsField.class)) {
// 获取注解
EsField declaredAnnotation = declaredField.getDeclaredAnnotation(EsField.class);
// 如果嵌套对象
/**
* {
* "mappings": {
* "properties": {
* "region": {
* "type": "keyword"
* },
* "manager": {
* "properties": {
* "age": { "type": "integer" },
* "name": {
* "properties": {
* "first": { "type": "text" },
* "last": { "type": "text" }
* }
* }
* }
* }
* }
* }
* }
*/
if (declaredAnnotation.type() == FieldType.OBJECT) {
// 获取当前类的对象-- Action
Class<?> type = declaredField.getType();
Field[] typeDeclaredFields = type.getDeclaredFields();
builder.startObject(declaredField.getName());
builder.startObject("properties");
// 遍历该对象中的所有属性
for (Field field : typeDeclaredFields) {
if (field.isAnnotationPresent(EsField.class)) {
// 获取注解
EsField fieldDeclaredAnnotation = field.getDeclaredAnnotation(EsField.class);
builder.startObject(field.getName());
builder.field("type", fieldDeclaredAnnotation.type().getType());
// keyword不需要分词
if (fieldDeclaredAnnotation.type() == FieldType.TEXT) {
builder.field("analyzer", fieldDeclaredAnnotation.analyzer().getType());
}
builder.endObject();
}
}
builder.endObject();
builder.endObject();
} else {
builder.startObject(declaredField.getName());
builder.field("type", declaredAnnotation.type().getType());
// keyword不需要分词
if (declaredAnnotation.type() == FieldType.TEXT) {
builder.field("analyzer", declaredAnnotation.analyzer().getType());
}
builder.endObject();
}
}
}
// 对应property
builder.endObject();
builder.endObject();
return builder;
} catch (IOException e) {
throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引失败。");
}
}
}
canal
因为我这里使用的是第三方的canal jar包,也就是上面说到的Canal胶水层
获取地址:https://gitee.com/gz-yami/mall4cloud/tree/master/mall4cloud-common/mall4cloud-common-core/lib
引入的maven文章来源:https://www.toymoban.com/news/detail-410033.html
<dependency>
<groupId>cn.throwx</groupId>
<artifactId>canal-glue-core</artifactId>
<version>1.0</version>
<scope>system</scope>
<systemPath>${pom.basedir}/lib/canal-glue-core.jar</systemPath>
</dependency>
类似于下面这样放文章来源地址https://www.toymoban.com/news/detail-410033.html
- canal的canal.properties配置文件信息,这里主要看你使用什么信息队列就配置什么。我使用的是RocketMQ,然后需要创建一个topic去监听数据库的操作日志,配置topic在rocketmq.producer.group = canal-topic
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = rocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
#################################################
######### destinations #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = canal-topic
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 192.168.1.46:9876
rocketmq.retry.times.when.send.failed = 3
rocketmq.vip.channel.enabled = false
rocketmq.tag =
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
- canal的instance.properties配置文件信息,canal.instance.filter.regex这个参数可以指定监听的数据库->表
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=192.168.1.46:3306
canal.instance.master.journal.name=mysql-binlog.000001
canal.instance.master.position=0
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=mp_biz_service.server:*,mp_biz_service.shop_service_server:*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=canal-topic
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
- canal自定义处理器NdCanalBinLogEventParser
import cn.throwx.canal.gule.common.BinLogEventType;
import cn.throwx.canal.gule.common.OperationType;
import cn.throwx.canal.gule.model.CanalBinLogEvent;
import cn.throwx.canal.gule.model.CanalBinLogResult;
import cn.throwx.canal.gule.support.parser.BaseCommonEntryFunction;
import cn.throwx.canal.gule.support.parser.BasePrimaryKeyTupleFunction;
import cn.throwx.canal.gule.support.parser.CanalBinLogEventParser;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
/**
* @author gideon
*/
@Slf4j
public class NdCanalBinLogEventParser implements CanalBinLogEventParser {
@Override
public <T> List<CanalBinLogResult<T>> parse(CanalBinLogEvent event, Class<T> klass, BasePrimaryKeyTupleFunction primaryKeyFunction, BaseCommonEntryFunction<T> commonEntryFunction) {
BinLogEventType eventType = BinLogEventType.fromType(event.getType());
if (Objects.equals(BinLogEventType.CREATE, eventType) || Objects.equals(BinLogEventType.ALTER, eventType)) {
if (log.isDebugEnabled()) {
log.debug("监听到不需要处理或者未知的binlog事件类型[{}],将忽略解析过程返回空列表,binlog事件:{}", eventType, JSON.toJSONString(event));
}
return Collections.emptyList();
}
if (BinLogEventType.UNKNOWN != eventType && BinLogEventType.QUERY != eventType) {
if (Boolean.TRUE.equals(event.getIsDdl())) {
CanalBinLogResult<T> entry = new CanalBinLogResult<>();
entry.setOperationType(OperationType.DDL);
entry.setBinLogEventType(eventType);
entry.setDatabaseName(event.getDatabase());
entry.setTableName(event.getTable());
entry.setSql(event.getSql());
return Collections.singletonList(entry);
} else {
Optional.ofNullable(event.getPkNames()).filter((x) -> x.size() == 1).orElseThrow(() -> new IllegalArgumentException("DML类型binlog事件主键列数量不为1"));
String primaryKeyName = event.getPkNames().get(0);
List<CanalBinLogResult<T>> entryList = new LinkedList<>();
List<Map<String, String>> data = event.getData();
List<Map<String, String>> old = event.getOld();
int dataSize = null != data ? data.size() : 0;
int oldSize = null != old ? old.size() : 0;
if (dataSize > 0) {
for(int index = 0; index < dataSize; ++index) {
CanalBinLogResult<T> entry = new CanalBinLogResult<>();
entryList.add(entry);
entry.setSql(event.getSql());
entry.setOperationType(OperationType.DML);
entry.setBinLogEventType(eventType);
entry.setTableName(event.getTable());
entry.setDatabaseName(event.getDatabase());
Map<String, String> item = data.get(index);
entry.setAfterData(commonEntryFunction.apply(item));
Map<String, String> oldItem = null;
if (oldSize > 0 && index <= oldSize) {
oldItem = old.get(index);
entry.setBeforeData(commonEntryFunction.apply(oldItem));
}
entry.setPrimaryKey(primaryKeyFunction.apply(oldItem, item, primaryKeyName));
}
}
return entryList;
}
} else {
if (log.isDebugEnabled()) {
log.debug("监听到不需要处理或者未知的binlog事件类型[{}],将忽略解析过程返回空列表,binlog事件:{}", eventType, JSON.toJSONString(event));
}
return Collections.emptyList();
}
}
private NdCanalBinLogEventParser() {
}
public static NdCanalBinLogEventParser of() {
return new NdCanalBinLogEventParser();
}
}
- canal自定义处理器NdCanalBinlogEventProcessorFactory
import cn.throwx.canal.gule.model.ModelTable;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.CanalBinlogEventProcessorFactory;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* @author gideon
*/
public class NdCanalBinlogEventProcessorFactory implements CanalBinlogEventProcessorFactory {
private final ConcurrentMap<ModelTable, List<BaseCanalBinlogEventProcessor<?>>> cache = new ConcurrentHashMap<>(16);
@Override
public void register(ModelTable modelTable, BaseCanalBinlogEventProcessor<?> processor) {
synchronized(this.cache) {
this.cache.putIfAbsent(modelTable, new LinkedList<>());
this.cache.get(modelTable).add(processor);
}
}
@Override
public List<BaseCanalBinlogEventProcessor<?>> get(ModelTable modelTable) {
return this.cache.get(modelTable);
}
private NdCanalBinlogEventProcessorFactory() {
}
public static NdCanalBinlogEventProcessorFactory of() {
return new NdCanalBinlogEventProcessorFactory();
}
}
- canal自定义处理器NdCanalGlue
import cn.throwx.canal.gule.CanalGlue;
import cn.throwx.canal.gule.model.CanalBinLogEvent;
import cn.throwx.canal.gule.model.ModelTable;
import cn.throwx.canal.gule.support.adapter.SourceAdapterFacade;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.CanalBinlogEventProcessorFactory;
import java.util.List;
/**
* @author gideon
*/
public class NdCanalGlue implements CanalGlue {
private final CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory;
@Override
public void process(String content) {
CanalBinLogEvent event = SourceAdapterFacade.X.adapt(CanalBinLogEvent.class, content);
ModelTable modelTable = ModelTable.of(event.getDatabase(), event.getTable());
List<BaseCanalBinlogEventProcessor<?>> baseCanalBinlogEventProcessors = this.canalBinlogEventProcessorFactory.get(modelTable);
if (baseCanalBinlogEventProcessors.isEmpty()) {
return;
}
baseCanalBinlogEventProcessors.forEach((processor) -> processor.process(event));
}
private NdCanalGlue(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {
this.canalBinlogEventProcessorFactory = canalBinlogEventProcessorFactory;
}
public static NdCanalGlue of(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {
return new NdCanalGlue(canalBinlogEventProcessorFactory);
}
}
- canal配置类
import cn.throwx.canal.gule.CanalGlue;
import cn.throwx.canal.gule.support.parser.*;
import cn.throwx.canal.gule.support.parser.converter.CanalFieldConverterFactory;
import cn.throwx.canal.gule.support.parser.converter.InMemoryCanalFieldConverterFactory;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.CanalBinlogEventProcessorFactory;
import com.onecode.middle.search.service.canal.NdCanalBinLogEventParser;
import com.onecode.middle.search.service.canal.NdCanalBinlogEventProcessorFactory;
import com.onecode.middle.search.service.canal.NdCanalGlue;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import java.util.Map;
/**
* @author gideon
*/
@Configuration
public class CanalGlueAutoConfiguration implements SmartInitializingSingleton, BeanFactoryAware {
private ConfigurableListableBeanFactory configurableListableBeanFactory;
@Bean
@ConditionalOnMissingBean
public CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory() {
return NdCanalBinlogEventProcessorFactory.of();
}
@Bean
@ConditionalOnMissingBean
public ModelTableMetadataManager modelTableMetadataManager(CanalFieldConverterFactory canalFieldConverterFactory) {
return InMemoryModelTableMetadataManager.of(canalFieldConverterFactory);
}
@Bean
@ConditionalOnMissingBean
public CanalFieldConverterFactory canalFieldConverterFactory() {
return InMemoryCanalFieldConverterFactory.of();
}
@Bean
@ConditionalOnMissingBean
public CanalBinLogEventParser canalBinLogEventParser() {
return NdCanalBinLogEventParser.of();
}
@Bean
@ConditionalOnMissingBean
public ParseResultInterceptorManager parseResultInterceptorManager(ModelTableMetadataManager modelTableMetadataManager) {
return InMemoryParseResultInterceptorManager.of(modelTableMetadataManager);
}
@Bean
@Primary
public CanalGlue canalGlue(CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory) {
return NdCanalGlue.of(canalBinlogEventProcessorFactory);
}
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.configurableListableBeanFactory = (ConfigurableListableBeanFactory) beanFactory;
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public void afterSingletonsInstantiated() {
ParseResultInterceptorManager parseResultInterceptorManager
= configurableListableBeanFactory.getBean(ParseResultInterceptorManager.class);
ModelTableMetadataManager modelTableMetadataManager
= configurableListableBeanFactory.getBean(ModelTableMetadataManager.class);
CanalBinlogEventProcessorFactory canalBinlogEventProcessorFactory
= configurableListableBeanFactory.getBean(CanalBinlogEventProcessorFactory.class);
CanalBinLogEventParser canalBinLogEventParser
= configurableListableBeanFactory.getBean(CanalBinLogEventParser.class);
Map<String, BaseParseResultInterceptor> interceptors
= configurableListableBeanFactory.getBeansOfType(BaseParseResultInterceptor.class);
interceptors.forEach((k, interceptor) -> parseResultInterceptorManager.registerParseResultInterceptor(interceptor));
Map<String, BaseCanalBinlogEventProcessor> processors
= configurableListableBeanFactory.getBeansOfType(BaseCanalBinlogEventProcessor.class);
processors.forEach((k, processor) -> processor.init(canalBinLogEventParser, modelTableMetadataManager,
canalBinlogEventProcessorFactory, parseResultInterceptorManager));
}
}
- ServerBO canal转换的实体类,@CanalModel 的参数database是对应的数据库,table是对应数据库下的数据表,因为这个数据是需要同步到es的所以设置了类型。
import cn.throwx.canal.gule.annotation.CanalModel;
import cn.throwx.canal.gule.common.FieldNamingPolicy;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.onecode.dtg.basic.common.es.annotation.EsField;
import com.onecode.dtg.basic.common.es.enums.AnalyzerType;
import com.onecode.dtg.basic.common.es.enums.FieldType;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.time.LocalDate;
import java.time.LocalDateTime;
/**
* @author gideon
*/
@Data
@CanalModel(database = "mp_biz_service", table = "server", fieldNamingPolicy = FieldNamingPolicy.LOWER_UNDERSCORE)
public class ServerBO {
@ApiModelProperty("id")
@EsField(type = FieldType.LONG)
private Long id;
@ApiModelProperty("用户标识")
@EsField(type = FieldType.LONG)
private Long userId;
@ApiModelProperty("类型")
@EsField(type = FieldType.KEYWORD)
private String type;
@ApiModelProperty("姓名")
@EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String name;
@ApiModelProperty("值为1时是男性,值为2时是女性,值为0时是未知")
@EsField(type = FieldType.KEYWORD)
private String gender;
@ApiModelProperty("出生年月日")
@JsonFormat(pattern = "yyyy-MM-dd")
@EsField(type = FieldType.DATE)
private LocalDate birthday;
@ApiModelProperty("学历")
@EsField(type = FieldType.KEYWORD)
private String education;
@ApiModelProperty("从业时间")
@JsonFormat(pattern = "yyyy-MM-dd")
@EsField(type = FieldType.DATE)
private LocalDate practiceDate;
@ApiModelProperty("评级")
@EsField(type = FieldType.KEYWORD)
private String level;
@ApiModelProperty("认证标签")
@EsField(type = FieldType.TEXT)
private String authLabel;
@ApiModelProperty("勋章(逗号隔开)")
@EsField(type = FieldType.TEXT)
private String medal;
@ApiModelProperty("服务评分")
@EsField(type = FieldType.INTEGER)
private Integer serviceScore;
@ApiModelProperty("已实名认证")
@EsField(type = FieldType.INTEGER)
private Integer realNameAuth;
@ApiModelProperty("身份证号")
@EsField(type = FieldType.TEXT)
private String idCardNo;
@ApiModelProperty("户籍-省")
@EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String idCardProvince;
@ApiModelProperty("户籍-市")
@EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String idCardCity;
@ApiModelProperty("户籍-区")
@EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String idCardRegion;
@ApiModelProperty("手机号")
@EsField(type = FieldType.TEXT)
private String phone;
@ApiModelProperty("现住-省")
@EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String presentProvince;
@ApiModelProperty("现住-市")
@EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String presentCity;
@ApiModelProperty("现住-区")
@EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String presentRegion;
@ApiModelProperty("现住-地址")
@EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String presentAddress;
@ApiModelProperty("头像")
@EsField(type = FieldType.TEXT)
private String head;
@ApiModelProperty("使用状态(初始化:init;正常:normal,禁用:ban)")
@EsField(type = FieldType.KEYWORD)
private String useStatus;
@ApiModelProperty("审核状态 (待审核:await;通过:pass,驳回:reject)")
@EsField(type = FieldType.KEYWORD)
private String auditStatus;
@ApiModelProperty("驳回理由")
@EsField(type = FieldType.TEXT)
private String rejectReason;
@ApiModelProperty("注册时间")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@EsField(type = FieldType.DATE)
private LocalDateTime regDate;
@ApiModelProperty("介绍-内容")
@EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String introContent;
@ApiModelProperty("介绍-视频")
@EsField(type = FieldType.TEXT)
private String introVideo;
@ApiModelProperty("介绍-标签")
@EsField(type = FieldType.TEXT)
private String introLabel;
@ApiModelProperty("商家标识")
@EsField(type = FieldType.LONG)
private Long merchantId;
@ApiModelProperty("组织标识")
@EsField(type = FieldType.LONG)
private Long orgId;
@ApiModelProperty("护理员申请sku,多个逗号隔开")
@EsField(type = FieldType.TEXT, analyzer = AnalyzerType.COMMA)
private String skuId;
@ApiModelProperty("护理员排班数据,多个逗号隔开")
@EsField(type = FieldType.TEXT, analyzer = AnalyzerType.COMMA)
private String schedule;
/**
* 逻辑删除
*/
@EsField(type = FieldType.INTEGER)
private Integer del;
/**
* 创建人
*/
@EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String createBy;
/**
* 创建时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@EsField(type = FieldType.DATE)
private LocalDateTime createTime;
/**
* 更新者
*/
@EsField(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
private String updateBy;
/**
* 更新时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@EsField(type = FieldType.DATE)
private LocalDateTime updateTime;
}
- ShopServiceServer canal转换的实体类,@CanalModel 的参数database是对应的数据库,table是对应数据库下的数据表
import cn.throwx.canal.gule.annotation.CanalModel;
import cn.throwx.canal.gule.common.FieldNamingPolicy;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.time.LocalDateTime;
/**
* @author gideon
*/
@Data
@CanalModel(database = "mp_biz_service", table = "shop_service_server", fieldNamingPolicy = FieldNamingPolicy.LOWER_UNDERSCORE)
public class ShopServiceServerBO {
@ApiModelProperty("ID")
private Long id;
@ApiModelProperty("产品标识")
private Long productId;
@ApiModelProperty("服务者用户标识")
private Long serverUserId;
@ApiModelProperty("产品sku标识")
private Long productSkuId;
@ApiModelProperty(value = "盈利")
private Integer profit;
@ApiModelProperty("商家标识")
private Long merchantId;
@ApiModelProperty("组织标识")
private Long orgId;
/**
* 逻辑删除
*/
private Integer del;
/**
* 创建人
*/
private String createBy;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 更新者
*/
private String updateBy;
/**
* 更新时间
*/
private LocalDateTime updateTime;
}
- 至此canal的基础代码就完成。
消费MQ订阅的canal信息,进行elasticsearch的同步以及搜索
- 监听我们上面canal-topic订阅的消息然后进行同步数据CanalListener
import cn.throwx.canal.gule.CanalGlue;
import com.onecode.dtg.basic.RocketMqConstant;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* canal消费数据库操作日志mq
*
* @author gideon
*/
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(topic = RocketMqConstant.CANAL_TOPIC, consumerGroup = RocketMqConstant.CANAL_TOPIC)
public class CanalListener implements RocketMQListener<String> {
private final CanalGlue canalGlue;
@Override
public void onMessage(String message) {
canalGlue.process(message);
}
}
- 对我们需要监听的表进行处理ServerCanalListener,这里面的hutool是一个工具类有需要的可以自行引入
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.6</version>
</dependency>
import cn.hutool.json.JSONUtil;
import cn.throwx.canal.gule.model.CanalBinLogEvent;
import cn.throwx.canal.gule.model.CanalBinLogResult;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import cn.throwx.canal.gule.support.processor.ExceptionHandler;
import com.onecode.dtg.basic.common.core.exception.CommonBizException;
import com.onecode.dtg.basic.common.enums.ResultCode;
import com.onecode.middle.search.service.bo.ServerBO;
import com.onecode.middle.search.service.constant.EsIndexEnum;
import com.onecode.middle.search.service.util.EsIndexCreateService;
import com.onecode.service.feign.ServerFeignClient;
import com.onecode.service.feign.bo.EsServerBO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author gideon
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ServerCanalListener extends BaseCanalBinlogEventProcessor<ServerBO> {
private final EsIndexCreateService esIndexCreateService;
private final ServerFeignClient serverFeignClient;
private final RestHighLevelClient restHighLevelClient;
/**
* 插入护理员,此时插入es
*/
@Override
protected void processInsertInternal(CanalBinLogResult<ServerBO> result) {
Long serverId = result.getPrimaryKey();
EsServerBO esServerBO = serverFeignClient.loadEsServerBO(serverId);
if (esServerBO == null) {
throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引异常");
}
// 创建索引
boolean index = esIndexCreateService.createIndex(EsIndexEnum.SERVER.value(), ServerBO.class, true);
if (!index) {
throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引:" + EsIndexEnum.SERVER.value() + "失败。");
}
IndexRequest request = new IndexRequest(EsIndexEnum.SERVER.value());
request.id(String.valueOf(serverId));
request.source(JSONUtil.toJsonStr(esServerBO), XContentType.JSON);
try {
IndexResponse indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);
log.info(indexResponse.toString());
} catch (IOException e) {
log.error(e.toString());
throw new CommonBizException(ResultCode.FAIL.getModel(), "保存es信息异常:" + e);
}
}
/**
* 更新护理员,删除护理员索引,再重新构建一个
*/
@Override
protected void processUpdateInternal(CanalBinLogResult<ServerBO> result) {
Long spuId = result.getPrimaryKey();
EsServerBO esServerBO = serverFeignClient.loadEsServerBO(spuId);
String source = JSONUtil.toJsonStr(esServerBO);
// 创建索引
boolean index = esIndexCreateService.createIndex(EsIndexEnum.SERVER.value(), ServerBO.class, true);
if (!index) {
throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引:" + EsIndexEnum.SERVER.value() + "失败。");
}
UpdateRequest request = new UpdateRequest(EsIndexEnum.SERVER.value(), String.valueOf(spuId));
request.doc(source, XContentType.JSON);
request.docAsUpsert(true);
try {
UpdateResponse updateResponse = restHighLevelClient.update(request, RequestOptions.DEFAULT);
log.info(updateResponse.toString());
} catch (IOException e) {
log.error(e.toString());
throw new CommonBizException(ResultCode.FAIL.getModel(), "删除es信息异常:" + e);
}
}
@Override
protected ExceptionHandler exceptionHandler() {
return (CanalBinLogEvent event, Throwable throwable) -> {
throw new CommonBizException(ResultCode.FAIL.getModel(), "创建索引异常:" + throwable);
};
}
}
- 这个表的监听,是因为我的业务需求,shop_service_server表增加或者删除的时候需要将skuId加到server表的skuId字段里面去,所以需要监听修改。ShopServiceServerCanalListener
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.throwx.canal.gule.model.CanalBinLogResult;
import cn.throwx.canal.gule.support.processor.BaseCanalBinlogEventProcessor;
import com.onecode.dtg.basic.common.core.exception.CommonBizException;
import com.onecode.dtg.basic.common.enums.ResultCode;
import com.onecode.middle.search.service.bo.ShopServiceServerBO;
import com.onecode.middle.search.service.manager.ServerUpdateManager;
import com.onecode.service.feign.ServerFeignClient;
import com.onecode.service.feign.bo.EsServerBO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author gideon
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ShopServiceServerCanalListener extends BaseCanalBinlogEventProcessor<ShopServiceServerBO> {
private final ServerFeignClient serverFeignClient;
private final ServerUpdateManager serverUpdateManager;
/**
* 新增商品服务者数据
*
* @param result result
*/
@Override
protected void processInsertInternal(CanalBinLogResult<ShopServiceServerBO> result) {
//数据库操作后的数据
ShopServiceServerBO afterData = result.getAfterData();
EsServerBO loadServerBO = loadServerBO(afterData.getServerUserId());
List<String> skuIdList = StrUtil.split(loadServerBO.getSkuId(), ",");
skuIdList.add(afterData.getProductSkuId().toString());
EsServerBO esServerBO = new EsServerBO();
esServerBO.setSkuId(StrUtil.join(",", skuIdList));
serverUpdateManager.esUpdateServerByServerId(loadServerBO.getId(), esServerBO);
}
/**
* 更新商品服务者数据
*
* @param result result
*/
@Override
protected void processUpdateInternal(CanalBinLogResult<ShopServiceServerBO> result) {
//数据库执行操作后的数据
ShopServiceServerBO afterData = result.getAfterData();
//del字段是我的表是否逻辑删除的判断,大家根据自己需要去掉
if ("1".equals(afterData.getDel())) {
return;
}
//微服务项目调用接口查询数据
EsServerBO loadEsServerBO = loadServerBO(afterData.getServerUserId());
//处理修改后的数据
EsServerBO esServerBO = dealWithData(afterData, loadEsServerBO);
serverUpdateManager.esUpdateServerByServerId(loadEsServerBO.getId(), esServerBO);
}
/**
* 删除商品服务者数据
*
* @param result result
*/
@Override
protected void processDeleteInternal(CanalBinLogResult<ShopServiceServerBO> result) {
//数据库操作前的数据
ShopServiceServerBO beforeData = result.getBeforeData();
EsServerBO loadServerBO = loadServerBO(beforeData.getServerUserId());
EsServerBO esServerBO = dealWithData(beforeData, loadServerBO);
serverUpdateManager.esUpdateServerByServerId(loadServerBO.getId(), esServerBO);
}
/**
* 处理数据
*
* @param data 数据库操作数据
* @return EsServerBO
*/
private EsServerBO dealWithData(ShopServiceServerBO data, EsServerBO loadEsServerBO) {
List<String> skuIdList = StrUtil.split(loadEsServerBO.getSkuId(), ",");
CollUtil.removeAny(skuIdList, data.getProductSkuId().toString());
EsServerBO esServerBO = new EsServerBO();
esServerBO.setSkuId(StrUtil.join(",", skuIdList));
return esServerBO;
}
/**
* 获取护理员书信息
*
* @param serverUserId 护理员用户标识
* @return EsServerBO
*/
private EsServerBO loadServerBO(Long serverUserId) {
EsServerBO loadEsServerBO = serverFeignClient.loadEsServerBoByServerUserId(serverUserId);
if (loadEsServerBO == null) {
throw new CommonBizException(ResultCode.FAIL.getModel(),
"es数据同步失败:无法通过护工用户标识:" + serverUserId + "找到护理员信息。");
}
return loadEsServerBO;
}
}
- ServerUpdateManager,这个是ShopServiceServerListener的处理实现类
import cn.hutool.json.JSONUtil;
import com.onecode.dtg.basic.common.core.exception.CommonBizException;
import com.onecode.dtg.basic.common.enums.ResultCode;
import com.onecode.middle.search.service.constant.EsIndexEnum;
import com.onecode.service.feign.bo.EsServerBO;
import lombok.RequiredArgsConstructor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.stereotype.Component;
/**
* @author gideon
*/
@Component
@RequiredArgsConstructor
public class ServerUpdateManager {
private final RestHighLevelClient restHighLevelClient;
/**
* 批量更新es中的商品信息
*
* @param serverId 护理员标识
* @param esServerBO 更新的数据
*/
public void esUpdateServerByServerId(Long serverId, EsServerBO esServerBO) {
String source = JSONUtil.toJsonStr(esServerBO);
try {
BulkRequest request = new BulkRequest();
// 准备更新的数据
request.add(new UpdateRequest(EsIndexEnum.SERVER.value(), String.valueOf(serverId)).doc(source, XContentType.JSON));
//更新
BulkResponse bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
if (bulkResponse.hasFailures()) {
throw new CommonBizException(ResultCode.FAIL.getModel(), bulkResponse.buildFailureMessage());
}
} catch (Exception e) {
throw new CommonBizException(ResultCode.FAIL.getModel(), e.getMessage());
}
}
}
- ServerSearchManager是搜索接口实现
import cn.hutool.json.JSONUtil;
import com.onecode.dtg.basic.common.core.exception.CommonBizException;
import com.onecode.dtg.basic.common.enums.ResultCode;
import com.onecode.dtg.basic.common.util.ColumnUtil;
import com.onecode.dtg.basic.common.util.LocalDateUtil;
import com.onecode.middle.search.service.bo.ServerBO;
import com.onecode.middle.search.service.constant.EsIndexEnum;
import com.onecode.middle.search.service.dto.ServerSearchDTO;
import com.onecode.middle.search.service.vo.EsPageVO;
import com.onecode.middle.search.service.vo.search.EsServerVO;
import com.onecode.service.feign.constant.AuditStatus;
import com.onecode.service.feign.constant.UseStatus;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* @author gideon
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ServerSearchManager {
private final RestHighLevelClient restHighLevelClient;
/**
* 通过搜索信息分页搜索es数据的信息
*
* @param serverSearchDTO 护理员搜索条件
* @return 搜索结果
*/
public EsPageVO<EsServerVO> pageSearchResult(ServerSearchDTO serverSearchDTO) {
//1、动态构建出查询需要的DSL语句
EsPageVO<EsServerVO> result;
//1、准备检索请求
SearchRequest searchRequest = buildSearchRequest(serverSearchDTO);
try {
//2、执行检索请求
SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
log.info("搜索返回结果:" + response.toString());
//3、分析响应数据,封装成我们需要的格式
result = buildSearchResult(serverSearchDTO, response);
} catch (IOException e) {
log.error(e.toString());
throw new CommonBizException(ResultCode.FAIL.getModel(), "搜索服务出了点小差,请稍后再试:" + e);
}
return result;
}
/**
* 构建结果数据
*/
private EsPageVO<EsServerVO> buildSearchResult(ServerSearchDTO dto, SearchResponse response) {
EsPageVO<EsServerVO> esPageVO = new EsPageVO<>();
//1、返回的所有查询到的商品
SearchHits hits = response.getHits();
List<EsServerVO> productSearchs = getEsOrderBOList(response);
esPageVO.setList(productSearchs);
//===============分页信息====================//
//总记录数
long total = hits.getTotalHits().value;
esPageVO.setTotal(total);
// 总页码
int totalPages = (int) total % dto.getPageSize() == 0 ?
(int) total / dto.getPageSize() : ((int) total / dto.getPageSize() + 1);
esPageVO.setPages(totalPages);
return esPageVO;
}
private List<EsServerVO> getEsOrderBOList(SearchResponse response) {
return getOrderListByResponse(response.getHits().getHits());
}
/**
* 从es返回的数据中获取spu列表
*
* @param hits es返回的数据
* @return
*/
private List<EsServerVO> getOrderListByResponse(SearchHit[] hits) {
List<EsServerVO> esOrders = new ArrayList<>();
for (SearchHit hit : hits) {
EsServerVO esOrder = JSONUtil.toBean(hit.getSourceAsString(), EsServerVO.class);
esOrders.add(esOrder);
}
return esOrders;
}
/**
* 准备检索请求
*
* @param param 搜索参数
* @return
*/
private SearchRequest buildSearchRequest(ServerSearchDTO param) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// 构建bool-query
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
// 过滤
filterQueryIfNecessary(param, boolQueryBuilder);
// 关键字搜索
keywordSearch(param, boolQueryBuilder);
// 排序
sort(searchSourceBuilder, boolQueryBuilder);
//分页
searchSourceBuilder.from((param.getPageNum() - 1) * param.getPageSize());
searchSourceBuilder.size(param.getPageSize());
log.info("构建的DSL语句 {}", searchSourceBuilder);
return new SearchRequest(new String[]{EsIndexEnum.SERVER.value()}, searchSourceBuilder);
}
/**
* 关键字搜索
*/
private void keywordSearch(ServerSearchDTO param, BoolQueryBuilder boolQueryBuilder) {
BoolQueryBuilder keywordShouldQuery = QueryBuilders.boolQuery();
// 现住-省
if (Objects.nonNull(param.getPresentProvince())) {
keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getPresentProvince), param.getPresentProvince()).operator(Operator.AND));
}
// 现住-市
if (Objects.nonNull(param.getPresentCity())) {
keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getPresentCity), param.getPresentCity()).operator(Operator.AND));
}
// 现住-区
if (Objects.nonNull(param.getPresentRegion())) {
keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getPresentRegion), param.getPresentRegion()).operator(Operator.AND));
}
// 户籍-省
if (Objects.nonNull(param.getIdCardProvince())) {
keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIdCardProvince), param.getIdCardProvince()).operator(Operator.AND));
}
// 户籍-市
if (Objects.nonNull(param.getIdCardCity())) {
keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIdCardCity), param.getIdCardCity()).operator(Operator.AND));
}
// 户籍-区
if (Objects.nonNull(param.getIdCardRegion())) {
keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIdCardRegion), param.getIdCardRegion()).operator(Operator.AND));
}
// 标签
if (Objects.nonNull(param.getIntroLabels())) {
keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getIntroLabel), param.getIntroLabels()).operator(Operator.AND));
}
// 排班,使用了我的自定义逗号分词器,所以是根据逗号分隔后进行匹配的,但是需要多个匹配,我就使用了for循环,应该是有优化的地方,暂时没处理
if (param.getServiceStartDate() != null && param.getServiceEndDate() != null) {
List<String> scheduleList = LocalDateUtil.getContinuousTime(param.getServiceStartDate(), param.getServiceEndDate(), DateTimeFormatter.ofPattern("yyyyMMdd"));
for (String schedule : scheduleList) {
keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getSchedule), schedule).operator(Operator.AND));
}
}
// skuId,使用了我的自定义逗号分词器,所以是根据逗号分隔后进行匹配的
if (Objects.nonNull(param.getSkuId())) {
keywordShouldQuery.should(QueryBuilders.matchQuery(ColumnUtil.getName(ServerBO::getSkuId), param.getSkuId()).operator(Operator.AND));
}
boolQueryBuilder.must(keywordShouldQuery);
}
/**
* 进行排序
*/
private void sort(SearchSourceBuilder searchSourceBuilder, BoolQueryBuilder boolQueryBuilder) {
searchSourceBuilder.sort(ColumnUtil.getName(ServerBO::getCreateTime), SortOrder.DESC);
searchSourceBuilder.query(boolQueryBuilder);
}
/**
* 过滤查询条件,如果有必要的话
*
* @param param 查询条件
* @param boolQueryBuilder 组合进boolQueryBuilder
*/
private void filterQueryIfNecessary(ServerSearchDTO param, BoolQueryBuilder boolQueryBuilder) {
// 类型
if (Objects.nonNull(param.getType())) {
boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getType), param.getType()));
}
// 性别
if (Objects.nonNull(param.getGender())) {
boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getGender), param.getGender()));
}
// 学历
if (Objects.nonNull(param.getEducation())) {
boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getEducation), param.getEducation()));
}
boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getAuditStatus), AuditStatus.PASS.getStatus()));
boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getUseStatus), UseStatus.NORMAL.getStatus()));
boolQueryBuilder.filter(QueryBuilders.termQuery(ColumnUtil.getName(ServerBO::getDel), 0));
}
}
- ServerSearchController
import com.onecode.dtg.basic.common.model.ResultBean;
import com.onecode.middle.search.service.dto.ServerSearchDTO;
import com.onecode.middle.search.service.manager.ServerSearchManager;
import com.onecode.middle.search.service.vo.EsPageVO;
import com.onecode.middle.search.service.vo.search.EsServerVO;
import io.swagger.annotations.Api;
import lombok.AllArgsConstructor;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
/**
* @author gideon
* @date 2022/9/6
*/
@Validated
@AllArgsConstructor
@RestController
@RequestMapping("/search/server/")
@Api(tags = "api-服务者搜索接口")
public class ServerSearchController {
private final ServerSearchManager serverSearchManager;
@PostMapping("/page")
public ResultBean<EsPageVO<EsServerVO>> page(@RequestBody ServerSearchDTO dto) {
return new ResultBean<>(serverSearchManager.pageSearchResult(dto));
}
}
- 分页参数EsPageDTO实体类
import com.onecode.dtg.basic.common.util.PrincipalUtil;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.constraints.NotNull;
import java.util.Arrays;
/**
* @author gideon
*/
@Data
public class EsPageDTO {
public static final String ASC = "ASC";
public static final String DESC = "DESC";
/**
* 最大分页大小,如果分页大小大于500,则用500作为分页的大小。防止有人直接传入一个较大的数,导致服务器内存溢出宕机
*/
public static final Integer MAX_PAGE_SIZE = 500;
/**
* 当前页
*/
@NotNull(message = "pageNum 不能为空")
@ApiModelProperty(value = "当前页", required = true)
private Integer pageNum;
@NotNull(message = "pageSize 不能为空")
@ApiModelProperty(value = "每页大小", required = true)
private Integer pageSize;
@ApiModelProperty(value = "排序字段数组,用逗号分割")
private String[] columns;
@ApiModelProperty(value = "排序字段方式,用逗号分割,ASC正序,DESC倒序")
private String[] orders;
public Integer getPageNum() {
return pageNum;
}
public void setPageNum(Integer pageNum) {
this.pageNum = pageNum;
}
public Integer getPageSize() {
return pageSize;
}
public void setPageSize(Integer pageSize) {
if (pageSize > MAX_PAGE_SIZE) {
this.pageSize = MAX_PAGE_SIZE;
return;
}
this.pageSize = pageSize;
}
public String getOrderBy() {
return order(this.columns, this.orders);
}
public String[] getColumns() {
return columns;
}
public void setColumns(String[] columns) {
this.columns = columns;
}
public String[] getOrders() {
return orders;
}
public void setOrders(String[] orders) {
this.orders = orders;
}
public static String order(String[] columns, String[] orders) {
if (columns == null || columns.length == 0) {
return "";
}
StringBuilder stringBuilder = new StringBuilder();
for (int x = 0; x < columns.length; x++) {
String column = columns[x];
String order;
if (orders != null && orders.length > x) {
order = orders[x].toUpperCase();
if (!(order.equals(ASC) || order.equals(DESC))) {
throw new IllegalArgumentException("非法的排序策略:" + column);
}
} else {
order = ASC;
}
// 判断列名称的合法性,防止SQL注入。只能是【字母,数字,下划线】
if (PrincipalUtil.isField(column)) {
throw new IllegalArgumentException("非法的排序字段名称:" + column);
}
// 驼峰转换为下划线
column = humpConversionUnderscore(column);
if (x != 0) {
stringBuilder.append(", ");
}
stringBuilder.append("`").append(column).append("` ").append(order);
}
return stringBuilder.toString();
}
public static String humpConversionUnderscore(String value) {
StringBuilder stringBuilder = new StringBuilder();
char[] chars = value.toCharArray();
for (char character : chars) {
if (Character.isUpperCase(character)) {
stringBuilder.append("_");
character = Character.toLowerCase(character);
}
stringBuilder.append(character);
}
return stringBuilder.toString();
}
@Override
public String toString() {
return "EsPageDTO{" +
"pageNum=" + pageNum +
", pageSize=" + pageSize +
", columns=" + Arrays.toString(columns) +
", orders=" + Arrays.toString(orders) +
'}';
}
}
- 查询参数实体类ServerSearchDTO
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import javax.validation.constraints.NotNull;
import java.time.LocalDate;
/**
* @author gideon
*/
@EqualsAndHashCode(callSuper = true)
@Data
public class ServerSearchDTO extends EsPageDTO{
@ApiModelProperty("类型")
private String type;
@ApiModelProperty("值为1时是男性,值为2时是女性,值为0时是未知")
private String gender;
@ApiModelProperty("学历")
private String education;
@ApiModelProperty("户籍-省")
private String idCardProvince;
@ApiModelProperty("户籍-市")
private String idCardCity;
@ApiModelProperty("户籍-区")
private String idCardRegion;
@ApiModelProperty("现住-省")
private String presentProvince;
@ApiModelProperty("现住-市")
private String presentCity;
@ApiModelProperty("现住-区")
private String presentRegion;
@ApiModelProperty("介绍-标签(多个值需要使用逗号分割)")
private String introLabels;
@ApiModelProperty("服务开始时间")
@NotNull(message = "服务开始时间不能为空")
private LocalDate serviceStartDate;
@ApiModelProperty("服务结束时间")
@NotNull(message = "服务结束时间不能为空")
private LocalDate serviceEndDate;
@ApiModelProperty("skuId")
@NotNull(message = "skuId不能为空。")
private Long skuId;
}
- 返回值EsServerVO参数
/**
* @author gideon
* @date 2022/9/5
*/
@Data
public class EsServerVO {
@ApiModelProperty("id")
private Long id;
@ApiModelProperty("用户标识")
private Long userId;
@ApiModelProperty("类型")
private String type;
@ApiModelProperty("姓名")
private String name;
@ApiModelProperty("值为1时是男性,值为2时是女性,值为0时是未知")
private String gender;
@ApiModelProperty("出生年月日")
@JsonFormat(pattern = "yyyy-MM-dd")
private LocalDate birthday;
@ApiModelProperty("学历")
private String education;
@ApiModelProperty("从业时间")
@JsonFormat(pattern = "yyyy-MM-dd")
private LocalDate practiceDate;
@ApiModelProperty("评级")
private String level;
@ApiModelProperty("认证标签")
private String authLabel;
@ApiModelProperty("勋章(逗号隔开)")
private String medal;
@ApiModelProperty("服务评分")
private Integer serviceScore;
@ApiModelProperty("已实名认证")
private Integer realNameAuth;
@ApiModelProperty("身份证号")
private String idCardNo;
@ApiModelProperty("户籍-省")
private String idCardProvince;
@ApiModelProperty("户籍-市")
private String idCardCity;
@ApiModelProperty("户籍-区")
private String idCardRegion;
@ApiModelProperty("手机号")
private String phone;
@ApiModelProperty("现住-省")
private String presentProvince;
@ApiModelProperty("现住-市")
private String presentCity;
@ApiModelProperty("现住-区")
private String presentRegion;
@ApiModelProperty("现住-地址")
private String presentAddress;
@ApiModelProperty("头像")
private String head;
@ApiModelProperty("使用状态(初始化:init;正常:normal,禁用:ban)")
private String useStatus;
@ApiModelProperty("审核状态 (待审核:await;通过:pass,驳回:reject)")
private String auditStatus;
@ApiModelProperty("驳回理由")
private String rejectReason;
@ApiModelProperty("注册时间")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime regDate;
@ApiModelProperty("介绍-内容")
private String introContent;
@ApiModelProperty("介绍-视频")
private String introVideo;
@ApiModelProperty("介绍-标签")
private String introLabel;
@ApiModelProperty("商家标识")
private Long merchantId;
@ApiModelProperty("组织标识")
private Long orgId;
@ApiModelProperty("护理员申请sku,多个逗号隔开")
private String skuId;
@ApiModelProperty("护理员排班数据,多个逗号隔开")
private String schedule;
/**
* 逻辑删除
*/
private Integer del;
/**
* 创建人
*/
private String createBy;
/**
* 创建时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime createTime;
/**
* 更新者
*/
private String updateBy;
/**
* 更新时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime updateTime;
}
- 分页返回值EsPageVO
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.List;
/**
* @author gideon
* @date 2022/9/5
*/
@Data
public class EsPageVO<T> {
@ApiModelProperty("总页数")
private Integer pages;
@ApiModelProperty("总条目数")
private Long total;
@ApiModelProperty("结果集")
private List<T> list;
}
到了这里,关于spring boot +springboot集成es7.9.1+canal同步到es的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!