前言
在本文中,我们将深入探讨 ElasticSearch 在数据处理中的关键功能,包括数据聚合、查询自动补全以及与数据库的同步问题。
首先,我们将聚焦于 ElasticSearch 强大的聚合功能,解释什么是聚合以及如何通过 DSL 语句和 RestClient 实现各种聚合操作。这一功能能够让我们更深入地了解和分析存储在 ElasticSearch 中的数据。
随后,我们将介绍查询自动补全功能,重点探讨拼音分词器、自定义分词器,以及如何通过 DSL 实现搜索的自动补全功能。这将帮助我们构建更智能和用户友好的搜索体验。
最后,我们将关注 ElasticSearch 与数据库的同步问题。我们将探讨不同的数据同步方式,包括同步调用、异步通知和监听 Binlog。特别地,我还会演示如何使用 RabbitMQ 实现高效的数据同步,确保数据库与 ElasticSearch 索引库中的数据保持一致。
希望通过本文,能够帮助我们更加深入地了解 ElasticSearch 在数据处理中的核心特性,解决实际问题,提升系统的性能和可维护性。接下来,让我们直奔主题,开始探索 ElasticSearch 的精髓!
一、数据的聚合
1.1 什么是聚合
在 Elasticsearch 中,聚合是对数据进行汇总、分析和提取统计信息的过程。它可以帮助我们从海量的数据集中提取有用的见解,例如平均值、最小值、最大值、总和等。聚合不仅可以用于数值类型的字段,还可以用于文本字段、日期字段等。
1.2 聚合的分类
桶聚合(Bucket Aggregations):
-
Term Aggregation(词条桶聚合): 用于按照文档中的词条(terms)进行分组。例如,可以根据商品类型、关键字等将文档分组。
-
Date Histogram(日期直方图聚合): 将日期字段按照一定的时间间隔划分为桶,用于按时间范围分组。例如,可以按照每天、每周、每月等划分。
指标聚合(Metrics Aggregations):
-
Avg Aggregation(平均聚合): 计算数值字段的平均值。
-
Max Aggregation(最大值聚合): 获取数值字段的最大值。
-
Min Aggregation(最小值聚合): 获取数值字段的最小值。
-
Stats Aggregation(统计聚合): 统计数值字段的多个指标,包括最大值、最小值、平均值、总和等。
管道聚合(Pipeline Aggregations): 管道聚合是在其他聚合结果的基础上再进行聚合。
1.3 DSL 语句实现聚合的示例
1. DSL 实现
Bucket
聚合:
例如,现在我们有一个需求就是统计所有数据中的酒店品牌有哪几种,此时就可以根据酒店的品牌名称做聚合,DSL 语句如下:
GET /hotel/_search
{
"size": 0, // 设置要获取的文档数为 0,即结果不包含文档,只需要包含聚合的结果
"aggs": { // 定义聚合
"brandAgg": { // 自定义聚合的名称
"terms": { // 指定 Bucket 聚合的类型
"field": "brand", // 指定需要聚合的字段
"size": 10 // 展示聚合的最大数量
}
}
}
}
下面对这个 DSL 语句进行解释:
-
GET /hotel/_search: 表示向 Elasticsearch 发送一个搜索请求,搜索的索引是 “hotel”。
-
“size”: 0: 设置要获取的文档数为 0。这是因为我们在这个场景中只关心聚合结果,而不需要返回文档内容,因此将文档大小设为 0。
-
“aggs”: { … }: 定义聚合操作。在这个例子中,我们只有一个聚合,可以根据实际需求添加更多聚合。
-
“brandAgg”: { … }: 自定义聚合的名称为 “brandAgg”。可以根据实际情况给聚合起一个有意义的名字。
-
“terms”: { … }: 指定桶聚合的类型为 “terms”,即根据字段值进行分组。
-
“field”: “brand”: 指定需要聚合的字段为 “brand”,即酒店的品牌名称。
-
“size”: 10: 展示聚合的最大数量为 10。这表示返回最常出现的 10 个品牌名称,你可以根据需要调整这个值。
最终,Elasticsearch 将返回一个聚合结果,其中包含了所有酒店品牌及其出现的次数。这对于分析数据中存在的不同酒店品牌是非常有用的。
执行结果:
对上述的聚合结果进行排序:
例如,按每种品牌的文档的数量降序排序,此时需要使用到 _count
排序规则,DSL 语句如下:
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"order": {
"_count": "desc"
},
"size": 10
}
}
}
}
限定聚合的范围:
在实际情况下,面对海量的数据不可能每次聚合都查询整个索引库,而是需要指定一个范围,然后再进行聚合操作。例如,此处我们对价格在 200 以下的文档做聚合操作:
GET /hotel/_search
{
"query": {
"range": {
"price": {
"lte": 200
}
}
},
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"order": {
"_count": "desc"
},
"size": 10
}
}
}
}
可以发现,限定聚合的范围,实际上就是在进行聚合操作之前,进行一次某种查询操作,然后再对查询的结果进行聚合。
2. DSL 实现
Metrics
聚合:
例如,现在我们需要在品牌聚合的基础之上,获取每个品牌用户平方的最大、最小和平均值,此时就会使用到 Metrics
聚合:
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"order": {
"_count": "desc"
},
"size": 10
},
"aggs": {
"score_stats": {
"stats": {
"field": "score"
}
}
}
}
}
}
下面对这个 DSL 语句进行解释:
-
GET /hotel/_search: 向 Elasticsearch 发送搜索请求,搜索的索引是 “hotel”。
-
“size”: 0: 设置要获取的文档数为 0,因为我们只关心聚合结果而不需要文档内容。
-
“aggs”: { … }: 定义聚合操作,这里使用了桶聚合。
-
“brandAgg”: { … }: 自定义桶聚合的名称为 “brandAgg”。
-
“terms”: { … }: 指定桶聚合的类型为 “terms”,即根据字段值进行分组。在这里,我们按照品牌名称进行分组。
-
“field”: “brand”: 指定需要聚合的字段为 “brand”,即酒店的品牌名称。
-
“order”: { “_count”: “desc” }: 根据文档数量(即每个品牌的数量)降序排列。
-
“size”: 10: 展示聚合的最大数量为 10,即返回最常出现的 10 个品牌名称。
-
-
“aggs”: { … }: 在每个品牌的桶上再进行聚合。
-
“score_stats”: { … }: 自定义指标聚合的名称为 “score_stats”。
-
“stats”: { … }: 指定指标聚合的类型为 “stats”,即统计数值字段的多个指标。
- “field”: “score”: 指定需要计算统计信息的字段为 “score”,即用户评分字段。
-
-
-
最终,Elasticsearch 将返回一个聚合结果,其中包含了每个品牌的用户评分的最大、最小和平均值。这有助于分析品牌在用户评分方面的表现。
1.4 RestClient 实现聚合
我们以品牌聚合为例,演示下 Java的 RestClient 使用,例如请求组装的代码:
通过上图可以发现,RestClient 实现聚合使用的是 source
中的 aggregation
方法,然后在这个方法中使用 AggregationBuilders
指定聚合的类型、名称和聚合的字段等等。
再来看看对查询结果的解析:
要解析出聚合的结果,首先从 response
中拿到aggregations
对象,然后再根据聚合名称,获取到具体的 bucket
数组,这个数组就是具体的聚合结果了。
整个完整的代码实现如下:
@Test
void testAggregation() throws IOException {
// 1. 准备 Request
SearchRequest request = new SearchRequest("hotel");
// 2. 准备 DSL
// 2.1 去掉文档内容
request.source().size(0);
// 2.2 聚合
request.source().aggregation(AggregationBuilders
.terms("brandAgg")
.field("brand")
.size(10)
);
// 3. 发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4. 解析结果
// 4.1 获取所有聚合结果
Aggregations aggregations = response.getAggregations();
// 4.2 根据聚合名称获取聚合结果
Terms brandTerms = aggregations.get("brandAgg");
// 4.3 获取 bucket
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
// 4.4 遍历
for (Terms.Bucket bucket : buckets) {
String key = bucket.getKeyAsString();
System.out.println(key);
}
}
这段代码演示了如何使用 Java 的 RestClient 来实现 Elasticsearch 的桶聚合操作。以下是代码的一些重要部分的解释:
-
准备 Request:
- 创建了一个
SearchRequest
,指定要搜索的索引为 “hotel”。
- 创建了一个
-
准备 DSL:
- 通过
request.source()
获取搜索请求的 DSL 部分。 - 使用
size(0)
去掉文档内容,因为你只关心聚合结果而不需要文档内容。 - 使用
aggregation
添加一个桶聚合(terms aggregation)。-
AggregationBuilders.terms("brandAgg")
:创建一个词条桶聚合,命名为 “brandAgg”。 -
.field("brand")
:指定聚合的字段为 “brand”,即酒店的品牌名称。 -
.size(10)
:指定展示的聚合桶的最大数量为 10。
-
- 通过
-
发送请求:
- 使用
client.search(request, RequestOptions.DEFAULT)
发送搜索请求,获取搜索响应。
- 使用
-
解析结果:
- 使用
response.getAggregations()
获取所有的聚合结果。 - 使用
aggregations.get("brandAgg")
根据聚合名称获取品牌聚合的结果。 - 使用
brandTerms.getBuckets()
获取所有的桶。 - 遍历每个桶,使用
bucket.getKeyAsString()
获取桶的键(品牌名称),然后打印出来。
- 使用
整个代码片段展示了通过 RestClient 执行 Elasticsearch 的桶聚合操作,并解析聚合结果。这是一个典型的 Elasticsearch 查询聚合的示例,可以根据实际需求对 DSL 进行调整和扩展。
二、查询自动补全
当用户在搜索框输入拼音字母的时候,一般都会提示出与这些拼音有关的搜索项,例如:
那么 ElasticSearch 如何实现这个功能呢?那就是拼音分词器,下文将详细介绍如何安装、配置以及使用拼音分词器。
2.1 引入拼音分词器
实现根据字母做补全,就必须对文档按照拼音分词。在 GitHub上恰好有 ElasticSearch 的拼音分词插件。
下载地址:https://github.com/medcl/elasticsearch-analysis-pinyin。
安装方式与 IK 分词器一样,分三步:
- 下载并解压;
- 上传到虚拟机中 ElasticSearch 数据卷的
plugin
目录; - 重启 ElasticSearch 服务;
- 测试拼音分词效果。
下面是安装的示例:
1. 找到 es-plugins
数据卷的挂载目录:
2. 进入这个目录,将拼音分词器解压并上传至这个目录,我将其解压重命名为了 py
:
3. 重启 ElasticSearch 服务:
docker restart es
- 测试拼音的分词效果:
POST /_analyze
{
"text": ["拼音分词器安装成功了吗?"],
"analyzer": "pinyin"
}
执行结果如下:
我们可以发现,对上述的文字实现了拼音的分词,但是存在一个问题,那就是现在是除了每个字的首字母拼在一起外,其他都是一个字一个字分的,这些似乎并没有意义,因此我们需要对分词器进行自定义操作。
2.2 自定义分词器
在ElasticSearch中,分词器(analyzer)由三部分组成:
- Character filters(字符过滤器): 在tokenizer之前对文本进行处理,例如删除字符或替换字符。
-
Tokenizer(分词器): 将文本按照一定的规则切割成词条(term),例如使用
keyword
表示不分词,或者使用ik_smart
进行智能中文分词。 - Tokenizer filters(分词器过滤器): 对tokenizer输出的词条进行进一步处理,例如进行大小写转换、同义词处理、拼音处理等。
在创建索引库时,可以通过settings
来配置自定义的analyzer
。以下是一个简单的自定义分词器的示例:
PUT /test
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word",
"filter": "pinyin"
}
}
}
}
}
上述自定义分词器的功能是先使用ik_max_word
进行分词,然后再使用pinyin
分词器生成拼音分词。然而,这可能导致每个字都被分开,需要进一步解决这个问题。
针对这个问题,拼音分词器的官方文档在 “Optional Parameters” 部分提供了多种选项来自定义拼音分词器的行为:
根据官方文档,我们可以修改自定义分词器的DSL语句,以解决每个字分开的问题:
PUT /test
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer"
}
}
}
}
以下是对上述 DSL 代码的说明:
-
"my_analyzer"
是我们自定义的分词器的名称。 -
"tokenizer": "ik_max_word"
指定了使用ik_max_word
分词器来进行中文分词。 -
"filter": "py"
使用了名为"py"
的过滤器。
在过滤器部分:
-
"type": "pinyin"
表明我们使用了拼音分词器。 -
"keep_full_pinyin": false
表示不保留完整的拼音,而是将拼音拆分成单个字的拼音。 -
"keep_joined_full_pinyin": true
表示保留拼音合并的完整拼音。 -
"keep_original": true
表示保留原始文本。 -
"limit_first_letter_length": 16
可以用于限制首字母的长度。 -
"remove_duplicated_term": true
表示移除重复的词条。 -
"none_chinese_pinyin_tokenize": false
表示不对非汉字文本进行拼音分词。
最后,在映射部分,将 "name"
字段的分析器指定为 "my_analyzer"
,以便使用我们自定义的分词器来分析该字段的内容。这个设置将影响文档索引和搜索时的分词行为,确保了拼音分词器的有效使用。
然后我们可以测试以下这个自定义的分词器:
POST /test/_analyze
{
"text": ["拼音分词器安装成功了吗?"],
"analyzer": "my_analyzer"
}
运行结果:
发现此时便能够为分词形成的词语生成首字母缩写以及全拼的拼音分词了。
但是,现在这个分词器还是存在一些问题,如下面的例子:
POST /test/_doc/1
{
"id": 1,
"name": "狮子"
}
POST /test/_doc/2
{
"id": 2,
"name": "虱子"
}
现在想 test
索引库中添加两个文档,一个的name
字段是“狮子”,而另一个是 “虱子”,它们的拼音相同,因此使用拼音查询的时候可以查到这两个文档:
GET /test/_search
{
"query": {
"match": {
"name": "shizi"
}
}
}
但是,如果是使用 “狮子” 进行搜索呢?
GET /test/_search
{
"query": {
"match": {
"name": "掉入狮子笼咋办"
}
}
}
在如此着急的情况下,却搜索出了“虱子”,那么现在就惨了。存在上述情况的原因是拼音也参与了搜索,即 “shizi” 参与搜索,就会搜索出同音的词语了。因此解决这个问题的方法就是不让拼音也参与搜索:
例如,字段在创建倒排索引时应该用 my_analyzer
分词器;字段在搜索时应该使用ik_smart
分词器;
PUT /test
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word", "filter": "py"
}
},
"filter": {
"py": { ... }
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer",
"search_analyzer": "ik_smart"
}
}
}
}
此时,使用这个分词器就不会出现上面的问题了。
2.3 DSL 自动补全查询
ElasticSearch 提供了 Completion Suggester
查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束:
- 参与补全查询的字段必须是
completion
类型。
// 创建索引库 PUT test
{
"mappings": {
"properties": {
"title":{
"type": "completion"
}
}
}
}
- 字段的内容一般是用来补全的多个词条形成的数组。
// 示例数据
POST test/_doc
{
"title": ["Sony", "WH-1000XM3"]
}
POST test/_doc
{
"title": ["SK-II", "PITERA"]
}
POST test/_doc
{
"title": ["Nintendo", "switch"]
}
查询的 DSL 语法如下:
POST /test/_search
{
"suggest": {
"title_suggest": {
"text": "s", // 关键字
"completion": {
"field": "title", // 补全字段
"skip_duplicates": true, // 跳过重复的
"size": 10 // 获取前10条结果
}
}
}
}
查询结果:
成功查询出了 s
开头的文档。
2.4 实现搜索的自动补全功能
目标:实现hotel
索引库的自动补全、拼音搜索功能。
步骤一:修改索引库:
- 修改
hotel
索引库结构,设置自定义拼音分词器;
"settings": {
"analysis": {
"analyzer": {
"text_anlyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
},
"completion_analyzer": {
"tokenizer": "keyword",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
}
- 修改
name
、all
字段,也使用自定义分词器:
"name":{
"type": "text",
"analyzer": "text_anlyzer",
"search_analyzer": "ik_smart",
"copy_to": "all"
},
"all":{
"type": "text",
"analyzer": "text_anlyzer",
"search_analyzer": "ik_smart"
}
- 索引库添加一个新字段
suggestion
,类型为completion
类型,使用自定义的分词器
"suggestion":{
"type": "completion",
"analyzer": "completion_analyzer"
}
- 给
HotelDoc
类添加suggestion
字段,内容包含brand
、business
- 重新导入数据到
hotel
索引库
注意:name
、all
是可分词的,自动补全的brand
、business
是不可分词的,要使用不同的分词器组合
步骤二:了解自动补全功能的 RestAPI
1. 请求参数构造的API:
// 1.准备请求
SearchRequest request = new SearchRequest("hotel");
// 2.请求参数
request.source()
.suggest(new SuggestBuilder().addSuggestion(
"mySuggestion",
SuggestBuilders
.completionSuggestion("title")
.prefix("h")
.skipDuplicates(true)
.size(10)
));
// 3.发送请求
client.search(request, RequestOptions.DEFAULT);
自动补全的RestAPI 使用到了 source
中的 suggest
方法,这个方法的参数需要使用 SuggestBuilder
进行构造,下面是和 DSL 语句的对比,通过这个对比可对代码的结构一目了然:
2. 结果解析的代码
要想解析出所有的补全关键词,通过可以通过对照 DSL 响应,来编写代码:
下面是实现搜索自动补全的完整测试代码:
@Test
void testSuggest() throws IOException {
// 1. 准备 Request
SearchRequest request = new SearchRequest("hotel");
// 2. 准备 DSL
request.source().suggest(new SuggestBuilder().addSuggestion(
"suggestions",
SuggestBuilders.completionSuggestion("suggestion")
.prefix("hh")
.skipDuplicates(true)
.size(10)
));
// 3. 发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4. 解析结果
Suggest suggest = response.getSuggest();
// 4.1 根据名称获取解析结果
CompletionSuggestion suggestion = suggest.getSuggestion("suggestions");
// 4.2 获取 Options 并遍历
List<String> result = new ArrayList<>();
for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
// 获取 option 中的 text
String text = option.getText().toString();
result.add(text);
}
System.out.println(result);
}
上述 Java 代码是一个使用 Elasticsearch Java 客户端进行搜索建议(Suggest)的测试方法。下面是对代码的简单说明:
-
准备 Request:
- 创建一个
SearchRequest
实例,用于定义搜索请求的主体。
- 创建一个
-
准备 DSL:
- 使用
source()
方法获取SearchSourceBuilder
实例,然后使用suggest
方法添加搜索建议。 - 在这里,添加了一个名为 “suggestions” 的建议,使用
SuggestBuilders.completionSuggestion
构建建议。 - 设置建议的字段为 “suggestion”,指定前缀为 “hh”,跳过重复项,设置建议的大小为 10。
- 使用
-
发送请求:
- 使用 Elasticsearch 客户端的
search
方法发送搜索请求,并将结果存储在SearchResponse
实例中。
- 使用 Elasticsearch 客户端的
-
解析结果:
- 通过
response.getSuggest()
获取建议结果。 - 使用建议的名称 “suggestions” 获取
CompletionSuggestion
实例。 - 遍历建议的选项(
Option
),并将文本添加到结果列表中。
- 通过
-
打印结果:
- 最后,将结果列表打印到控制台。
此代码主要用于测试 Elasticsearch 中搜索建议的功能。搜索建议是 Elasticsearch 提供的一种功能,可以根据用户的输入提供自动完成的建议。
三、ES 与数据库的同步问题
ElasticSearch 中的酒店数据来自于 MySQL 数据库,因此当 MySQL 数据发生改变时,ElasticSearch 中的数据也必须跟着改变,这个就是ElasticSearch 与 MySQL 之间的数据同步。
3.1 数据同步方式的探索
在微服务架构中,不同微服务可能需要共享数据,例如,一个负责酒店管理(操作 MySQL 数据库)的微服务需要与负责酒店搜索(操作 ElasticSearch)的微服务进行数据同步。在这篇博客中,我们将探讨三种不同的数据同步方式,并分析它们的优缺点。
3.1.1 方案一:同步调用
说明:
当酒店管理服务新增酒店时,首先向 MySQL 数据库中写入数据,然后再调用酒店搜索服务的接口,更新 ElasticSearch 索引库中的数据。这种方式是微服务之间的同步调用,实现简单但耦合度较高。
优缺点:
- 优点:实现简单,操作直观。
- 缺点:业务之间的耦合度高,一个服务的变更可能导致另一个服务的调整。
3.1.2 方案二:异步通知
说明:
使用消息队列(MQ)实现异步通知。当酒店管理服务新增酒店时,先向 MySQL 数据库中写入数据,然后让 MQ 发送新增消息;酒店搜索服务监听 MQ 中的消息,当有消息时,更新 ElasticSearch 索引库中的内容。实现了微服务间的解耦,但依赖于 MQ 的可靠性。
优缺点:
- 优点:低耦合,实现难度适中。
- 缺点:依赖 MQ 的可靠性,可能会因为消息丢失或重复而引发问题。
3.1.3 方案三:监听 Binlog
说明:
MySQL 在进行数据库操作时,会将操作记录到 Binlog 中。通过使用中间件 Canal 监听 Binlog,当酒店管理服务对 MySQL 数据库进行操作时,MySQL 会写入一条 Binlog 日志,Canal 监听到后通知酒店搜索服务,从而实现 ElasticSearch 索引库的更新。这种方式完全解除了服务间的耦合。
优缺点:
- 优点:完全解耦,微服务之间不直接通信。
- 缺点:开启 Binlog 会增加 MySQL 数据库的负担,实现复杂度较高。
3.1.4 三种方式的优缺点分析与总结
-
同步调用
- 优点:实现简单,操作直观。
- 缺点:业务之间的耦合度高,一个服务的变更可能导致另一个服务的调整。
-
异步通知
- 优点:低耦合,实现难度适中。
- 缺点:依赖 MQ 的可靠性,可能会因为消息丢失或重复而引发问题。
-
监听 Binlog
- 优点:完全解耦,微服务之间不直接通信。
- 缺点:开启 Binlog 会增加 MySQL 数据库的负担,实现复杂度较高。
根据具体业务需求和系统架构,选择合适的数据同步方式至关重要。同步调用适用于简单业务场景,异步通知适用于需要解耦但对消息可靠性要求不高的场景,监听 Binlog 适用于追求高度解耦和数据一致性的场景。
下面将使用 RabbitMQ 实现 ElasticSearch 和 MySQL 之间的数据同步。
3.2 RabbitMQ 实现数据间的同步
在微服务架构中,为了实现不同微服务之间的数据同步,使用消息队列(MQ)是一种常见的解决方案。下面我将演示如何使用 RabbitMQ 实现酒店管理服务与酒店搜索服务之间的数据同步。
3.2.1 声明 Exchange、Queue、RoutingKey
首先,我们创建一个 MQConstants
类,定义 Exchange、Queue、RoutingKey 等常量的名称。
public class MQConstants {
// 交换机
public final static String HOTEL_EXCHANGE = "hotel.topic";
// 监听新增和修改的队列
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
// 监听删除的队列
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
// 新增或修改的 RoutingKey
public final static String HOTEL_INSERT_KEY = "hotel.insert.key";
// 删除的 RoutingKey
public final static String HOTEL_DELETE_KEY = "hotel.delete.key";
}
这里我们定义了一个 Topic 类型的 Exchange,以及两个 Queue 分别用于监听新增和修改的操作以及删除的操作。同时,定义了对应的 RoutingKey。
然后,通过 @Bean
注解的方式,在 MQConfig
类中声明 Exchange、Queue 以及它们之间的绑定关系。
@Configuration
public class MQConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MQConstants.HOTEL_EXCHANGE);
}
@Bean
public Queue insertQueue(){
return new Queue(MQConstants.HOTEL_INSERT_QUEUE);
}
@Bean
public Queue deleteQueue(){
return new Queue(MQConstants.HOTEL_DELETE_QUEUE);
}
@Bean
public Binding insertQueueBinding(){
return BindingBuilder
.bind(insertQueue())
.to(topicExchange())
.with(MQConstants.HOTEL_INSERT_KEY);
}
@Bean
public Binding deleteQueueBinding(){
return BindingBuilder
.bind(deleteQueue())
.to(topicExchange())
.with(MQConstants.HOTEL_DELETE_KEY);
}
}
在这里,我们声明了一个 Topic 类型的 Exchange,以及两个 Queue,并通过 Binding 将 Exchange 和 Queue 进行了绑定。
3.2.2 在数据库的增删改操作中实现消息的监听
在酒店管理服务中,对数据库的增删改操作中添加消息发送的逻辑。
新增操作:
@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
hotelService.save(hotel);
// 新增成功后,向 MQ 中发送消息
rabbitTemplate.convertAndSend(MQConstants.HOTEL_EXCHANGE, MQConstants.HOTEL_INSERT_KEY, hotel.getId());
}
修改操作:
@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
// 更新成功后,向 MQ 中发送消息
rabbitTemplate.convertAndSend(MQConstants.HOTEL_EXCHANGE, MQConstants.HOTEL_INSERT_KEY, hotel.getId());
}
删除操作:
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
// 删除成功后,向 MQ 中发送消息
rabbitTemplate.convertAndSend(MQConstants.HOTEL_EXCHANGE, MQConstants.HOTEL_DELETE_KEY, id);
}
在这里,我们使用了 RabbitMQ 提供的 RabbitTemplate
来发送消息到 Exchange,实现了对应操作后的消息发送。
3.2.3 在搜索服务中实现消息的监听和处理功能
创建 HotelListener
类,用于监听 MQ 中的消息。
@Component
public class HotelListener {
@Autowired
private IHotelService hotelService;
@RabbitListener(queues = MQConstants.HOTEL_INSERT_QUEUE)
public void listenHotelInsertOrUpdate(Long id){
hotelService.insertById(id);
}
@RabbitListener(queues = MQConstants.HOTEL_DELETE_QUEUE)
public void listenHotelDelete(Long id){
hotelService.deleteById(id);
}
}
在 HotelListener
类中,定义了两个监听方法,分别处理新增或修改操作的消息和删除操作的消息。这两个方法分别调用了对应的服务方法,实现了数据在酒店管理服务和酒店搜索服务之间的同步。文章来源:https://www.toymoban.com/news/detail-714224.html
这样,通过使用 RabbitMQ 实现了微服务之间的数据同步,提高了系统的可扩展性和解耦程度。选择适当的消息同步方式可以根据具体业务需求和系统架构进行调整。文章来源地址https://www.toymoban.com/news/detail-714224.html
到了这里,关于【ElasticSearch】深入探索 ElasticSearch 对数据的聚合、查询自动补全、与数据库间的同步问题以及使用 RabbitMQ 实现与数据库间的同步的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!