谷粒商城(三)

这篇具有很好参考价值的文章主要介绍了谷粒商城(三)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Elasticsearch - 全文检索

简介

全文搜索属于最常见的需求,开源的 Elasticsearch 是目前全文搜索引擎的首选,它可以快速地储存、搜索和分析海量数据。维基百科、Stack Overflow、Github 都采用它。
Elastic 的底层是开源库 Lucene。但是,你没法直接用 Lucene,必须自己写代码去调用它的接口。Elastic 是 Lucene 的封装,提供了 REST API 的操作接口,开箱即用。

REST API:天然的跨平台。
官方文档(推荐)
官方中文
社区中文1
社区中文2

1、基本概念

谷粒商城(三)

1、Index(索引)
动词,相当于 MySQL 中的 insert;
名词,相当于 MySQL 中的 Database

2、Type(类型)
在 Index(索引)中,可以定义一个或多个类型。
类似于 MySQL 中的 Table;每一种类型的数据放在一起;
es6 之后要求一个索引只能包含一个类型

3、Document(文档)
保存在某个索引(Index)下,某种类型(Type)的一个数据(Document),文档是 JSON 格式的,Document 就像是 MySQL 中的某个 Table 里面的内容;

4、搜索原理:倒排索引机制
谷粒商城(三)

2、Docker 安装 Es

1、下载镜像文件

docker pull elasticsearch:7.4.2 存储和检索数据
docker pull kibana:7.4.2 可视化检索数据

2、创建实例

ElasticSearch
配置

mkdir -p /mydata/elasticsearch/config # 用来存放配置文件
mkdir -p /mydata/elasticsearch/data  # 数据
echo "http.host: 0.0.0.0" >/mydata/elasticsearch/config/elasticsearch.yml # 允许任何机器访问
chmod -R 777 /mydata/elasticsearch/ ## 设置elasticsearch文件可读写权限

启动

docker run --name elasticsearch -p 9200:9200 -p 9300:9300 \
-e  "discovery.type=single-node" \      单例启动
-e ES_JAVA_OPTS="-Xms64m -Xmx512m" \	分配内存
-v /mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /mydata/elasticsearch/data:/usr/share/elasticsearch/data \
-v  /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-d elasticsearch:7.4.2 

开机启动 elasticsearch

docker update elasticsearch --restart=always
以后再外面装好插件重启就可

特别注意:
-e ES_JAVA_OPTS=“-Xms64m -Xmx128m” \ 测试环境下,设置 ES 的初始内存和最大内存,否则导致过大启动不了ES

Kibana

docker run --name kibana -e ELASTICSEARCH_HOSTS=http://192.168.56.10:9200 -p 5601:5601 -d kibana:7.4.2

安装完 es 、Kibana后
访问 http://192.168.56.10:9200 检测是否成功运行 es
访问 http://192.168.56.10:5601 可视化检索工具 Kibana

安装nginx
如果 docker 没有安装过,也可以直接启动一个 nginx 实例,启动时会自动帮你下载

docker run -p80:80 --name nginx -d nginx:1.10   

将容器内的配置文件拷贝到当前目录 (注意后面有个小点)

以下都是在mydata目录下执行

docker container cp nginx:/etc/nginx .   #将容器内的配置文件拷贝到当前目录:注意容器名要一致 nginx: 这里是容器名
ls   ##会发现mydata目录下出现 nginx 目录
mv nginx conf    #修改文件名称nginx为conf
mkdir nginx   #再重新创建文件夹
mv conf nginx/      #将conf移入nginx

执行以下命令运行

 docker run -p 80:80 --name nginx \
 -v /mydata/nginx/html:/usr/share/nginx/html \
 -v /mydata/nginx/logs:/var/log/nginx \
 -v /mydata/nginx/conf/:/etc/nginx \
 -d nginx:1.10

给 nginx 的 html 文件夹下面放的所有资源可以直接访问,一般 index.html 放在此文件夹下,则访问 IP:80 的nginx地址时就会访问此html文件,要访问 html 目录下的文件也只需要修改路径直接访问即可

例如后面自定义分词器,创建 html/es/fenci.txt 文件,文件内放入自定义分词 乔碧萝 等,访问 IP:80/es/fenci.txt 可以在浏览器看到

3、初步检索

3.1 _cat

GET /_cat/nodes:查看所有节点
GET /_cat/health:查看 es 健康状况
GET /_cat/master:查看主节点
GET /_cat/indices:查看所有索引 show databases;

3.2 索引一个文档(保存)

保存一个数据,保存在哪个索引的哪个类型下,指定用哪个唯一标识

#在 customer 索引下的 external 类型下保存 1 号数据
PUT customer/external/1
{ "name": "John Doe"}

PUT 和 POST 都可以:

  • POST 新增。如果不指定 id,会自动生成 id。指定 id 就会修改这个数据,并新增版本号
  • PUT 可以新增可以修改。PUT 必须指定 id;由于 PUT 需要指定 id,我们一般都用来做修改操作,不指定 id 会报错。

3.3 查询文档

GET customer/external/1

结果:

{
	"_index": "customer", //在哪个索引
	"_type": "external", //在哪个类型
	"_id": "1", //记录 id
	"_version": 2, //版本号
	"_seq_no": 1, //并发控制字段,每次更新就会+1,用来做乐观锁
	"_primary_term": 1, //同上,主分片重新分配,如重启,就会变化
	"found": true, "_source": { //真正的内容
		"name": "John Doe"
	}
}

因为有两个并发控制字段,所以更新的时候可以在语句后携带 ?if_seq_no=0&if_primary_term=1

3.4 更新文档

//POST带_update
POST customer/external/1/_update
{
    "doc":{ "name": "John Doew"}
}

//或者 POST不带_update
POST customer/external/1
{ "name": "John Doe2"}

//或者 PUT
PUT customer/external/1
{ "name": "John Doe"}

不同:

  • POST带_update,对比原来数据,与原来一样就什么都不做,version/seq_no 都不变
  • POST不带_update、PUT ,如果数据不变,还是会更新版本

看场景;

  • 对于大并发更新,不带 update;
  • 对于大并发查询偶尔更新,带 update;对比更新,重新计算分配规则。

更新同时增加属性

POST customer/external/1/_update
{ 
  "doc": { 
    "name": "Jane Doe", 
    "age": 20 
  }
}
PUTPOST 不带_update 也可以

3.5 删除文档&索引

DELETE customer/external/1
DELETE customer

3.6 bulk 批量 API

POST customer/external/_bulk
{"index":{"_id":"1"}}
{"name": "John Doe" }
{"index":{"_id":"2"}}
{"name": "Jane Doe" }

语法格式:

{ action: { metadata }}\n
{ request body }\n
{ action: { metadata }}\n
{ request body }\n

复杂实例:

POST /_bulk
{ "delete": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "create": { "_index": "website", "_type": "blog", "_id": "123" }}
{ "title": "My first blog post" }
{ "index": { "_index": "website", "_type": "blog" }}
{ "title": "My second blog post" }
{ "update": { "_index": "website", "_type": "blog", "_id": "123", "_retry_on_conflict" : 3} }
{ "doc" : {"title" : "My updated blog post"} }

bulk API 以此按顺序执行所有的 action(动作)。如果一个单个的动作因任何原因而失败,它将继续处理它后面剩余的动作。当 bulk API 返回时,它将提供每个动作的状态(与发送的顺序相同),所以您可以检查是否一个指定的动作是不是失败了。

3.7 样本测试数据

准备了一份顾客银行账户信息的虚构的 JSON 文档样本。每个文档都有下列的 schema(模式):

{ 
  "account_number": 0, 
  "balance": 16623, 
  "firstname": "Bradshaw", 
  "lastname": "Mckenzie", 
  "age": 29, 
  "gender": "F", 
  "address": "244 Columbus Place", 
  "employer": "Euron", 
  "email": "bradshawmckenzie@euron.com", 
  "city": "Hobucken", 
  "state": "CO"
}

ES测试数据 Gitee
ES测试数据语雀

将测试数据 JSON 拷贝进 ES 开发窗口,使用以下语句:

POST bank/account/_bulk
测试数据

谷粒商城(三)

4、进阶检索

4.1 SearchAPI

ES 支持两种基本方式检索 :

一个是通过使用 REST request URI 发送搜索参数(uri+检索参数)
另一个是通过使用 REST request body 来发送它们(uri+请求体)

1 检索信息

一切检索从_search 开始

GET bank/_search  //检索 bank 下所有信息,包括 type 和 docs
GET bank/_search?q=*&sort=account_number:asc   //请求参数方式检索

响应结果解释:
took - Elasticsearch 执行搜索的时间(毫秒)
time_out - 告诉我们搜索是否超时
_shards - 告诉我们多少个分片被搜索了,以及统计了成功/失败的搜索分片
hits - 搜索结果
hits.total - 搜索结果
hits.hits - 实际的搜索结果数组(默认为前 10 的文档)
sort - 结果的排序 key(键)(没有则按 score 排序)
score 和 max_score –相关性得分和最高得分(全文检索用)

uri+请求体进行检索

GET bank/_search
{ 
  "query": 
  { 
    "match_all": {}
  },
  "sort": [
    { 
      "account_number": { 
        "order": "desc"
      }
    }
  ]
}

我们 POST 一个 JSON 风格的查询请求体到 _search API。
需要了解,一旦搜索的结果被返回,Elasticsearch 就完成了这次请求,并且不会维护任何服务端的资源或者结果的 cursor(游标)

4.2 Query DSL

1 基本语法格式

Elasticsearch 提供了一个可以执行查询的 Json 风格的 DSL(domain-specific language 领域特定语言)。这个被称为 Query DSL。该查询语言非常全面,并且刚开始的时候感觉有点复杂,真正学好它的方法是从一些基础的示例开始的。

一个查询语句 的典型结构
{
QUERY_NAME: {
ARGUMENT: VALUE, ARGUMENT: VALUE,... }
}

如果是针对某个字段,那么它的结构如下:
{
  QUERY_NAME: {
    FIELD_NAME: {
      ARGUMENT: VALUE, 
      ARGUMENT: VALUE,... 
    }
  }
}

示例:

GET bank/_search
{ 
  "query": {   //query 定义如何查询
    "match_all": {}   //match_all 查询类型【代表查询所有的所有】
  },
  "from": 0, 
  "size": 5,   // from+size 完成分页功能
  "sort": [   // sort 排序,多字段排序
    { 
      "account_number": { 
        "order": "desc"
      }
    }
  ]
}

match_all 查询类型【代表查询所有的所有】,es 中可以在 query 中组合非常多的查询类型完成复杂查询
除了 query 参数之外,我们也可以传递其它的参数以改变查询结果。如 sort,size
from+size 限定,完成分页功能
sort 排序,多字段排序,会在前序字段相等时后续字段内部排序,否则以前序为准

2 返回部分字段
GET bank/_search
{ 
  "query": {
    "match_all": {}
  },
  "from": 0, 
  "size": 5, 
  "_source": ["age","balance"]    //返回部分字段
}
3 match【匹配查询】

基本类型(非字符串),精确匹配

GET bank/_search
{ 
  "query": { 
    "match": {       //match 返回 account_number=20 的
      "account_number": "20"
    }
  }
}

字符串,全文检索

//查询出address中包含 mill 单词的所有记录
GET bank/_search
{ 
  "query": { 
    "match": {
      "address": "mill"
    }
  }
}

match 当搜索字符串类型的时候,会进行全文检索,并且每条记录有相关性得分。

字符串,多个单词(分词+全文检索)

//查询出address中包含 mill 或者 road 或者 mill road 的所有记录
GET bank/_search
{ 
  "query": { 
    "match": { 
      "address": "mill road"
    }
  }
}

最终查询出 address 中包含 mill 或者 road 或者 mill road 的所有记录,并给出相关性得分

4 match_phrase【短语匹配】

将需要匹配的值当成一个整体单词(不分词)进行检索

//查出 address 中包含 mill road 的所有记录
GET bank/_search
{ 
  "query": { 
    "match_phrase": { 
      "address": "mill road"
    }
  }
}

查出 address 中包含 mill road 的所有记录,并给出相关性得分

5 multi_match【多字段匹配】
//查询 state 或者 address 包含 mill
GET bank/_search
{ 
  "query": { 
    "multi_match": { 
      "query": "mill", 
      "fields": ["state","address"]
    }
  }
}

只要指定字段里面包含要查询的值就返回
并且会分词 多个值只要包含其中一个值就会返回

//"990 Mill Road"作为一个整体查询,是短语匹配,只要包含以下内容就可以查询
GET bank/_search
{
  "query": {
    "match_phrase": { 
      "address": "990 Mill Road"
    }
  }
}

//"990 Mill Road"作为一个整体查询,是绝对匹配,必须一模一样
//每一个字符串的字段都可以使用 .keyword
GET bank/_search
{
  "query": {
    "match": { 
      "address.keyword": "990 Mill Road"
    }
  }
}
6 bool【复合查询】

bool 用来做复合查询:
复合语句可以合并 任何 其它查询语句,包括复合语句,了解这一点是很重要的。这就意味着,复合语句之间可以互相嵌套,可以表达非常复杂的逻辑。

must: 必须达到 must 列举的所有条件

GET bank/_search
{ 
  "query": { 
    "bool": { 
      "must": [
        { 
          "match": { "address": "mill" } 
        },
        { 
          "match": { "gender": "M" } 
        }
      ]
    }
  }
}

should: 应该达到 should 列举的条件,如果达到会增加相关文档的评分,并不会改变查询的结果。
如果 query 中只有 should 且只有一种匹配规则,那么 should 的条件就会被作为默认匹配条件而去改变查询结果

GET bank/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": { "address": "mill" }
        }
      ],
      "should": [
        {
          "match": {"address": "lane"}
        }
      ]
    }
  }
}

must_not: 必须不是指定的情况

GET bank/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {"address": "mill"}
        },
        {
          "match": {"gender": "M"}
        }
      ],
      "must_not": [
        {
          "match": {"email": "baluba.com"}
        }
      ]
    }
  }
}

address 包含 mill,并且 gender 是 M,,但是 email 必须不包含 baluba.com

7 filter【结果过滤】

并不是所有的查询都需要产生分数,特别是那些仅用于 “filtering”(过滤)的文档。为了不计算分数 Elasticsearch 会自动检查场景并且优化查询的执行。

GET bank/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": { "address": "mill"}
        }
      ],
      "filter": {
        "range": {
          "balance": {
            "gte": 10000,
            "lte": 20000
          }
        }
      }
    }
  }
}
8 term

和 match 一样。匹配某个属性的值。全文检索字段用 match,其他非 text 字段匹配用 term。

GET bank/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "age": { "value": "28" }
          }
        },
        {
          "match": { "address": "990 Mill Road"}
        }
      ]
    }
  }
}
9 aggregations(执行聚合)

聚合提供了从数据中分组和提取数据的能力。最简单的聚合方法大致等于 SQL的 GROUP BY 和 SQL 聚合函数

在 Elasticsearch 中,您有执行搜索返回 hits(命中结果),并且同时返回聚合结果,把一个响应中的所有 hits(命中结果)分隔开的能力

这是非常强大且有效的,您可以执行查询和多个聚合,并且在一次使用中得到各自的(任何一个的)返回结果,使用一次简洁和简化的 API 来避免网络往返。

//搜索address中包含 mill 的所有人的年龄分布以及平均年龄,但不显示这些人的详情
GET bank/_search
{
  "query": {
    "match": {"address": "mill"}
  },
  "aggs": {     //执行聚合
    "group_by_state": {     //这次聚合的名字,方便展示在结果集中
      "terms": {      //聚合的类型
        "field": "age"
      }
    },
    "avg_age": {
      "avg": {		 //聚合的类型
        "field": "age"
      }
    }
  },
  "size": 0   //不显示搜索数据
}
//按照年龄聚合,并且请求这些年龄段的这些人的平均薪资
GET bank/account/_search
{
  "query": { "match_all": {} },
  "aggs": {
    "age_avg": {
        "terms": {
          "field": "age",
          "size": 1000     //给出1000种不同结果
        },
        "aggs": {
          "banlances_avg": {
            "avg": {"field": "balance"}
          }
        }
    }
  },
  "size": 1000
}
//查出所有年龄分布
//并且这些年龄段中 M 的平均薪资和 F 的平均薪资
//以及这个年龄段的总体平均薪资
GET bank/account/_search
{
  "query": {"match_all": {} },
  "aggs": {
    "age_agg": {
	      "terms": { 
	        "field": "age",		//按年龄分组
	        "size": 100     //给出100种不同结果
	      },
	      "aggs": {
	        "gender_agg": {
		          "terms": {
		            "field": "gender.keyword",   //在按年龄分组情况下再按性别分组
		            "size": 100
		          },
		          "aggs": {
		            "balance_avg": {
		              "avg": {
		                "field": "balance"		//求平均值
		              }
		            }
		          }
	        },
	        "balance_avg": {
	              "avg": {"field": "balance"} 	//在按年龄分组情况下求平均值
	        }
	      }
    }
  },
  "size": 1000
}

4.3 Mapping

1 字段类型

谷粒商城(三)
谷粒商城(三)
谷粒商城(三)

2 Mapping(映射)

Mapping 是用来定义一个文档(document),以及它所包含的属性(field)是如何存储和索引的。比如,使用 mapping 来定义:

  • 哪些字符串属性应该被看做全文本属性(full text fields)。
  • 哪些属性包含数字,日期或者地理位置。
  • 文档中的所有属性是否都能被索引(_all 配置)。
  • 日期的格式。
  • 自定义映射规则来执行动态添加属性。
//查看 mapping 信息:
GET bank/_mapping

修改 mapping 信息

自动猜测的映射类型
谷粒商城(三)

3 新版本改变

Es7 及以上移除了 type 的概念。

关系型数据库中两个数据表示是独立的,即使他们里面有相同名称的列也不影响使用,但 ES 中不是这样的。elasticsearch 是基于 Lucene 开发的搜索引擎,而 ES 中不同 type 下名称相同的 filed 最终在 Lucene 中的处理方式是一样的。

两个不同 type 下的两个 user_name,在 ES 同一个索引下其实被认为是同一个 filed,你必须在两个不同的 type 中定义相同的 filed 映射。否则,不同 type 中的相同字段名称就会在处理中出现冲突的情况,导致 Lucene 处理效率下降。

去掉 type 就是为了提高 ES 处理数据的效率。

Elasticsearch 7.x
URL 中的 type 参数为可选。比如,索引一个文档不再要求提供文档类型。

Elasticsearch 8.x
不再支持 URL 中的 type 参数。

解决:
1)、将索引从多类型迁移到单类型,每种类型文档一个独立索引
2)、将已存在的索引下的类型数据,全部迁移到指定位置即可。详见数据迁移

创建映射

//创建索引并指定映射
PUT /my-index
{
  "mappings": {
    "properties": {
      "age": {
        "type": "integer"
      },
      "email": {
        "type": "keyword"
      },
      "name": {
        "type": "text"
      }
    }
  }
}
//添加新的字段映射
PUT /my-index/_mapping
{
  "properties": {
    "employee-id": {
      "type": "keyword",
      "index": false
    }
  }
}

更新映射:
对于已经存在的映射字段,我们不能更新。更新必须创建新的索引进行数据迁移

数据迁移

先创建出 new_twitter 的正确映射。然后使用如下方式进行数据迁移

POST _reindex     //固定写法
{
  "source": {
    "index": "twitter"   //旧映射
  },
  "dest": {
    "index": "new_twitter"   //新的映射
  }
}

示例:

//将旧索引的 type 下的数据进行迁移
POST _reindex
{
  "source": {
    "index": "twitter",
    "type": "tweet"
  },
  "dest": {
    "index": "tweets"
  }
}

4.4 分词

一个 tokenizer(分词器)接收一个字符流,将之分割为独立的 tokens(词元,通常是独立的单词),然后输出 tokens 流

例如,whitespace tokenizer 遇到空白字符时分割文本。它会将文本"Quick brown fox!" 分割为 [Quick, brown, fox!]

tokenizer(分词器)还负责记录各个 term(词条)的顺序或 position 位置(用于 phrase 短语和 word proximity 词近邻查询),以及 term(词条)所代表的原始 word(单词)的 start(起始)和 end(结束)的 character offsets(字符偏移量)(用于高亮显示搜索的内容)

Elasticsearch 提供了很多内置的分词器,可以用来构建 custom analyzers(自定义分词器)

1 安装 ik 分词器

安装 ik 分词器:注意版本要与 es 版本一致

注意:不能用默认 elasticsearch-plugin install xxx.zip 进行自动安装
对应 es 版本安装地址

//进入 es 容器内部 plugins 目录
docker exec -it 容器id /bin/bash

cd plugins/
wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.4.2/elasticsearch-analysis-ik-7.4.2.zip

//unzip 下载的文件
rm –rf *.zip
mv elasticsearch/ ik

//可以确认是否安装好了分词器
cd ../bin
elasticsearch plugin list  //即可列出系统的分词器

或者可以使用 secureFXP 将下载的 zip 文件(直接放解压后的也可以)放到本地的es的plugins 文件夹内

如果不知道运行的容器本地文件的位置,可执行以下命令

docker inspect <容器id>

谷粒商城(三)

2 测试分词器

使用默认

POST _analyze
{
  "text": "我是中国人"
}

使用分词器

POST _analyze
{
  "analyzer": "ik_smart",
  "text": "我是中国人"
}

另外一个分词器 ik_max_word

POST _analyze
{
  "analyzer": "ik_max_word",
  "text": "我是中国人"
}

能够看出不同的分词器,分词有明显的区别,所以以后定义一个索引不能再使用默认的 mapping 了,要手工建立 mapping, 因为要选择分词器。

3 自定义词库

修改/usr/share/elasticsearch/plugins/ik/config/中的 IKAnalyzer.cfg.xml

使用 http://192.168.128.130/fenci/myword.txt 的前提是配置了 nginx ,然后将文件 myword.txt 放在对应位置

修改后的 xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
	<comment>IK Analyzer 扩展配置</comment>
	<!--配置自己的扩展字典 -->
	<entry key="ext_dict"></entry>
	<!--配置自己的扩展停止词字典-->
	<entry key="ext_stopwords"></entry>
	<!--指定自定义词库 -->
	<entry key="remote_ext_dict">http://192.168.128.130/fenci/myword.txt</entry>
	<!--配置远程扩展停止词字典-->
	<!-- <entry key="remote_ext_stopwords">words_location</entry> -->
</properties>

原来的 xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
	<comment>IK Analyzer 扩展配置</comment>
	<!--用户可以在这里配置自己的扩展字典 -->
	<entry key="ext_dict"></entry>
	<!--用户可以在这里配置自己的扩展停止词字典-->
	<entry key="ext_stopwords"></entry>
	<!--用户可以在这里配置远程扩展字典 -->
	<!-- <entry key="remote_ext_dict">words_location</entry> -->
	<!--用户可以在这里配置远程扩展停止词字典-->
	<!-- <entry key="remote_ext_stopwords">words_location</entry> -->
</properties>

利用 nginx 发布静态资源,按照请求路径,创建对应的文件夹以及文件,放在 nginx 的 html 下,然后重启 es 服务器,重启 nginx。

在 kibana 中测试分词效果,更新完成后,es 只会对新增的数据用新词分词。历史数据是不会重新分词的。如果想要历史数据重新分词

需要执行:

POST my_index/_update_by_query?conflicts=proceed

5、Elasticsearch-Rest-Clien

  1. 9300:TCP
    spring-data-elasticsearch:transport-api.jar;
    springboot 版本不同, transport-api.jar 不同,不能适配 es 版本
    7.x 已经不建议使用,8 以后就要废弃
  2. 9200:HTTP
    JestClient: 非官方,更新慢
    RestTemplate: 模拟发 HTTP 请求,ES 很多操作需要自己封装,麻烦
    HttpClient: 同上
    Elasticsearch-Rest-Client: 官方 RestClient,封装了 ES 操作,API 层次分明,上手简单

最终选择 Elasticsearch-Rest-Client(elasticsearch-rest-high-level-client)

5.1 SpringBoot 整合

依赖

新建检索模块,只添加web依赖,因为此处的 ES 依赖最高只支持6.3版本

谷粒商城(三)

然后添加es整合依赖,注意版本需要和服务器上es的版本一致

<!-- 导入es的 rest-high-level-client -->
<dependency>
	<groupId>org.elasticsearch.client</groupId>
	<artifactId>elasticsearch-rest-high-level-client</artifactId>
	<version>7.4.2</version>
</dependency>

参考官网

添加后发现注入的 es 服务版本不对
谷粒商城(三)
是因为 springboot 里面指定了 es 版本
谷粒商城(三)
只需要自己指定一下版本即可

<properties>
    <java.version>1.8</java.version>
    <elasticsearch.version>7.12.1</elasticsearch.version>
    <spring-cloud.version>Greenwich.SR3</spring-cloud.version>
</properties>

需要注入nacos:common模块注入,配置文件,注册发现注解

<dependency>
    <groupId>afei.gulimall</groupId>
    <artifactId>gulimall-common</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>
配置类

参考官网

/**
 * 1、导入配置
 * 2、编写配置,给容器注入一个RestHighLevelClient
 * 3、参照API 官网进行开发
 */
@Configuration
public class GulimallElasticsearchConfig {
    public static final RequestOptions COMMON_OPTIONS;
    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
//        builder.addHeader("Authorization", "Bearer " + TOKEN);
//        builder.setHttpAsyncResponseConsumerFactory(
//                new HttpAsyncResponseConsumerFactory
//                        .HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));
        COMMON_OPTIONS = builder.build();
    }

    @Bean
    public RestHighLevelClient esRestClient() {
        RestClientBuilder builder = null;
        builder = RestClient.builder(new HttpHost("192.168.56.10",
											 	   9200, "http"));
        RestHighLevelClient client = new RestHighLevelClient(builder);
        
//        RestHighLevelClient client = new RestHighLevelClient(
//                RestClient.builder(
//                        new HttpHost("localhost", 9200, "http"),
//                        new HttpHost("localhost", 9201, "http")));
        return client;
    }

}
使用

参照官方文档

简单 添加 / 更新 数据

@Autowired
private RestHighLevelClient client;

//测试注入是否成功
@Test
public void contextLoads() {
    System.out.println(client);
}

/**
 * 添加或者更新
 */
@Test
public void indexData() throws IOException {
    IndexRequest indexRequest = new IndexRequest("users");
    User user = new User();
    user.setAge(19);
    user.setGender("男");
    user.setUserName("张三");
    String jsonString = JSON.toJSONString(user);
    
    indexRequest.source(jsonString,XContentType.JSON);
    // 执行操作
    IndexResponse index = client.index(indexRequest, GulimallElasticsearchConfig.COMMON_OPTIONS);
    // 提取有用的响应数据
    System.out.println(index);
}

复杂检索

参照官方文档

@Test
public void searchTest() throws IOException {
    // 1、创建检索请求
    SearchRequest searchRequest = new SearchRequest();
    // 指定索引
    searchRequest.indices("bank");
    // 指定 DSL,检索条件
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

    sourceBuilder.query(QueryBuilders.matchQuery("address", "mill"));

    //1、2 按照年龄值分布进行聚合
    TermsAggregationBuilder aggAvg = AggregationBuilders.terms("ageAgg").field("age").size(10);
    sourceBuilder.aggregation(aggAvg);

    //1、3 计算平均薪资
    AvgAggregationBuilder balanceAvg = AggregationBuilders.avg("balanceAvg").field("balance");
    sourceBuilder.aggregation(balanceAvg);

    System.out.println("检索条件" + sourceBuilder.toString());

    searchRequest.source(sourceBuilder);

    // 2、执行检索
    SearchResponse searchResponse = client.search(searchRequest, GulimallElasticsearchConfig.COMMON_OPTIONS);

    // 3、分析结果
    System.out.println(searchResponse.toString());

    // 4、拿到命中的结果
    SearchHits hits = searchResponse.getHits();
    //搜索请求的匹配
    SearchHit[] searchHits = hits.getHits();
    for (SearchHit hit : searchHits) {
    	/**
    	 *   "_index" : "bank",
    	 *   "_type" : "account",
    	 *   "_id" : "970",
    	 *   "_score" : 6.5032897,
    	 *   "_source" : {}
    	 */
    	//hit.getIndex();hit.getType();hit.getId(); 等等
        // 拿到 _source 里面完整结果字符串
        String sourceAsString = hit.getSourceAsString();
        // 转换成实体类
        Accout accout = JSON.parseObject(sourceAsString, Accout.class);
        System.out.println("account:" + accout );
    }

    // 5、拿到聚合
    Aggregations aggregations = searchResponse.getAggregations();
//  for (Aggregation aggregation : aggregations.asList()) {
//  }
    // 6、通过先前名字拿到对应聚合
    Terms ageAgg1 = aggregations.get("ageAgg");
    for (Terms.Bucket bucket : ageAgg1.getBuckets()) {
        // 7、拿到结果
        String keyAsString = bucket.getKeyAsString();
        System.out.println("年龄:" + keyAsString);
        long docCount = bucket.getDocCount();
        System.out.println("个数:" + docCount);
    }
    Avg balanceAvg1 = aggregations.get("balanceAvg");
    System.out.println("平均薪资:" + balanceAvg1.getValue());
    System.out.println(searchResponse.toString());
}

执行结果:

accout:GulimallSearchApplicationTests.Accout(account_number=970, balance=19648, firstname=Forbes, lastname=Wallace, age=28, gender=M, address=990 Mill Road, employer=Pheast, email=forbeswallace@pheast.com, city=Lopezo, state=AK)accout:GulimallSearchApplicationTests.Accout(account_number=136, balance=45801, firstname=Winnie, lastname=Holland, age=38, gender=M, address=198 Mill Lane, employer=Neteria, email=winnieholland@neteria.com, city=Urie, state=IL)accout:GulimallSearchApplicationTests.Accout(account_number=345, balance=9812, firstname=Parker, lastname=Hines, age=38, gender=M, address=715 Mill Avenue, employer=Baluba, email=parkerhines@baluba.com, city=Blackgum, state=KY)accout:GulimallSearchApplicationTests.Accout(account_number=472, balance=25571, firstname=Lee, lastname=Long, age=32, gender=F, address=288 Mill Street, employer=Comverges, email=leelong@comverges.com, city=Movico, state=MT)年龄:38
个数:2
年龄:28
个数:1
年龄:32
个数:1
平均薪水:25208.0

ELK
Elasticsearch :用于检索数据

Logstach:存储数据

Kibana:视图化查看数据
谷粒商城(三)

性能压测 - 压力测试

1、压力测试

使用压力测试,我们有希望找到很多种用其他测试方法更难发现的错误。有两种错误类型是:内存泄漏,并发与同步
有效的压力测试系统将应用以下这些关键条件:重复,并发,量级,随机变化

1.1 性能指标

  • 响应时间(Response Time: RT)
    响应时间指用户从客户端发起一个请求开始,到客户端接收到从服务器端返回的响应结束,整个过程所耗费的时间
  • HPS(Hits Per Second) :每秒点击次数,单位是次/秒
  • TPS(Transaction per Second):系统每秒处理交易数,单位是笔/秒
  • QPS(Query per Second):系统每秒处理查询次数,单位是次/秒
    对于互联网业务中,如果某些业务有且仅有一个请求连接,那么TPS=QPS=HPS,一般情况下用 TPS 来衡量整个业务流程,用 QPS 来衡量接口查询次数,用 HPS 来表示对服务器单击请求。
  • 无论 TPS、QPS、HPS,此指标是衡量系统处理能力非常重要的指标,越大越好,根据经验,一般情况下:
    金融行业:1000TPS~50000TPS,不包括互联网化的活动
    保险行业:100TPS~100000TPS,不包括互联网化的活动
    制造行业:10TPS~5000TPS
    互联网电子商务:10000TPS~1000000TPS
    互联网中型网站:1000TPS~50000TPS
    互联网小型网站:500TPS~10000TPS
  • 最大响应时间(Max Response Time) 指用户发出请求或者指令到系统做出反应(响应)的最大时间
  • 最少响应时间(Mininum ResponseTime) 指用户发出请求或者指令到系统做出反应(响应)的最少时间
  • 90%响应时间(90% Response Time) 是指所有用户的响应时间进行排序,第 90%的响应时间
  • 从外部看,性能测试主要关注如下三个指标
    • 吞吐量:每秒钟系统能够处理的请求数、任务数。
    • 响应时间:服务处理一个请求或一个任务的耗时。
    • 错误率:一批请求中结果出错的请求所占比例。

1.2 JMeter

JMeter 安装

下载地址
下载对应的压缩包,解压运行 jmeter.bat 即可

JMeter 压测示例

1、添加线程组

谷粒商城(三)

谷粒商城(三)

线程组参数详解:

  • 线程数:虚拟用户数。一个虚拟用户占用一个进程或线程。设置多少虚拟用户数在这里也就是设置多少个线程数
  • Ramp-Up Period(in seconds)准备时长:设置的虚拟用户数需要多长时间全部启动。如果线程数为 10,准备时长为 2,那么需要 2 秒钟启动 10 个线程,也就是每秒钟启动 5 个线程。
  • 循环次数:每个线程发送请求的次数。如果线程数为 10,循环次数为 100,那么每个线程发送 100 次请求。总请求数为 10*100=1000 。如果勾选了“永远”,那么所有线程会一直发送请求,一到选择停止运行脚本。
  • Delay Thread creation until needed:直到需要时延迟线程的创建。
  • 调度器:设置线程组启动的开始时间和结束时间(配置调度器时,需要勾选循环次数为永远)
  • 持续时间(秒):测试持续时间,会覆盖结束时间
  • 启动延迟(秒):测试延迟启动时间,会覆盖启动时间
  • 启动时间:测试启动时间,启动延迟会覆盖它。当启动时间已过,手动只需测试时当前时间也会覆盖它。
  • 结束时间:测试结束时间,持续时间会覆盖它

2、添加 HTTP 请求

谷粒商城(三)

谷粒商城(三)

3、添加监听器

谷粒商城(三)

4、启动压测&查看分析结果

结果分析

  • 有错误率同开发确认,确定是否允许错误的发生或者错误率允许在多大的范围内;
  • Throughput 吞吐量每秒请求的数大于并发数,则可以慢慢的往上面增加;若在压测的机器性能很好的情况下,出现吞吐量小于并发数,说明并发数不能再增加了,可以慢慢的往下减,找到最佳的并发数;
  • 压测结束,登陆相应的 web 服务器查看 CPU 等性能指标,进行数据的分析;
  • 最大的 tps,不断的增加并发数,加到 tps 达到一定值开始出现下降,那么那个值就是最大的 tps。
  • 最大的并发数:最大的并发数和最大的 tps 是不同的概率,一般不断增加并发数,达到一个值后,服务器出现请求超时,则可认为该值为最大的并发数。
  • 压测过程出现性能瓶颈,若压力机任务管理器查看到的 cpu、网络和 cpu 都正常,未达到 90%以上,则可以说明服务器有问题,压力机没有问题。
  • 影响性能考虑点包括:
    数据库、应用程序、中间件(tomact、Nginx)、网络和操作系统等方面
  • 首先考虑自己的应用属于 CPU 密集型 还是 IO 密集型

1.3 JMeter Address Already in use 错误解决

windows 本身提供的端口访问机制的问题。
Windows 提供给 TCP/IP 链接的端口为 1024-5000,并且要四分钟来循环回收他们。就导致我们在短时间内跑大量的请求时将端口占满了。

解决:

  1. cmd 中,用 regedit 命令打开注册表
  2. HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\Tcpip\Parameters 目录下,
    1. 右击 parameters,添加一个新的 DWORD,名字为 MaxUserPort
    2. 然后双击 MaxUserPort,输入数值数据为 65534,基数选择十进制(如果是分布式运行的话,控制机器和负载机器都需要这样操作哦)
    3. 同理增加 TCPTimedWaitDelay,值为十进制 30
  3. 修改配置完毕之后记得重启机器才会生效
    说明文档

1.4 性能监控-jvisualvm使用

Window+R ,输入 jvisualvm ,便可打开

idea 启动服务,在 jvisualvm 里面便可监控性能情况

谷粒商城(三)

安装插件方便查看 gc

工具->插件

谷粒商城(三)

如果 503 错误解决:
打开网址 https://visualvm.github.io/pluginscenters.html

cmd 查看自己的 jdk 版本,找到对应的
谷粒商城(三)
谷粒商城(三)

复制上面查询出来的链接。并重新设置上即可

谷粒商城(三)

2、性能优化

缓存 - 分布式锁

1、缓存使用-本地缓存

对于复杂的业务,已经不能够通过代码层面的优化和数据库层面的优化,来达到增加吞吐量的目的。这就想要使用到缓存。

哪些数据适合放入缓存?

  • 即时性、数据一致性要求不高的
  • 访问量大且更新频率不高的数据(读多,写少)
	data = cache.load(id);  //从缓存加载数据
	if(data == null){
		data = db.load(id);  //从数据库加载数据
		cache.put(id,data);  //保存到 cache 中
	}
	return data;

注意: 在开发中,凡是放入缓存中的数据我们都应该指定过期时间,使其可以在系统即使没有主动更新数据也能自动触发数据加载进缓存的流程。避免业务崩溃导致的数据永久不一致问题

2、整合 redis

pom.xml

注意此处引入的是 jedis 的客户端依赖

<!--引入redis-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <!--不加载自身的 lettuce-->
    <exclusions>
        <exclusion>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!--jedis-->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>

默认使用的是 lettuce 客户端

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

配置文件配置 redis

spring:
  redis:
    host: 192.168.56.10
    port: 6379

测试:

使用 RedisTemplate 操作 redis

	@Autowired
	StringRedisTemplate stringRedisTemplate;
	
	@Test
	public void testStringRedisTemplate() {
	    stringRedisTemplate.opsForValue().set("hello","world_" + UUID.randomUUID().toString());
	    String hello = stringRedisTemplate.opsForValue().get("hello");
	    System.out.println("之前保存的数据是:" + hello);
	}

查看 springboot 自动配置类

谷粒商城(三)

3、优化三级分类获取

先从缓存中获取分类三级分类数据,如果没有再从数据库中查询,并且将查询结果以JSON字符串的形式存放到Reids中的

给缓存中放 json 字符串、拿出的是 json 字符串,还要逆转为能用的对象类型【序列化和反序列化】

修改原方法,改方法名为 getCatelogJsonFromDb

public Map<String, List<Catalog2Vo>> getCatelogJsonFromDb() {
    //查询所有的一级分类
    List<CategoryEntity> l1List = getLevel1Categorys();
    Map<String, List<Catalog2Vo>> map = l1List.stream().collect(Collectors.toMap(k-> k.getCatId().toString(), l1 -> {

        //根据一级分类查询二级分类信息
        List<CategoryEntity> l2entities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", l1.getCatId()));
        List<Catalog2Vo> l2List = null;
        if (l2entities != null) {
            l2List = l2entities.stream().map(l2 -> {
                Catalog2Vo catalog2Vo = new Catalog2Vo();
                catalog2Vo.setCatalog1Id(l1.getCatId().toString());
                catalog2Vo.setId(l2.getCatId().toString());
                catalog2Vo.setName(l2.getName());

                //根据二级分类查询三级分类信息
                List<CategoryEntity> l3entities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", l2.getCatId()));
                if (l3entities != null) {
                    List<Catalog2Vo.Catalog3Vo> l3List = l3entities.stream().map(l3 -> {
                        Catalog2Vo.Catalog3Vo catalog3Vo = new Catalog2Vo.Catalog3Vo(l3.getCatId().toString(), l2.getCatId().toString(), l3.getName());
                        return catalog3Vo;
                    }).collect(Collectors.toList());
                    catalog2Vo.setCatalog3List(l3List);
                }
                return catalog2Vo;
            }).collect(Collectors.toList());
        }
        return l2List;
    }));
    return map;
}

另编写方法 getCatalogJson()使用缓存

// 给缓存中放 json 字符串、拿出的是 json 字符串,还要逆转为能用的对象类型【序列化和反序列化】
@Override
public Map<String, List<Catalog2Vo>> getCatalogJson() {
    // 1、加入缓存逻辑,缓存中放的数据是 json 字符串:JSON 跨语言、跨平台兼容
    String catalogJson = stringRedisTemplate.opsForValue().get("CatalogJson");
    if (StringUtils.isEmpty(catalogJson)) {
        // 2、缓存没有,从数据库中查询
        Map<String, List<Catalog2Vo>> catelogJsonFromDb = getCatelogJsonFromDb();
        // 3、查询到数据,将数据转成 JSON 后放入缓存中
        String s = JSON.toJSONString(catelogJsonFromDb);
        stringRedisTemplate.opsForValue().set("CatalogJson",s);
        return catelogJsonFromDb;
    }
    // 4、从缓存中获取的json数据转换为我们指定的对象
    Map<String, List<Catalog2Vo>> result = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Catalog2Vo>>>() {});
    return result;
}

1)、压力测试出的内存泄露及解决

如果使用 lettuce 客户端,jmeter 测试高并发环境下会产生堆外内存溢出 OutOfDirectMemoryError
谷粒商城(三)
1、SpringBoot2.0以后默认使用 Lettuce作为操作redis的客户端,它使用netty进行网络通信
2、lettuce的bug导致netty堆外内存溢出,可以通过-Dio.netty.maxDirectMemory 进行设置 netty 堆内存
当指定了服务使用的VM options最大内存-Xmx300m,但是没有指定 netty 堆内存,netty 就默认使用 -Xmx300m
谷粒商城(三)
如果只是调大内存,只会延缓异常的出现
3、解决方案:不能使用 -Dio.netty.maxDirectMemory调大内存
1)、升级 lettuce客户端;
2)、切换使用jedis,不使用 lettuce
4、注意:
lettuce、jedis 操作redis的底层客户端,Spring再次封装。
springboot底层封装的时候将两个都封装为 redisTemplate,所以代码不需要修改

2)、加本地锁解决缓存击穿问题

修改代码:将 查库操作 锁起来

@Override
public Map<String, List<Catalog2Vo>> getCatalogJson() {
    // 1、加入缓存逻辑,缓存中放的数据是 json 字符串:JSON 跨语言、跨平台兼容
    String catalogJson = stringRedisTemplate.opsForValue().get("CatalogJson");
    if (StringUtils.isEmpty(catalogJson)) {
        // 2、缓存没有,从数据库中查询
        System.out.println("缓存不命中.....将要查询数据库...");
        Map<String, List<Catalog2Vo>> catelogJsonFromDb = getCatelogJsonFromDbLock();
        // 3、查询到数据,将数据转成 JSON 后放入缓存中
        String s = JSON.toJSONString(catelogJsonFromDb);
        stringRedisTemplate.opsForValue().set("CatalogJson",s);
        return catelogJsonFromDb;
    }
    System.out.println("缓存命中...直接返回...");
    // 4、从缓存中获取的json数据转换为我们指定的对象
    Map<String, List<Catalog2Vo>> result = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Catalog2Vo>>>() {});
    return result;
}
public Map<String, List<Catalog2Vo>> getCatelogJsonFromDbLock() {
    /**
     *  为了解决缓存击穿问题,给业务逻辑加锁:
     *  只要是同一把锁,就能锁住需要这个锁的所有线程
     *     1、synchronized(this):SpringBoot所有组件在容器中都是单例的,所有可行。
     *      只不过分布式情况下,每个商品服务都有自己的一把锁
     *  TODO:本地锁:synchronized、JUC(lock),在分布式情况下想要锁住所有,必须使用分布式锁
     */
    synchronized (this){
        //得到锁以后,我们应该再去缓存中确定一次,如果没有才需要继续查询
        String catalogJSON = stringRedisTemplate.opsForValue().get("CatalogJson");
        if (!StringUtils.isEmpty(catalogJSON)){
            //如果缓存不为null直接缓存
            Map<String,List<Catalog2Vo>> result = JSON.parseObject(catalogJSON,new TypeReference<Map<String,List<Catalog2Vo>>>(){});
            return result;
        }
        System.out.println("查询了数据库。。。。。");
        //查询三级分类数据
        此处是业务逻辑代码++++++++++++++++++++++++
    }
}

使用 JMeter 进行高并发访问测试,发现并不是线程安全的

谷粒商城(三)

谷粒商城(三)

因为当 查库后,释放锁,在将结果放入缓存的这段时间里,有其他线程确认缓存没有,又再次查询了数据库,因此我们要将结果放入缓存也进行加锁

3)、锁时序问题

修改代码:将 查库操作、结果放入缓存的操作 锁起来

@Override
public Map<String, List<Catalog2Vo>> getCatalogJson() {
    // 1、加入缓存逻辑,缓存中放的数据是 json 字符串:JSON 跨语言、跨平台兼容
    String catalogJson = stringRedisTemplate.opsForValue().get("CatalogJson");
    if (StringUtils.isEmpty(catalogJson)) {
        // 2、缓存没有,从数据库中查询
        System.out.println("缓存不命中.....将要查询数据库...");
        Map<String, List<Catalog2Vo>> catelogJsonFromDb = getCatelogJsonFromDbLock();
        // 3、查询到数据,将数据转成 JSON 后放入缓存中
        // String s = JSON.toJSONString(catelogJsonFromDb);
        // stringRedisTemplate.opsForValue().set("CatalogJson",s);
        return catelogJsonFromDb;
    }
    System.out.println("缓存命中...直接返回...");
    // 4、从缓存中获取的json数据转换为我们指定的对象
    Map<String, List<Catalog2Vo>> result = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Catalog2Vo>>>() {});
    return result;
}
public Map<String, List<Catalog2Vo>> getCatelogJsonFromDbLock() {
    /**
     *  为了解决缓存击穿问题,给业务逻辑加锁:
     *  只要是同一把锁,就能锁住需要这个锁的所有线程
     *     1、synchronized(this):SpringBoot所有组件在容器中都是单例的,所有可行。
     *      只不过分布式情况下,每个商品服务都有自己的一把锁
     *  TODO:本地锁:synchronized、JUC(lock),在分布式情况下想要锁住所有,必须使用分布式锁
     */
    synchronized (this){
        //得到锁以后,我们应该再去缓存中确定一次,如果没有才需要继续查询
        String catalogJSON = stringRedisTemplate.opsForValue().get("CatalogJson");
        if (!StringUtils.isEmpty(catalogJSON)){
            //如果缓存不为null直接缓存
            Map<String,List<Catalog2Vo>> result = JSON.parseObject(catalogJSON,new TypeReference<Map<String,List<Catalog2Vo>>>(){});
            return result;
        }
        System.out.println("查询了数据库。。。。。");
        //查询三级分类数据
        此处是业务逻辑代码++++++++++++++++++++++++

        //将结果放入缓存的操作也锁起来
        String s = JSON.toJSONString(map);
        stringRedisTemplate.opsForValue().set("CatalogJson",s);
        return map;
    }
}

使用 JMeter 进行高并发访问测试,解决问题

谷粒商城(三)

这里我们使用了双端检锁机制来控制线程的并发访问数据库。一个线程进入到临界区之前,判断缓存中是否有数据,进入到临界区后,再次判断缓存中是否有数据,这样做的目的是避免阻塞在临界区的多个线程,在其他线程释放锁后,重复进行数据库的查询和放缓存操作。

注:关于双端检锁机制的简单了解,可以参照:https://blog.csdn.net/maritimesun/article/details/7831065

4)、本地锁在分布式下的问题

通过观察日志,能够发现只有一个线程查询了数据库,其他线程都是直接从缓存中获取到数据的。所以在单体应用上实现了多线程的并发访问。

由于这里我们的“gulimall-product”就部署了一台,所以看上去一切祥和,但是在如果部署了多台,问题就出现了,主要问题就集中在我们所使用的锁上。我们锁使用的是“synchronized ”,这是一种本地锁,它只是在一台设备上有效,无法实现分布式情况下,锁住其他设备的相同操作。

我们现在的操作模型,表现为如下的形式:

谷粒商城(三)

将商品服务复制几个

谷粒商城(三)

谷粒商城(三)

谷粒商城(三)
启动四个服务,JMeter 测试访问配置的 nginx

谷粒商城(三)

我没有配置域名和nginx,所以将请求转给网关,通过网关负载均衡到 “gulimall” 服务的多个实例上

执行结果发现每次服务都会查询一次数据库,虽然都查一次,但是每个服务都查一次就有问题

4、分布式锁的使用——使用 redis

下面使用redis来实现分布式锁,使用的是SET key value [EX seconds] [PX milliseconds] [NX|XX]

参考文档

1)、阶段一

谷粒商城(三)

使用 redis 获取锁,若获取到锁则执行业务,若没有则等待重试

public Map<String, List<Catalog2Vo>> getCatelogJsonWithRedisLock1() {
    //1、占分布式锁。去redis占坑
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111");
    if (lock) {
        //2、加锁成功,执行业务
        Map<String, List<Catalog2Vo>> dataFromDB = getCatelogJsonFromDbWithCache();
        //3、删除锁
        stringRedisTemplate.delete("lock");
        return dataFromDB;
    }else {
        //4、没获取到锁,等待100ms重试
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return getCatelogJsonWithRedisLock1();  //自旋的方式回调
    }
}

service业务方法

// 给缓存中放 json 字符串、拿出的是 json 字符串,还要逆转为能用的对象类型【序列化和反序列化】
@Override
public Map<String, List<Catalog2Vo>> getCatalogJson() {
    // 1、加入缓存逻辑,缓存中放的数据是 json 字符串:JSON 跨语言、跨平台兼容
    String catalogJson = stringRedisTemplate.opsForValue().get("CatalogJson");
    if (StringUtils.isEmpty(catalogJson)) {
        // 2、缓存没有,从数据库中查询
        // 3、查询到数据,将数据转成 JSON 后放入缓存中(查询和放入缓存应该是原子操作,都被锁住)
        System.out.println("缓存不命中.....将要查询数据库...");
        Map<String, List<Catalog2Vo>> catelogJsonFromDb = getCatelogJsonWithLocalLock1();
        return catelogJsonFromDb;
    }
    System.out.println("缓存命中...直接返回...");
    // 4、从缓存中获取的json数据转换为我们指定的对象
    Map<String, List<Catalog2Vo>> result = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Catalog2Vo>>>() {
    });
    return result;
}

查询数据库(使用redis做缓存)

public Map<String, List<Catalog2Vo>> getCatelogJsonFromDbWithCache() {
    //得到锁以后,我们应该再去缓存中确定一次,如果没有才需要继续查询
    String catalogJSON = stringRedisTemplate.opsForValue().get("CatalogJson");
    if (!StringUtils.isEmpty(catalogJSON)) {
        //如果缓存不为null直接缓存
        Map<String, List<Catalog2Vo>> result = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catalog2Vo>>>() {});
        return result;
    }
    
    //业务逻辑:查询三级分类
    System.out.println("查询了数据库。。。。。");
    List<CategoryEntity> l1List = getLevel1Categorys();
    Map<String, List<Catalog2Vo>> dataFromDB = l1List.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), l1 -> {
        //根据一级分类查询二级分类信息
        List<CategoryEntity> l2entities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", l1.getCatId()));
        List<Catalog2Vo> l2List = null;
        if (l2entities != null) {
            l2List = l2entities.stream().map(l2 -> {
                Catalog2Vo catalog2Vo = new Catalog2Vo();
                catalog2Vo.setCatalog1Id(l1.getCatId().toString());
                catalog2Vo.setId(l2.getCatId().toString());
                catalog2Vo.setName(l2.getName());
                //根据二级分类查询三级分类信息
                List<CategoryEntity> l3entities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", l2.getCatId()));
                if (l3entities != null) {
                    List<Catalog2Vo.Catalog3Vo> l3List = l3entities.stream().map(l3 -> {
                        Catalog2Vo.Catalog3Vo catalog3Vo = new Catalog2Vo.Catalog3Vo(l3.getCatId().toString(), l2.getCatId().toString(), l3.getName());
                        return catalog3Vo;
                    }).collect(Collectors.toList());
                    catalog2Vo.setCatalog3List(l3List);
                }
                return catalog2Vo;
            }).collect(Collectors.toList());
        }
        return l2List;
    }));
    //将结果放入缓存的操作也锁起来
    String s = JSON.toJSONString(dataFromDB);
    stringRedisTemplate.opsForValue().set("CatalogJson", s);
    return dataFromDB;
}
  • 问题:
    第一步加锁成功 setnx 占好了位之后
    业务代码异常或者程序在页面过程中宕机,没有执行删除锁逻辑,这就造成了死锁
  • 解决:
    设置锁的自动过期,即使没有删除,会自动删除

2)、阶段二

谷粒商城(三)

设置锁的自动过期,即使没有删除,会自动删除

public Map<String, List<Catalog2Vo>> getCatelogJsonWithRedisLock2() {
    //1、占分布式锁。去redis占坑
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111");
    if (lock) {
        //2、设置过期时间
        stringRedisTemplate.expire("lock",30, TimeUnit.SECONDS);
        //3、加锁成功,执行业务
        Map<String, List<Catalog2Vo>> dataFromDB = getCatelogJsonFromDbWithCache();
        //4、删除锁
        stringRedisTemplate.delete("lock");
        return dataFromDB;
    }else {
        //5、没获取到锁,等待100ms重试(已省略)
        return getCatelogJsonWithRedisLock2();  //自旋的方式回调
    }
}
  • 问题:
    第二步正要设置过期时间之前
    设置过期时间之前宕机,又死锁了
  • 解决:
    设置锁的自动过期和占位(加锁)必须是原子的,redis 支持 setnx ex 命令

3)、阶段三

谷粒商城(三)

设置锁的自动过期和占位(加锁)必须是原子的,redis 支持 setnx ex 命令

public Map<String, List<Catalog2Vo>> getCatelogJsonWithRedisLock3() {
    //1、占分布式锁。去redis占坑
    //设置过期时间和加锁是一个原子操作
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1111",30, TimeUnit.SECONDS);
    if (lock) {
        //设置过期时间 stringRedisTemplate.expire("lock",30, TimeUnit.SECONDS);
        //2、加锁成功,执行业务
        Map<String, List<Catalog2Vo>> dataFromDB = getCatelogJsonFromDbWithCache();
        //3、删除锁
        stringRedisTemplate.delete("lock");
        return dataFromDB;
    }else {
        //4、没获取到锁,等待100ms重试(已省略)
        return getCatelogJsonWithRedisLock3();  //自旋的方式回调
    }
}
  • 问题:
    第三步删除锁直接删除???
    如果由于业务时间很长,锁自己过期了,我们直接删除,有可能把别人正在持有的锁删除了
  • 解决:
    占锁的时候,值指定为 uuid,每个人匹配确定是自己的锁才删除

4)、阶段四

谷粒商城(三)

谷粒商城(三)

占锁的时候,值指定为 uuid,每个人匹配确定是自己的锁才删除

public Map<String, List<Catalog2Vo>> getCatelogJsonWithRedisLock4() {
    //1、占分布式锁。去redis占坑
    //设置过期时间和加锁是一个原子操作
    String uuid = UUID.randomUUID().toString();
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,30, TimeUnit.SECONDS);
    if (lock) {
        //2、加锁成功,执行业务
        Map<String, List<Catalog2Vo>> dataFromDB = getCatelogJsonFromDbWithCache();
        //3、删除锁
        //要确定删除的是我们自己加的锁
        String lockValue = stringRedisTemplate.opsForValue().get("lock");
        if (uuid.equals(lockValue)) {
            stringRedisTemplate.delete("lock");
        }
        return dataFromDB;
    }else {
        //4、没获取到锁,等待100ms重试(已省略)
        return getCatelogJsonWithRedisLock3();  //自旋的方式回调
    }
}
  • 问题:
    第三步当判断确定是自己的锁之后,删除之前
    如果正好判断是当前锁,正要删除锁的时候,锁已经过期,别人已经设置到了新的值。那么我们删除的是别人的锁
  • 解决:
    删除锁必须保证原子性。使用 redis + lua 脚本完成

5)、阶段五

谷粒商城(三)

删除锁必须保证原子性。使用 redis + lua 脚本完成

public Map<String, List<Catalog2Vo>> getCatelogJsonWithRedisLock5() {
    //1、占分布式锁。去redis占坑
    //设置过期时间和加锁是一个原子操作
    String uuid = UUID.randomUUID().toString();
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,30, TimeUnit.SECONDS);
    if (lock) {
        Map<String, List<Catalog2Vo>> dataFromDB;
        try {
            //2、加锁成功,执行业务
            dataFromDB = getCatelogJsonFromDbWithCache();
        } finally {
            //3、删除锁
            //获取锁值对比 + 对比成功删除锁 =是一个原子操作,使用Lua脚本解锁
            String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
                    "    return redis.call(\"del\",KEYS[1])\n" +
                    "else\n" +
                    "    return 0\n" +
                    "end";
            //删除锁
            Long lock1 = stringRedisTemplate.execute(
                    new DefaultRedisScript<>(script, Long.class),
                    Arrays.asList("lock"),
                    uuid);
        }
        return dataFromDB;
    }else {
        //4、没获取到锁,等待100ms重试(已省略)
        return getCatelogJsonWithRedisLock5();  //自旋的方式回调
    }
}

实现,加锁【占位+过期时间】和删除锁【判断+删除】的原子性。
未实现,锁的自动续期

官网文档上详细说明了 不推荐使用 setnx 来实现分布式锁,在 Java 语言环境下推荐使用 Redisson
谷粒商城(三)

5、Redisson

1)、配置

中文文档

商品服务导入依赖

谷粒商城(三)

<!--以后使用 redisson 作为分布锁,分布式对象等功能-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.12.0</version>
</dependency>

配置类

@Configuration
public class MyRedissonConfig {
    /**
     * 所有对Redisson的使用都是通过 RedissonClient 对象
     */
    @Bean(destroyMethod = "shutdown")
    public RedissonClient redisson(){
        // 1 创建配置
        Config config = new Config();
        //注意:用"rediss://"或者"redis://"
        config.useSingleServer().setAddress("redis://150.158.171.144:6379");
        // 2 根据Config创建RedissonClient实例
        RedissonClient redissonClient = Redisson.create(config);
        return redissonClient;
    }
}

测试是否注入了 RedissonClient

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class GulimallProductApplicationTests {
	
	@Autowired
    RedissonClient redissonClient;

    @Test
    public void redissonClient(){
        System.out.println(redissonClient);
    }
}

2)、Redisson - Lock 锁测试 & Redisson - Lock 看门狗原理 - Redisson 如何解决死锁

Redison分布式锁

@ResponseBody
@GetMapping("hello")
public String hello(){
    // 1 获取一把锁:只要锁的名字一样,就是同一把锁
    RLock mylock = redisson.getLock("mylock");
    // 2 加锁
    mylock.lock(); //是阻塞式等待,类似于JUC中synchronized阻塞等待,不需要手动回调
    // 3 执行业务逻辑
    try {
        System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        // 4 解锁
        //假如解锁代码没有运行,reidsson会不会出现死锁
        System.out.println("释放锁...." + Thread.currentThread().getId());
        mylock.unlock();
    }
    return "";
}

提问:假设当一个服务执行加锁之后宕机,未执行解锁,是否会死锁?
—— 在加锁后,即便我们没有释放锁,也会自动的释放锁,这是因为在 Redisson 中会为每个锁加上 “leaseTime”,默认是30秒

提问:假设业务执行所需要的时间超过 30 秒,锁会不会被自动删掉?
—— 锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s,不用担心业务时间长,锁自动过期后被删掉
—— 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认会在30s以后自动删除

提问:锁的过期时间只能是 30s 吗?我偏要 40s
—— 加锁时可以指定过期时间

mylock.lock(40, TimeUnit.SECONDS); //40s 后自动删除

提问:在锁到期后,是否会自动续期?
—— 如果传递了锁的超时时间,就发送给 redis 执行脚本,进行占锁,默认超时就是我们指定的时间(在指定了超时时间后,不会进行自动续期,即便业务仍然在执行,时间到了后锁会自动删除)
—— 如果没有指定锁的超时时间,就是用 LockWatchchdogTimeout 看门狗的默认时间 30s
并且只要占锁成功,就会启动一个定时任务,【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10s(看门狗时间 / 3)就自动续期

源码:看门狗原理

谷粒商城(三)

谷粒商城(三)

3)、Redisson - 读写锁

读写锁保证一定能读取到最新数据,修改期间,写锁是一个排他锁(互斥锁,独享锁)读锁是一个共享锁

写锁没释放读锁就必须等待,只要有写的存在,都必须等待

  • 读 + 读 相当于无锁,并发读,只会在 reids中记录好,所有当前的读锁,他们都会同时加锁成功
  • 写 + 读 等待写锁释放
  • 写 + 写 阻塞方式
  • 读 + 写 有读锁,写也需要等待

谷粒商城(三)

代码如下:

@GetMapping("write")
@ResponseBody
public String write(){
    //改数据加写锁,读数据加读锁
    RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
    RLock writeLock = lock.writeLock();
    String s = "";
    try {
        writeLock.lock();
        Thread.sleep(1000);
        s = UUID.randomUUID().toString();
        stringRedisTemplate.opsForValue().set("writeValue",s);
        System.out.println("写锁加锁成功" + Thread.currentThread().getId());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        writeLock.unlock();
        System.out.println("写锁释放成功" + Thread.currentThread().getId());
    }
    return s;
}
@GetMapping("read")
@ResponseBody
public String read(){
    //获取一把锁:只要锁的名字一样,就是同一把锁
    RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
    RLock readLock = lock.readLock();
    String s = "";
    try {
        readLock.lock();
        Thread.sleep(1000);
        s = stringRedisTemplate.opsForValue().get("writeValue");
        System.out.println("读锁成功" + Thread.currentThread().getId());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        readLock.unlock();
        System.out.println("读锁释放" + Thread.currentThread().getId());
    }
    return s;
}

4)、Redisson - 信号量

先在 redis 里面新建 键 count 值为 3

谷粒商城(三)

@GetMapping("park")
@ResponseBody
public String park(){
    RSemaphore count = redisson.getSemaphore("count");
    // count.acquire();  //获取一个信号,获取一个值,占用一个车位
    boolean b = count.tryAcquire();
    if (b){
        //执行业务
    }else {
        return "error";
    }
    return b+"";
}
@GetMapping("go")
@ResponseBody
public String go(){
    RSemaphore count = redisson.getSemaphore("count");
    count.release(1);  //释放一个车位
    return "OK";
}

访问一次 http://localhost:10000/go 键值就会 +1

谷粒商城(三)

访问一次 http://localhost:10000/park 键值就会 -1

谷粒商城(三)

5)、Redisson - 闭锁

@GetMapping("lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
    RCountDownLatch lockDoor = redisson.getCountDownLatch("lockDoor");
    lockDoor.trySetCount(5);    //总数为5
    lockDoor.await();
    return "OK";
}
@GetMapping("leave")
@ResponseBody
public String leave(){
    RCountDownLatch lockDoor = redisson.getCountDownLatch("lockDoor");
    lockDoor.countDown();   //计数器减一
    return "OK";
}

当首次访问 http://localhost:10000/lockDoor 时,redis 里面会自己新建键 lockDoor 值为 5

谷粒商城(三)

访问一次 http://localhost:10000/leave ,值就会 -1

当值为 0 时就会自动删除此键 lockDoor

6)、Redission - 缓存一致性解决

优化三级分类获取,使用 redisson 实现分布式锁

//使用redisson
public Map<String, List<Catalog2Vo>> getCatelogJsonWithRedisson(){
    // 注意锁的名字。锁的粒度越细越快
    RLock lock = redissonClient.getLock("catelogJson-lock");
    lock.lock();
    Map<String, List<Catalog2Vo>> dataFromDB;
    try {
    	//在查询数据库的基础上,使用了redis做缓存,将查询的结果放入缓存
    	//(如果缓存有数据将不会查询数据库,会导致当数据库修改而缓存未改时,数据不一致问题)
        dataFromDB = getCatelogJsonFromDbWithCache();
    } finally {
        lock.unlock();
    }
    return dataFromDB;
}

缓存一致性是为了解决数据库和缓存的数据不同步问题的,有两种解决方法

方法一:双写模式(在写数据库方法中同时写缓存)
谷粒商城(三)

两个线程写 最终只有一个线程写成功,后写成功的会把之前写的数据给覆盖,这就会造成脏数据

解决方法:可以给写数据库和写缓存操作加锁,使其成为一个原子性操作。
但是即便如此,数据库和缓存之间总会有不一致问题。
可以给缓存加上过期时间,即缓存过期后又能有最新数据。
双写模式:读到最新数据有延迟,是暂时的脏数据——最终一致性

方法二:失效模式(在写数据库方法中同时删除缓存)

谷粒商城(三)

三个连接:
一号连接 写数据库 然后删缓存
二号连接 写数据库时网络连接慢,还没有写入成功
三号连接 发现无缓存便直接读取数据库,读到的是一号连接写入的数据,此时 二号连接写入数据成功并删除了缓存,三号开始读取结束更新缓存发现更新的是二号的缓存,但是更新的是一号数据

缓存数据一致性解决方案

无论是双写模式还是失效模式,都会到这缓存不一致的问题,即多个实力同时更新会出事,怎么办?

1、如果是用户纯度数据(订单数据、用户数据),这并发几率很小,几乎不用考虑这个问题,缓存数据加上过期时间,每隔一段时间触发读的主动更新即可
2、如果是菜单,商品介绍等基础数据,也可以去使用 canal 订阅,binlog 的方式
3、缓存数据 + 过期时间也足够解决大部分业务对缓存的要求
4、通过加锁保证并发读写,写写的时候按照顺序排好队,读读无所谓,所以适合读写锁,(业务不关心脏数据,允许临时脏数据可忽略)

总结:

  • 我们能放入缓存的数据本来就不应该是实时性、一致性要求超高的。所以缓存数据的时候加上过期时间,保证每天拿到当前的最新值即可
  • 我们不应该过度设计,增加系统的复杂性
  • 遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点

6)、Redission - Redlock

高级的分布式锁算法 Redlock,来处理集群部署分布式锁问题

如果线程一在Redis的master节点上拿到了锁,但是加锁的key还没同步到slave节点。恰好这时,master节点发生故障,一个slave节点就会升级为master节点。线程二就可以获取同个key的锁啦,但线程一也已经拿到锁了,锁的安全性就没了。

RedLock的实现步骤:

  1. 获取当前时间,以毫秒为单位。
  2. 按顺序向5个master节点请求加锁。客户端设置网络连接和响应超时时间,并且超时时间要小于锁的失效时间。(假设锁自动失效时间为10秒,则超时时间一般在5-50毫秒之间,我们就假设超时时间是50ms吧)。如果超时,跳过该master节点,尽快去尝试下一个master节点。
  3. 客户端使用当前时间减去开始获取锁时间(即步骤1记录的时间),得到获取锁使用的时间。当且仅当超过一半(N/2+1,这里是5/2+1=3个节点)的Redis master节点都获得锁,并且使用的时间小于锁失效时间时,锁才算获取成功。(如上图,10s> 30ms+40ms+50ms+4m0s+50ms)
  4. 如果取到了锁,key的真正有效时间就变啦,需要减去获取锁所使用的时间。
  5. 如果获取锁失败(没有在至少N/2+1个master实例取到锁,有或者获取锁时间已经超过了有效时间),客户端要在所有的master节点上解锁(即便有些master节点根本就没有加锁成功,也需要解锁,以防止有些漏网之鱼)。

简化下步骤就是:

  1. 按顺序向5个master节点请求加锁
  2. 根据设置的超时时间来判断,是不是要跳过该master节点。
  3. 如果大于等于3个节点加锁成功,并且使用的时间小于锁的有效期,即可认定加锁成功啦。
  4. 如果获取锁失败,解锁!

6、Spring Cache

1)、简介

Spring 从 3.1 开始定义了 org.springframework.cache.Cacheorg.sprngframework.cache.CacheManager 接口来统一不同的缓存技术,并支持使用 JCache(JSR-107)注解简化我们的开发。

Cache 接口为缓存的组件规范定义,包含缓存的各种操作集合;Cache 接口下 Spring 提供了各种 XXXCache 的实现:如 RedisCache、EhCache、ConcrrentMapCache 等等

每次调用需要缓存功能的方法时,Spring 会检查指定参数的指定目标方法是否已经被调用过。如果有就直接从缓存中获取方法调用后的结果,如果没有就调用方法并缓存结果后返回给用户,下次调用直接从缓存中获取

使用 Spring 缓存抽象时我们需要关注以下两点:
1、确定方法需要被缓存以及他们的的缓存策略
2、从缓存中读取之前缓存存储的数据

官网地址

2)、整合 SpringCache

引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-cache</artifactId>
</dependency>

(因为使用redis缓存,还需引入redis依赖)

<!--引入redis-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <!--不加载自身的 lettuce-->
    <exclusions>
        <exclusion>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!--jedis-->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
</dependency>

编写配置

/** 
 * 8、整合SpringCache简化缓存开发
 *      1)、依赖
 *         spring-boot-starter-cache、spring-boot-starter-data-redis
 *      2)、写配置
 *          1 自动配置了哪些
 *            CacheAutoConfiguration里的CacheConfigurationImportSelector()方法会根据类型导入 RedisCacheConfiguration
 *            RedisCacheConfiguration里已经配置 RedisCacheManager
 *            并在 determineConfiguration()方法里指明若有 RedisCacheConfiguration 则使用里面的配置,若无则使用默认配置
 *          2 配置文件指定使用redis作为缓存
 *            Spring.cache.type=redis
 *      3)、测试使用缓存
 *          1 开启缓存功能  @EnableCaching
 *          2 只需使用注解即可完成缓存操作
 *              @Cacheable:触发将数据保存到缓存的操作
 *              @CacheEvict:触发将数据从缓存删除的操作,缓存一致性解决中的失效模式
 *              @CachePut:不影响方法执行更新缓存,缓存一致性解决中的双写模式
 *              @Caching:组合以上多个操作
 *              @CacheConfig:在类级别共享缓存的相同配置
 *      4)、原理:
 *          CacheAutoConfiguration 导入 RedisCacheConfiguration ——> 其中自动配置了缓存管理器 RedisCacheManager
 *          RedisCacheManager 里调用 determineConfiguration() 方法初始化所有缓存,每个缓存决定使用什么配置 ——>
 *          如果 RedisCacheConfiguration 有就用已有的,若无就用默认配置 defaultCacheConfig() ——>
 *          想改缓存的配置,只需要给容器中放一个 RedisCacheConfiguration,就会应用到当前RedisCacheManager管理的所有缓存分区中
 */

启动类上使用注解 @EnableCaching

配置文件配置 redis 地址,并且指定 redis 作为缓存

spring:
  # redis
  redis:
    host: 111.111.111.111
    port: 6379
  # 配置使用redis作为缓存
  cache:
    type: redis

3)、@Cacheable - 存入缓存

/**
 * 1、每个需要缓存的数据都要指定缓存分区名(按照业务类型分)
 * 2、 @Cacheable("categorys")
 *      表示当前方法结果需要缓存,若缓存中有则直接取缓存,不执行方法
 *      若缓存中无,会调用方法,最后将结果存入缓存
 * 3、默认行为
 *      1)如果缓存命中,方法不会执行
 *      2)redis中 key 默认自动生成:缓存名::SimpleKey[](自动生成的key值)此处即为 【categorys::SimpleKey []】
 *      3)redis中 value 默认使用jdk序列化机制,将序列化后的数据存到redis
 *      4)默认ttl时间 -1 ,永不过期
 * 4、自定义行为
 *      1)指定生成的缓存的 key
 *          注解属性key指定,传入SpEL表达式,参考 https://docs.spring.io/spring-framework/docs/5.2.10.RELEASE/spring-framework-reference/integration.html#cache-spel-context
 *          key = "'level1Categorys'"   即为 【categorys::level1Categorys】
 *          key = "#root.method.name"   即为 【categorys::getLevel1Categorys】
 *      2)指定缓存数据的存活时间,毫秒为单位
 *          配置文件中指定 Spring.cache.redis.time-to-live=3600000
 *      3)指定缓存的数据以 json 格式存储
 *          自定义配置 RedisCacheConfiguration 即可
 *      4)配置文件中的一些自定义配置
 *          Spring.cache.redis.cache-null-values=true  是否缓存空值,可防止缓存穿透
 *          Spring.cache.redis.key-prefix=CACHE_  缓存key值前缀,若使用指定前缀即为 【CACHE_getLevel1Categorys】,若不指定,默认缓存分区名为前缀
 *          Spring.cache.redis.use-key-prefix=true   缓存key值是否使用前缀,若为false,即为 【getLevel1Categorys】
 */

每个需要缓存的数据都要指定缓存分区名(按照业务类型分)

使用注解 @Cacheable("categorys")
表示当前方法的结果需要缓存,若缓存中有则直接取缓存,不执行方法
若缓存中无,则会调用方法,最后将结果存入缓存

@Cacheable("categorys")
@Override
public List<CategoryEntity> getLevel1Categorys() {
    return baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0));
}

启动服务,查看 redis

谷粒商城(三)

发现当不进行任何配置时,会有一些 默认行为

  1. 如果缓存命中,方法不会执行
  2. redis中 key 默认自动生成: 缓存名::SimpleKey[] (自动生成的key值)
    此处即为 categorys::SimpleKey []
  3. redis中 value 默认使用 jdk序列化机制 ,将序列化后的数据存到redis
  4. 默认 ttl时间 -1 ,永不过期

以下是 自定义行为:

1、指定生成的缓存的 key

使用注解属性 key 指定,传入SpEL表达式,参考官方文档

key = “‘level1Categorys’” 即为 categorys::level1Categorys

@Cacheable(value = {"categorys"},key = "'level1Categorys'")
@Override
public List<CategoryEntity> getLevel1Categorys() {
    return baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0));
}

谷粒商城(三)

key = “#root.method.name” 即为 categorys::getLevel1Categorys

@Cacheable(value = {"categorys"}, key = "#root.method.name")
@Override
public List<CategoryEntity> getLevel1Categorys() {
    return baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0));
}

谷粒商城(三)

2、指定缓存数据的存活时间,毫秒为单位

配置文件中指定 Spring.cache.redis.time-to-live=3600000

spring:
  # redis
  redis:
    host: 111.111.111.111
    port: 6379
  # 配置使用redis作为缓存
  cache:
    type: redis
    # 指定缓存数据的存活时间,毫秒为单位
    redis:
      time-to-live: 3600000

3、指定缓存的数据以 json 格式存储

自定义配置 RedisCacheConfiguration 即可

@Configuration
public class MyCacheConfig {

    @Bean
    RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties){
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
        
        //自定义配置 key-value 的序列化机制
        config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
        config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
        
        //注意:配置类会覆盖配置文件的配置,若想配置文件生效,需要以下代码
        CacheProperties.Redis redisProperties = cacheProperties.getRedis();
        if (redisProperties.getTimeToLive() != null) {
            config = config.entryTtl(redisProperties.getTimeToLive());
        }
        if (redisProperties.getKeyPrefix() != null) {
            config = config.prefixKeysWith(redisProperties.getKeyPrefix());
        }
        if (!redisProperties.isCacheNullValues()) {
            config = config.disableCachingNullValues();
        }
        if (!redisProperties.isUseKeyPrefix()) {
            config = config.disableKeyPrefix();
        }
        return config;
    }
}

谷粒商城(三)

4、配置文件中的一些自定义配置

Spring.cache.redis.cache-null-values=true
是否缓存空值,可防止缓存穿透

Spring.cache.redis.key-prefix=CACHE_
缓存key值前缀,若使用指定前缀即为 CACHE_getLevel1Categorys,若不指定,默认缓存分区名为前缀 【建议使用默认前缀】

如果指定了前缀,则在 redis 可视化界面里面就不会有分区显示,如果使用默认分区名为前缀,就会有分区显示,如下:
谷粒商城(三)

Spring.cache.redis.use-key-prefix=true
缓存key值是否使用前缀,若为false,即为 getLevel1Categorys

spring:
  # redis
  redis:
    host: 111.111.111.111
    port: 6379
  # 配置使用redis作为缓存
  cache:
    type: redis
    redis:
      time-to-live: 3600000
      # 是否缓存空值,可防止缓存穿透
      cache-null-values: true
      key-prefix: CACHE_
      use-key-prefix: true

4)、@CacheEvict - 失效模式

@Cacheable 注解解决了将数据库数据存入缓存,并且是从缓存中直接获取数据

当数据库更新时需要同步更新缓存,有以下两种方式:
@CacheEvict - 失效模式
@CachePut - 双写模式

数据库修改之后会返回最新的数据结果,可以使用双写模式,将返回的结果放入缓存

此处我们的修改方法没有返回数据,所以演示 @CacheEvict 的使用

1、 仅仅更新一个缓存

@CacheEvict(value = {"categorys"}, key = "'getLevel1Categorys'")
@Transactional
@Override
public void updateDetail(CategoryEntity category){}

注意这里 key 也是使用表达式,所以双引号里面还要有单引号

当更新数据后,发现 redis 里面缓存被删除

2、 更新多个缓存

可以使用注解 @Caching :表示组合多个操作

public @interface Caching {
   Cacheable[] cacheable() default {};
   CachePut[] put() default {};
   CacheEvict[] evict() default {};
}
@Caching(evict = {
     @CacheEvict(value = {"categorys"}, key = "'getLevel1Categorys'"),
     @CacheEvict(value = {"categorys"}, key = "'getCatalogJson'")
})
@Transactional
@Override
public void updateDetail(CategoryEntity category)

3、更新整个分区的缓存

除了使用组合多个操作的方式,还可以直接 删除整个分区的缓存,如下

@CacheEvict(value = {"categorys"},allEntries = true)
@Transactional
@Override
public void updateDetail(CategoryEntity category)

5)、原理与不足

  • 缓存穿透:查询一个null数据
    解决:可以通过配置文件缓存空数据spring.cache.redis.cache-null-values=true
  • 缓存击穿:大量并发进来同时查询一个正好过期的数据
    解决:加锁。 默认是无加锁
  • 缓存雪崩:大量的key同时过期
    解决:加上随机时间,spring.cache.redis.time-to-live

只有注解 @Cacheable 可以加锁,通过属性 sync = true 实现,默认 false

@Cacheable(value = {"categorys"},key = "#root.methodName",sync = true)

谷粒商城(三)

总结:

常规数据(读多写少,即时性,一致性要求不高的数据)完全可以使用SpringCache 写模式( 只要缓存数据有过期时间就足够了)

异步&线程池

1、线程回顾

1)、初始化线程的四种方式

  1. 继承 Thread

    ThreadThr threadThr = new ThreadThr();
    threadThr.start();
    
    public static class ThreadThr extends Thread {
        @Override
        public void run() {
            System.out.println("当前线程" + Thread.currentThread().getId());
            int i = 10 / 3;
            System.out.println("运行结果" + i);
        }
    }
    
  2. 实现 Runnable

    Thread thread = new Thread(new ThreadRun());
    thread.start();
    
    public static class ThreadRun implements Runnable {
        @Override
        public void run() {
            System.out.println("当前线程" + Thread.currentThread().getId());
            int i = 10 / 3;
            System.out.println("运行结果" + i);
        }
    }
    
  3. 实现 Callable 接口 + FutureTask(可以拿到返回结果,可以处理异常)

    FutureTask<Integer> futureTask = new FutureTask<>(new ThreadCall());
    new Thread(futureTask).start();
    //阻塞等待整个线程执行完成,获取返回结果
    Integer integer = futureTask.get();
    System.out.println(integer);
    
    public static class ThreadCall implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("当前线程" + Thread.currentThread().getId());
            int i = 10 / 3;
            System.out.println("运行结果" + i);
            return i;
        }
    }
    
  4. 线程池
    以上三种启动线程方式都不用,将所有的多线程异步任务都交给线程池执行

    ExecutorService service = Executors.newFixedThreadPool(10);
    service.execute(new ThreadRun());
    //或者
    new ThreadPollExecutor(corePoolSize,maximumPoolSize,keepAliveTime,TimeUnit,unit,workQueue,threadFactory,handler);
    

2)、线程池详解

线程池【ExecutorService】

创建

通过 Executors,或者 new ThreadPoolExecutor()。后者需要传入七大参数

七大参数:

  1. corePoolSize —— 核心线程数;线程池创建好后就准备就绪的线程数量,等待接收异步任务去执行

  2. maximumPoolSize —— 最大线程数量;控制资源

  3. keepAliveTime —— 存活时间;当线程空闲一定的时间,就释放这些线程【注意是核心线程外的线程】

  4. unit —— 时间单位

  5. workQueue —— 阻塞队列;如果任务很多,将多的任务放在队列里,只要有线程空闲,就会去队列里取新的任务执行

  6. threadFactory —— 线程的创建工厂

    Executors.defaultThreadFactory()  //若不满足使用还可手动重写
    
  7. handler —— 拒绝策略 RejectedExecutionHandler;如果队列满了,无法执行更多的任务,按照指定的拒绝策略来拒绝接收任务

    new ThreadPoolExecutor.AbortPolicy()  //直接丢弃任务,会抛异常
    

示例:

ThreadPoolExecutor executor = new ThreadPoolExecutor(5,
       200,
       10,
       TimeUnit.SECONDS,
       new LinkedBlockingDeque<>(100000),
       Executors.defaultThreadFactory(),  //若不满足使用还可手动重写
       new ThreadPoolExecutor.AbortPolicy());  //直接丢弃任务,会抛异常
运行流程
  1. 线程池创建,准备好 core 数量的核心线程,准备接受任务
  2. core 满了,就将再进来的任务放入阻塞队列中。空闲的 core 就会自己去阻塞队列获取任务执行
  3. 阻塞队列满了,就直接开新线程执行,最大只能开到 max 指定的数量
  4. max 满了,就用RejectedExecutionHandler拒绝任务
  5. max 都执行完成。max-core 数量空闲的线程会在keepAliveTime指定的时间后自动销毁。最终保持到core大小
细节

阻塞队列 new LinkedBlockingDeque<>() 默认是Integer的最大值,可能会导致内存不够,所以一般需要指定值

提问:一个线程池 core 7; max 20 ,queue:50,100 并发进来怎么分配的?
—— 7个立即执行,50 个进入队列排队,再多开 13 个继续执,剩下 30 个默认拒绝策略。

常见的 4 种线程池
  1. Executors.newCachedThreadPool(); core为0,所有都可回收
  2. Executors.newFixedThreadPool(); 固定大小,core为max,所有都不可回收
  3. Executors.newScheduledThreadPool(); 定时任务线程池
  4. Executors.newSingleThreadExecutor(); 单线程的线程池,core和max都为1,后台从队列中获取任务,挨个执行

2、CompletableFuture 异步编排

通过线程池性能稳定,也可以获取执行结果,并捕获异常,但是,在业务复杂情况下,一个异步调用可能 会依赖另一个异步调用的执行结果

Future 是 Java 5 添加的类,用来描述一个异步计算的结果

在 Java 8 中, 新增加了一个包含 50 个方法左右的类: CompletableFuture,提供了非常强大的
Future 的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力

CompletableFuture 和 FutureTask 同属于 Future 接口的实现类,都可以获取线程的执行结果

谷粒商城(三)

1)、创建异步对象

谷粒商城(三)

  1. runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的
  2. 可以传入自定义的线程池,否则就用默认的线程池;

示例:

无返回值,传入 void run()

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main......start....");
    //无返回值  void run();
    CompletableFuture.runAsync(()->{
        System.out.println("当前线程" + Thread.currentThread().getId());
        int i = 10 / 3;
        System.out.println("运行结果" + i);
    },service);
    System.out.println("main......end......");
}

有返回值,传入 T get()

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main......start....");
    //有返回值  T get();
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("当前线程" + Thread.currentThread().getId());
        int i = 10 / 2;
        return i;
    }, service);
    Integer integer = future.get();  //阻塞式等待
    System.out.println("结果是:"+integer);
    System.out.println("main......end......");
}

谷粒商城(三)

2)、完成回调和异常感知

谷粒商城(三)

  1. whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况。
  2. 方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
  3. whenComplete 和 whenCompleteAsync 的区别:
    1. whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
    2. whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
  4. 注意:二者都只能是由 有返回结果的 CompletableFuture 调用

示例:

whenComplete 传入 void accept(T t, U u)

接收结果和异常,当有异常时结果为 null

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main......start....");
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("当前线程" + Thread.currentThread().getId());
        int i = 10 / 0;
        return i;
        //void accept(T t, U u)
    }, service).whenCompleteAsync((result,exception)->{
        // 虽然能得到异常信息,但是没法修改返回的数据
        System.out.println("异步任务成功完成了...结果是:" +result + "异常是:" + exception);
        //R apply(T t);
    });
    System.out.println("main......end......");
}

谷粒商城(三)

exceptionally 传入 R apply(T t)

可以感知到异常,同时返回默认值

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main......start....");
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("当前线程" + Thread.currentThread().getId());
        int i = 10 / 0;
        return i;
    }, service).whenCompleteAsync((result,exception)->{    //void accept(T t, U u)
        // 虽然能得到异常信息,但是没法修改返回的数据
        System.out.println("异步任务成功完成了...结果是:" +result + "异常是:" + exception);
    }).exceptionally((excep)->{    //R apply(T t)
        // 可以感知到异常,同时返回默认值
        return 999;
    });
    Integer integer = future.get();  //阻塞式等待
    System.out.println("结果是:"+integer);
    System.out.println("main......end......");
}

谷粒商城(三)

3)、handle 最终处理

谷粒商城(三)

和 complete 一样,可对结果做最后的处理(可处理异常),但同时还可改变返回默认值

示例:

传入 R apply(T t, U u)

接收结果和异常,当有异常时可指定返回默认值

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main......start....");
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("当前线程" + Thread.currentThread().getId());
        int i = 10 / 0;
        return i;
        
    }, service).handle((result, exception) -> {     //R apply(T t, U u)
        if (result != null) {
            return result * 2;
        }
        if (exception != null) {
            return 999;
        }
        return 0;
    });
    Integer integer = future.get();  //阻塞式等待
    System.out.println("结果是:"+integer);
    System.out.println("main......end......");
}

谷粒商城(三)

4)、线程串行化

谷粒商城(三)

  1. thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回
  2. thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,返回结果
  3. thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行thenRun 的后续操作
  4. 带有 Async 默认是异步执行的。同之前
  5. 以上都要前置任务成功完成

示例:

thenRunAsync 传入 void run()

不能获取上一步的执行结果,同时也无返回值

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main......start....");
    CompletableFuture.supplyAsync(()->{
        System.out.println("当前线程" + Thread.currentThread().getId());
        int i = 10 / 4;
        System.out.println("运行结果:"+i);
        return i;
    },service).thenRunAsync(()->{    //void run()
        System.out.println("任务2启动了。。。。");
    },service);
    System.out.println("main......end......");
}

谷粒商城(三)

thenAcceptAsync 传入 void accept(T t)

能获取上一步的执行结果,无返回值

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main......start....");
    CompletableFuture.supplyAsync(()->{
        System.out.println("当前线程" + Thread.currentThread().getId());
        int i = 10 / 4;
        System.out.println("运行结果:"+i);
        return i;
    },service).thenAcceptAsync((result)->{   //void accept(T t)
        System.out.println("任务2启动了。。"+result);
    },service);
    System.out.println("main......end......");
}

谷粒商城(三)

thenApplyAsync 传入 R apply(T t)

能获取上一步的执行结果,有自己的返回值

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main......start....");
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("当前线程" + Thread.currentThread().getId());
        int i = 10 / 4;
        System.out.println("运行结果:" + i);
        return i;
    }, service).thenApplyAsync((result) -> {      //R apply(T t)
        System.out.println("任务2启动了。。" + result);
        return result * 10;
    }, service);
    Integer integer = future.get();
    System.out.println("任务2结果是:"+ integer );
    System.out.println("main......end......");
}

谷粒商城(三)

5)、两任务组合 - 都要完成

谷粒商城(三)

  1. 两个任务必须都完成,触发该任务
  2. thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值
  3. thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有返回值
  4. runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后,处理该任务

示例:AB都完成后才会执行C

runAfterBothAsync 传入 void run()

都完成了计算才会执行,不获取结果,没有返回值

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main......start....");
    CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务 01 线程" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("任务 01 结果:" + i);
        return i;
    }, service);
    CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务 02 线程:" + Thread.currentThread().getId());
        System.out.println("任务 02 结束:");
        return "Hello";
    }, service);
    
    future01.runAfterBothAsync(future02,()->{     //void run()
        System.out.println("任务 03 线程:" + Thread.currentThread().getId());
    },service);
    System.out.println("main......end......");
}

谷粒商城(三)

thenAcceptBothAsync 传入 void accept(T t, U u)

都完成了计算才会执行,获取结果,没有返回值

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main......start....");
    CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务 01 线程" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("任务 01 结果:" + i);
        return i;
    }, service);
    CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务 02 线程:" + Thread.currentThread().getId());
        System.out.println("任务 02 结束:");
        return "Hello";
    }, service);
    
    future01.thenAcceptBothAsync(future02,(f1,f2)->{    //void accept(T t, U u)
        System.out.println("任务 03 线程:" + Thread.currentThread().getId());
        System.out.println("任务 03 结束:"+f1+"----->"+f2);
    },service);
    System.out.println("main......end......");
}

谷粒商城(三)

thenCombineAsync 传入 R apply(T t, U u)

都完成了计算才会执行,获取结果,返回当前任务的返回值

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main......start....");
    CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务 01 线程" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("任务 01 结果:" + i);
        return i;
    }, service);
    CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务 02 线程:" + Thread.currentThread().getId());
        System.out.println("任务 02 结束:");
        return "Hello";
    }, service);
    
    CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> {    //R apply(T t, U u)
        System.out.println("任务 03 线程:" + Thread.currentThread().getId());
        System.out.println("任务 03 结束:" + f1 + "----->" + f2);
        return f1 + f2;
    }, service);
    String s = future.get();
    System.out.println("任务 03结果是:"+ s );
    System.out.println("main......end......");
}

谷粒商城(三)

6)、两任务组合 - 一个完成

只要有一个任务完成就执行第三个

谷粒商城(三)

  1. 当两个任务中,任意一个 future 任务完成的时候,执行任务
  2. applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值
  3. acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值
  4. runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值

示例:AB只要有一个执行完成就执行C

runAfterEitherAsync 传入 void run()

任何一个完成了都会执行,不获取结果,没有返回值

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main......start....");
    CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务 01 线程" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("任务 01 结果:" + i);
        return i;
    }, service);
    CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务 02 线程:" + Thread.currentThread().getId());
        try {
            Thread.sleep(5000);
            System.out.println("任务 02 结束");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Hello";
    }, service);
    
    future01.runAfterEitherAsync(future02,()->{     //void run()
        System.out.println("任务 03 线程:" + Thread.currentThread().getId());
    },service);
    System.out.println("main......end......");
}

谷粒商城(三)

acceptEitherAsync 传入 void accept(T t)

任何一个完成了都会执行,获取结果,没有返回值(注意两个线程需要返回同类型结果)

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main......start....");
    CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务 01 线程" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("任务 01 结果:" + i);
        return i;
    }, service);
    CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务 02 线程:" + Thread.currentThread().getId());
        try {
            Thread.sleep(5000);
            System.out.println("任务 02 结束");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Hello";
    }, service);
    
    future01.acceptEitherAsync(future02,(result)->{     //void accept(T t)
        System.out.println("任务 03 线程:" + Thread.currentThread().getId());
        System.out.println("任务 03 结束:" + result);
    },service);
    System.out.println("main......end......");
}

谷粒商城(三)

applyToEitherAsync 传入 R apply(T t)

任何一个完成了都会执行,获取结果,返回当前任务的返回值(注意两个线程需要返回同类型结果)

public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main......start....");
    CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务 01 线程" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("任务 01 结果:" + i);
        return i;
    }, service);
    CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
        System.out.println("任务 02 线程:" + Thread.currentThread().getId());
        try {
            Thread.sleep(5000);
            System.out.println("任务 02 结束");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Hello";
    }, service);
    
    CompletableFuture<String> future = future01.applyToEitherAsync(future02, (result) -> {     //R apply(T t)
        System.out.println("任务 03 线程:" + Thread.currentThread().getId());
        System.out.println("任务 03 结束:" + result);
        return result + "哔哩哔哩";
    }, service);
    String s = future.get();
    System.out.println("任务 03结果是:"+ s );
    System.out.println("main......end......");
}

谷粒商城(三)

7)、多任务组合

谷粒商城(三)

  1. allOf:等待所有任务完成
  2. anyOf:只要有一个任务完成

提供三个线程任务

CompletableFuture<String> future01 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务 11 线程");
    return "任务 11";
}, service);
CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务 22 线程");
    return "任务 22";
}, service);
CompletableFuture<String> future03 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务 33 线程");
    return "任务 33";
}, service);

若是以下面的方式获取三个线程的执行结果,阻塞的时间是耗时最长的 get 所花费的时间

get() 是阻塞式等待,阻塞的是主线程,但是 get 方法本身是异步执行的,即当有多个线程的 get() 执行,阻塞的时间仅仅是耗时最长的 get 所花费的时间,而不是所有get花费时间之和

future01.get();
future02.get();
future03.get();

使用 allOf:等待所有任务完成

CompletableFuture<Void> allOf = CompletableFuture.allOf(future01, future02, future03);
allOf.get();//等待所有线程执行完

System.out.println("返回结果是:"+future01.get()+"=>"+future02.get()+"=>"+future03.get());  //可以保证所有线程都执行完了

谷粒商城(三)

这种情况可以保证所有线程都执行完了,如果有其他服务是需要这几个服务执行结束再执行…

anyOf:只要有一个任务完成

CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future01, future02, future03);

System.out.println("返回结果是:"+anyOf.get());

谷粒商城(三)

RabbitMQ

1、安装

1)、docker安装RabbitMQ

docker run -d --name rabbitmq 
-p 5671:5671 -p 5672:5672 
-p 4369:4369 -p 25672:25672 
-p 15671:15671 -p 15672:15672 
rabbitmq:management

4369, 25672 (Erlang发现&集群端口)
5672, 5671 (AMQP端口)
15672 (web管理后台端口)
61613, 61614 (STOMP协议端口)
1883, 8883 (MQTT协议端口

# 设置开机自启
docker update rabbitmq restart=always

访问 http://localhost:15672 ,账号密码均为 guest

谷粒商城(三)

谷粒商城(三)

2)、Windows 安装

1 安装

由于RabbitMQ是基于erlang的,所以,在正式安装RabbitMQ之前,需要先安装一下 erlang

erlang与RabbitMQ版本的对应关系,如:RabbitMQ3.6.10,建议的erlang版本是19.3.x(安装前必看)

Erlang下载地址(各版本都可下载)

此处我下载的是 24.2.2 ,RabbitMQ 是 3.8.35

谷粒商城(三)

安装时要以管理员身份运行,然后一直next即可

谷粒商城(三)

RabbitMQ下载地址

https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.35/rabbitmq-server-3.8.35.exe

下载后双击即可安装。安装后打开**【任务管理器】中的【服务】项,即可看到有一个【RabbitMQ】**服务正在执行

2 启动

进入 rabbitmq 安装目录的 sbin ,打开 cmd ,执行 rabbitmq-plugins enable rabbitmq_management

然后打开浏览器,访问 http://localhost:15672 ,账号密码均为 guest

如果想在其他服务器使用,必须新增其他账号。因为guest账号只能在localhost情况下使用

谷粒商城(三)

2、概述

谷粒商城(三)

1)、消息中间件作用

① 异步处理

谷粒商城(三)

② 应用解耦

谷粒商城(三)

③ 流量控制

谷粒商城(三)

消息服务中两个重要概念:

消息代理(message broker)和 目的地(destination)

当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地

消息队列主要有两种形式的目的地

  1. 队列(queue):点对点 消息通信(point-to-point)
    1. 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列
    2. 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者
  2. 主题(topic):发布(publish)/订阅(subscribe)消息通信
    1. 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息

2)、JMS、AMQP

JMS(Java Message Service)JAVA消息服务:
① 基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现
② 实际上是指JMS API,JMS和JDBC担任差不多的角色,用户都是根据相应的接口可以和实现了JMS的服务进行通信,进行相关的操作
③ 所以缺点是限于Java语言

AMQP(Advanced Message Queuing Protocol)
① 高级消息队列协议,也是一个消息代理的规范,兼容JMS
② 这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。
③ 从这一点看,AQMP可以用 http 来进行类比,不关心实现的语言
④ RabbitMQ是AMQP的实现

二者对比:

谷粒商城(三)

3)、 Spring支持

Spring 支持
① spring-jms 提供了对JMS的支持、 spring-rabbit 提供了对AMQP的支持
② 需要 ConnectionFactory 的实现来连接消息代理
③ 提供 JmsTemplate、RabbitTemplate 的实例来发送消息
④ 提供 @JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息
⑤ 提供 @EnableJms、@EnableRabbit开启支持

Spring Boot 自动配置

• JmsAutoConfiguration
• RabbitAutoConfiguration

① 引入 spring-boot-starter-amqp 依赖,RabbitAutoConfiguration 就会自动生效
② 给容器中自动配置了RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate
③ 给配置文件中配置 spring.rabbitmq 信息

所有的默认属性都是 RabbitProperties 里面配置指定的
@ConfigurationProperties(prefix = “spring.rabbitmq”)
public class RabbitProperties

@EnableRabbit 开启功能

4)、RabbitMQ概念

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现

核心概念:

Message

消息,消息是不具名的,它由消息头和消息体组成

消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)priority(相对于其他消息的优先权)delivery-mode(指出该消息可能需要持久性存储)

Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序

Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列

Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别

direct、headers 点对点
fanout、topic 订阅

Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点

一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走

发消息是发给交换机,监听消息是监听队列

Binding

绑定,用于消息队列和交换器之间的关联

一个绑定就是 基于路由键 将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表

Exchange 和Queue的绑定可以是多对多的关系

Connection

网络连接,比如一个TCP连接

一条高速公路开通八个车道,就是一条长连接connection中可开辟多个channel通道

Channel

信道,多路复用连接中的一条独立的双向数据流通道

信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成

因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象

虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制

vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /

相当于有多个 rabbit 一般,相互隔离

Broker

表示消息队列服务器实体

5)、RabbitMQ运行机制

AMQP 中的消息路

AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 ExchangeBinding 的角色

生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到哪个队列

谷粒商城(三)

6)、Exchange 类型

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers

headers 匹配 AMQP 消息的 header 而不是路由键,headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型

1 direct

谷粒商城(三)

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配

如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发 “dog.puppy”,也不会转发 “dog.guard” 等等

它是完全匹配、单播的模式

2 fanout

谷粒商城(三)

每个发到 fanout 类型交换器的消息都会 分到所有绑定的队列 上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上

很像子网广播,每台子网内的主机都获得了一份复制的消息

fanout 类型 转发消息是最快

3 topic

topic是绑定的时候指定模糊的路由,发送消息是给出指定路由,会去匹配多个队列的模糊路由

谷粒商城(三)

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用 点 . 隔开

它同样也会识别两个通配符:符号 “ # ” 和符号 “ * ” 。# 匹配0个或多个单词,* 匹配一个单词

谷粒商城(三)

3、SpringBoot整合RabbitMQ

1)、 SpringBoot整合配置

引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /gulimall-order

启动类

/**
 * 1 引入 spring-boot-starter-amqp,RabbitAutoConfiguration就会生效
 * 2 给容器中自动配置了 RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate、RabbitTemplateConfiguration
 * 3 给配置文件中配置 spring.rabbitmq 信息
 *      自动配置的属性来自 RabbitProperties
 *      @ConfigurationProperties(prefix = "spring.rabbitmq")
 *      public class RabbitProperties
 * 4 @EnableRabbit 开启功能
 */
@SpringBootApplication
@EnableRabbit
public class GulimallOrderApplication

谷粒商城(三)

配置 MyRabbitConfig,可以使得传输的对象消息转换为 json 传输

@Configuration
public class MyRabbitConfig {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

注意这里有个坑!必须给被传输的消息对象 content 提供一个无参构造器!否则会报错!

2)、AmqpAdmin创建

1、使用AmqpAdmin进行创建 Exchange[hello-java-exchange]、Queue、Binding
2、使用 RabbitTemplate 发送消息

@Slf4j
@SpringBootTest
class GulimallOrderApplicationTests {
 
    @Autowired
    AmqpAdmin amqpAdmin;
 
    @Test
    void contextLoads() {
    	/**
		 * DirectExchange构造器:
		 * 	String name, 交换机的名字
		 * 	boolean durable, 是否持久
		 * 	boolean autoDelete, 是否自动删除
		 * 	Map<String, Object> arguments 参数
		 */
        DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
        amqpAdmin.declareExchange(directExchange);
        log.info("Exchange[{}]创建成功","hello-java-exchange");
    }
 
    @Test
    public void createQueue(){
        Queue queue = new Queue("hello-java-queue",true,false,false);
        amqpAdmin.declareQueue(queue);
        log.info("Queue[{}]创建成功","hello-java-queue");
    }
 
    @Test
    public void createBinding(){
    	/**
		 * Binding构造器:
		 * 	String destination, 目的地
		 * 	DestinationType destinationType, 目的地类型(交换机或者队列)
		 * 	String exchange, 交换机的名字
		 * 	String routingKey, 路由键
		 *  Map<String, Object> arguments 参数
		 *  
		 * 将exchange指定的交换机和destination目的地进行绑定,
		 * 使用routingkey作为指定的路由键
		 */
        Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null);
        amqpAdmin.declareBinding(binding);
        log.info("Binding[{}]创建成功","hello-java-binding");
    }
}

浏览器查看创建结果

谷粒商城(三)

3)、RabbitTemplate发送消息

@Autowired
RabbitTemplate rabbitTemplate;

/**
 *  1、发送消息,如果发送的消息是个对象,我们会使用序列化机制,将对象写出去。对象必须实现Serializable
 *  2、配置MyRabbitConfig,让发送的对象类型的消息,可以是一个json
 */
@Test
public void sendMessageTest(){
    OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
    orderReturnApplyEntity.setId(1L);
    orderReturnApplyEntity.setCreateTime(new Date());
    orderReturnApplyEntity.setReturnName("哈哈哈");
    /**
     * 参数分别为:指定exchange、路由键、消息内容
     */
    rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",orderReturnApplyEntity);
    log.info("消息发送完成{}",orderReturnApplyEntity);
}

因为配置了 MyRabbitConfig,所以对象是以 json 格式传输

默认是Java序列化

谷粒商城(三)

注意这里有个坑!必须给消息对象content提供一个无参构造器!否则会报错!

新建接口发送消息 RabbitController

@RestController
public class RabbitController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMq")
    public String sendMq(@RequestParam(value = "num",defaultValue = "10") Integer num){
        for (int i = 0; i < num; i++) {
            if (i%2 == 0){
                OrderReturnApplyEntity entity = new OrderReturnApplyEntity();
                entity.setId(1L);
                entity.setCreateTime(new Date());
                entity.setReturnName("哈哈哈");
                /**
                 * exchange, routingKey, 消息内容, (CorrelationData)发送消息的唯一标识
                 *   对象必须实现Serializable,且必须有无参构造器
                 */
                rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",entity,new CorrelationData(UUID.randomUUID().toString()));
            }else {
                OrderEntity entity = new OrderEntity();
                entity.setOrderSn(UUID.randomUUID().toString());
                rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",entity,new CorrelationData(UUID.randomUUID().toString()));
            }
        }
        return "ok";
    }
}

4)、RabbitListener&RabbitHandler 监听消息

监听消息的方法所在类必须要在 Spring 容器中(此处必须有@Service注解)

监听消息的方法可以有三种参数(不分数量,顺序)
Object content, Message message, Channel channel

Message打印如下
谷粒商城(三)

@RabbitListener 可用在类上、或者方法上

  1. 单独使用在方法上
    若消息内容有多类型,收消息的参数order只有一种类型,则会将相同属性进行赋值进参数order
    谷粒商城(三)

  2. 使用在类上,方法上需要配合使用 @RabbitHandler
    可以多个方法使用不同类型的方法参数接收不同类型的消息实体类
    谷粒商城(三)

@RabbitListener(queues = {"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {

    /**
     *  当监听的队列里有消息时,此处会自动消费,无需手动执行方法
     *
     *  消费方法的参数可以写以下内容
     *      1 Message message:原生消息详细信息。头+体
     *      2 T<发送的消息类型> :OrderReturnApplyEntity content
     *      3 Channel channel :当前传输数据的通道
     *
     *  @RabbitListener(queues = {"hello-java-queue"})
     *      1 可单独使用在方法上
     *          若消息内容有多类型,收消息的参数order只有一种类型,则会将相同属性进行赋值进参数order
     *      2 可使用在类上,方法上配合使用 @RabbitHandler
     *          可以多个方法使用不同类型的方法参数接收不同类型的消息实体类
     *
     * Queue:可以很多人都来监听,但只会有一个消费者消费一条消息。只要消费收到消息,队列就删除消息,而且只能有一个消费收到此消息
     *      1 订单服务启动多个;同一个消息,只能有一个客户端收到消费
     *      2 只有一个消息完全处理完,即下面的方法运行结束,我们才可以接收到下一个消息
     *
     * @param message 原生消息详细信息。头+体
     * @param content 发送的消息类型
     * @param channel 当前传输数据的通道
     */
    // @RabbitListener(queues = {"hello-java-queue"})  //queues:声明需要监听的所有队列
    @RabbitHandler
    public void receiverMessage(Message message,
                                OrderReturnApplyEntity content,
                                Channel channel){
        System.out.println("第一内容:" + content);
    }

    @RabbitHandler
    public void receiverMessage2(Message message,
                                 OrderEntity content,
                                Channel channel){
        System.out.println("第二内容:" + content);
    }
}

4、RabbitMQ消息确认机制

谷粒商城(三)

1)、ConfirmCallback

需要配置 spring.rabbitmq.publisher-confirms=true

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /gulimall-order
    #开启消息抵达服务器确认
    publisher-confirms: true

然后定制 RabbitTemplate,设置确认回调

修改配置类 MyRabbitConfig

@Autowired
RabbitTemplate rabbitTemplate;

/**
 * 定制RabbitTemplate
 */
@PostConstruct //MyRabbitConfig对象创建之后,执行此方法
public void initRabbitTemplate(){

    //设置消息成功抵达broker确认回调
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        /**
         * 只要消息抵达Broker,ack就为true,不管消息有没有抵达队列、有没有被消费,不管消息发送的路由键是否正确
         * 但是若交换机名错误,ack为false
         * @param correlationData 当前消息的唯一关联数据(这个消息的唯一id)
         * @param ack 消息是否成功收到,true表示消息成功抵达broker
         * @param cause 失败的原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("confirm...correlationData["+correlationData+"]");
            System.out.println("ack["+ack+"]");
            System.out.println("cause==>["+cause+"]");
        }
    });
}

只要消息抵达 Broker,ack 就为 true;
若交换机名错误,消息无法抵达,ack 为 false,报错如下:

谷粒商城(三)

消息只要被 broker 接收到就会执行 confirmCallback。如果是集群模式,需要所有的 broker 接收到消息才会调用 confirmCallback

被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback

2)、ReturnCallback

confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到 ReturnCallback

这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据

需要配置
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /gulimall-order
    #开启消息抵达服务器确认
    publisher-confirms: true
    #开启发送端抵达队列确认
    publisher-returns: true
    #只要抵达队列,以异步发送优先回调我们这个returnConfirm
    template:
      mandatory: true

然后定制 RabbitTemplate,设置确认回调ReturnCallback

修改配置类 MyRabbitConfig

@Autowired
RabbitTemplate rabbitTemplate;

/**
 * 定制RabbitTemplate
 */
@PostConstruct //MyRabbitConfig对象创建之后,执行此方法
public void initRabbitTemplate(){

    //设置消息成功抵达broker确认回调
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        /**
         * 只要消息抵达Broker,ack就为true,不管消息有没有抵达队列、有没有被消费,不管消息发送的路由键是否正确
         * 但是若交换机名错误,ack为false
         * @param correlationData 当前消息的唯一关联数据(这个消息的唯一id)
         * @param ack 消息是否成功收到,true表示消息成功抵达broker
         * @param cause 失败的原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("confirm...b["+ack+"]s==>["+cause+"]");
        }
    });

    //设置消息抵达队列的确认回调
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        /**
         *  消息没有传递给指定的队列,就触发这个失败回调。若交换机名错误,执行ConfirmCallback或者直接报错
         *  若消息发送的路由键不正确,会触发此回调
         * @param message 投递失败的消息
         * @param replyCode 回复的状态码
         * @param replyText 回复的文本内容
         * @param exchange 当时这个消息是要发给哪个交换机
         * @param routingKey 当时这个消息是用哪个路由键
         */
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("Fail Message["+message+"]");
            System.out.println("replyCode["+replyCode+"]");
            System.out.println("replyText["+replyText+"]");
            System.out.println("exchange["+exchange+"]");
            System.out.println("routingKey["+routingKey+"]");
        }
    });
}

若消息发送的路由键不正确,会触发此回调,打印如下

谷粒商城(三)

3)、ack

参考文档

消费端确认(保证每个消息被正确消费,此时才可以保证broker删除这个消息)

为了保证消息从队列可靠的达到消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)

  • 当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者 显式地回复确认 信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)
  • RabbitMQ 不会为未确认的消息设置过期时间
  • Web 管理平台上,当前队列中的 “Ready” 状态表示 等待投递 给消费者的消息数; “Unacknowledged” 状态表示 已经投递给消费者但是 未收到确认信号 的消息数

谷粒商城(三)

消费者收到消息后,手动调用 Basic.Ack 或 Basic.Nack 或 Basic.Reject 后,RabbitMQ 收到这些消息后,才认为本次投递完成

  • basicAck 命令:用于确认当前消息;broker将移除此消息
  • basicNack 命令:用于否定当前消息;可以指定broker是否丢弃此消息,可以批量
  • basicReject 命令:用于拒绝当前消息;同上,但不能批量

默认自动 ack,消息被消费者收到,就会从 broker 的 queue 中移除。若 queue无消费者,消息依然会被存储,直到消费者消费

自动ack可能导致消息丢失,可以开启手动ack模式。
若消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人

修改配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /gulimall-order
    #开启消息抵达服务器确认
    publisher-confirms: true
    #开启发送端抵达队列确认
    publisher-returns: true
    #只要抵达队列,以异步发送优先回调我们这个returnConfirm
    template:
      mandatory: true
    #手动确认收货(ack)
    listener:
      simple:
        acknowledge-mode: manual

修改消息接收类 OrderItemServiceImpl

@RabbitListener(queues = {"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {

    @RabbitHandler
    public void receiverMessage(Message message,
                                OrderReturnApplyEntity content,
                                Channel channel) throws IOException {
        //deliveryTag:消息的运输标签,在 channel 内是递增的,即123456
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println(deliveryTag);  //打印13579
        /**
         *  (long deliveryTag, boolean multiple, boolean requeue)
         *  参数1:指定要接收的消息运输标签
         *  参数2:是否批量处理,为true确认的是小于等于tag之前的所有消息
         *  参数3:若为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
         *         为 false,则 RabbitMQ 立即会还把消息从队列中移除,而不会把它发送给新的消费者
         */
        channel.basicAck(deliveryTag,false);
        System.out.println("第一内容:" + message.getMessageProperties().getConsumerQueue());
    }

    @RabbitHandler
    public void receiverMessage2(Message message,
                                 OrderEntity content,
                                Channel channel){
        System.out.println("第二内容:" + message.getMessageProperties().getConsumerQueue());
    }
}

上面的 deliveryTag 打印输出为 13579

一共发送 10 条消息,第二内容中的5条没有手动确认,客户端可以看到消息未被确认
谷粒商城(三)

basicReject 方法用于明确拒绝当前的消息而不是确认
谷粒商城(三)

5、RabbitMQ延时队列(实现定时任务)

1)、原理

使用场景:
比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品

常用解决方案:
使用 spring的 schedule 定时任务轮询数据库

缺点:
消耗系统内存、增加了数据库的压力(需要全面扫描数据库找出超时订单)、存在较大的时间误差(时效性问题如下图)

谷粒商城(三)

解决:rabbitmq的 消息或队列TTL死信 Exchange 结合

  1. RabbitMQ可以对队列和消息分别设置TTL:
    1. 对队列设置就是 队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
    2. 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)
    3. 这里单讲单个消息的 TTL,因为它才是实现延迟任务的关键。可以通过设置消息的 expiration 字段或者 x-message-ttl 属性来设置时间,两者是一样的效果
  2. Dead Letter Exchanges(DLX)死信交换机
    1. 一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。(什么是死信)以下三种消息都称为死信消息
      1. 一个消息被Consumer 拒收 了,并且reject方法的参数里requeue是false。也就是说拒收且不会被再次放在队列 里,被其他消费者使用。(basic.reject/ basic.nack)requeue=false
      2. 上面的消息的TTL到了,消息过期了。
      3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
    2. Dead Letter Exchange 其实就是一种普通的 exchange,和创建其他 exchange没有两样。只是在某一个设置 Dead Letter Exchange 的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去
    3. 我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机
      结合二者,其实就可以实现一个延时队列
  3. 手动 ack & 异常消息统一放在一个队列处理建议的两种方式
    1. catch 异常后,手动发送到指定队列,然后使用 channel 给 rabbitmq 确认消息已消费
    2. 给 Queue 绑定死信队列,使用 nack(requque为false)确认消息消费失败

如图,一个消息被消费者拒绝,变成死信:

谷粒商城(三)

因为simple.queue绑定了死信交换机dl.direct,因此死信会投递到这个死信交换机中:

谷粒商城(三)

如果这个死信交换机也绑定了队列,则死信最终会存放在死信队列中:

谷粒商城(三)

另外,队列将死信传递给死信交换机时必须知道两个信息:
死信交换机的名称
死信交换机与死信队列绑定的RoutingKey
这样才能保证投递的消息可以到达死信交换机,并且正确的路由到死信队列:

谷粒商城(三)

给队列设置过期时间的方式:(使用这种)

MQ是 惰性检测机制,懒检查,若队列中消息过期时间不一致,第一个消息过期时间最长,MQ会等第一条消息过期之后才会拿第二个信息,先进先出,所以会导致消息过期时间设置无效。所以选择给队列设置过期时间

谷粒商城(三)

给消息设置过期时间的方式:

谷粒商城(三)

使用两个交换机,如下

谷粒商城(三)

2)、代码实现

实现下面这种,使用一个交换机

谷粒商城(三)

发送出的消息三十分钟之后才可以到达监听者

原始队列不让有任何监听者,当其中的消息过期后,自动转发到死信队列,监听者都监听死信队列

依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /gulimall-order
    #开启消息抵达服务器确认
    publisher-confirms: true
    #开启发送端抵达队列确认
    publisher-returns: true
    #只要抵达队列,以异步发送优先回调我们这个returnConfirm
    template:
      mandatory: true
    #手动确认收货(ack)
    listener:
      simple:
        acknowledge-mode: manual

新建配置类,注入交换机、队列、绑定关系

使用 @Bean 可以直接注入,不需要AmqpAdmin
但是如果创建之后有改动,只要MQ已经存在,就不会覆盖

@Configuration
public class MyMQConfig {
    /**
     * 直接注入,只要发送消息,就会在RabbitMQ创建,不需要AmqpAdmin
     * 如果创建之后有改动,只要MQ已经存在,就不会覆盖
     */
    @Bean
    public Exchange orderEventExchange(){
        /**
         * 	String name, 交换机的名字
         * 	boolean durable, 是否持久
         * 	boolean autoDelete, 是否自动删除
         * 	Map<String, Object> arguments 参数
         */
        return new TopicExchange("order-event-exchange",true,false);
    }

    @Bean
    public Queue orderDelayQueue(){
        /**
         * 	String name, 队列的名字
         * 	boolean durable, 是否持久
         * 	boolean exclusive, 是否排他(只能有一个人连)
         * 	boolean autoDelete, 是否自动删除
         * 	Map<String, Object> arguments 参数
         * 
         * 此队列是可能包含死信的队列,普通队列
         * 队列中的死信就会投递到交换机中,而这个交换机也被成为死信交换机(Dead Letter Exchange,简称DLX)
         */
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange","order-event-exchange"); //指定死信交换机
        arguments.put("x-dead-letter-routing-key","order.release.order");
        arguments.put("x-message-ttl", 60000);
        return new Queue("order.delay.queue",true,false,false,arguments);
    }

    @Bean
    public Queue orderReleaseOrderQueue(){
        return new Queue("order.release.order.queue",true,false,false);
    }

    @Bean
    public Binding orderCreateOrder(){
        /**
         * 	String destination, 目的地
         * 	DestinationType destinationType, 目的地类型(交换机或者队列)
         * 	String exchange, 交换机的名字
         * 	String routingKey, 路由键
         *  Map<String, Object> arguments 参数
         *
         * 将exchange指定的交换机和destination目的地进行绑定,
         * 使用routingkey作为指定的路由键
         */
        return new Binding("order.delay.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.create.order",null);
    }

    @Bean
    public Binding orderReleaseOrder(){
        return new Binding("order.release.order.queue",
                Binding.DestinationType.QUEUE,
                "order-event-exchange",
                "order.release.order",null);
    }
}

谷粒商城(三)

发送消息

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMq")
    public String sendMq(){
        OrderEntity entity = new OrderEntity();
        entity.setId(1L);
        entity.setCreateTime(new Date());
        /**
         * exchange, routingKey, 消息内容, (CorrelationData)发送消息的唯一标识
         *   对象必须实现Serializable,且必须有无参构造器
         */
        rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", entity, new CorrelationData(UUID.randomUUID().toString()));
        return "ok";
    }

接收消息

    @RabbitListener(queues = "order.release.order.queue")
    public void recieve(OrderEntity entity, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println(entity);
        //因为消息要重新入队,所以注意此处是 false
        channel.basicAck(deliveryTag,false);
    }

时隔一分钟之后接收到消息内容

谷粒商城(三)

6、消息可靠性

1)、消息丢失

2)、消息重复

3)、消息积压

接口幂等性

1、概念

接口幂等性就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用

例如以下的几种情形时,需要幂等性:

  • 用户多次点击按钮
  • 用户页面回退再次提交
  • 微服务互相调用,由于网络问题,导致请求失败。feign 触发重试机制

2、幂等解决方案

1)、token 机制

实现步骤:

  1. 服务端提供了发送 token 的接口。必须在执行业务前,先去获取 token,服务器会把 token 保存到 redis 中
  2. 然后调用业务接口请求时,把 token 携带过去,一般放在请求头部
  3. 服务器判断 token 是否存在 redis 中,存在表示第一次请求,然后删除 token,继续执行业务
  4. 如果判断 token 不存在 redis 中,就表示是重复操作,直接返回重复标记给 client

其中有几个问题:

  1. 先删除 token 还是后删除 token;
    1. 先删除可能导致,业务确实没有执行,重试还带上之前 token,由于防重设计,就导致请求还是不能执行
    2. 后删除可能导致,业务处理成功,但是服务闪断,出现超时,没有删除 token,别人继续重试,导致业务被执行两遍
    3. 我们最好设计为先删除 token,如果业务调用失败,就重新获取 token 再次请求。
  2. Token 获取、比较和删除必须是原子性
    1. redis.get(token) 、token.equals、redis.del(token) 如果这两个操作不是原子,可能导致,高并发下,都 get 到同样的数据,判断都成功,继续业务并发执行
    2. 可以在 redis 使用 lua 脚本
    3. if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end 若删除成功返回 1

2)、各种锁机制

1 数据库悲观锁
select * from xxxx where id = 1 for update

一般结合事务一起使用

注意:id 字段一定是主键或者唯一索引,不然可能造成锁表的结果

2 数据库乐观锁
update t_goods 
set count = count -1,version = version + 1 
where good_id=2 and version = 1

一般用于更新场景,主要使用于处理读多写少的问题

3 业务层分布式锁

高并发场景需要分布式锁

3)、各种唯一约束

1 数据库唯一约束

插入数据,应该按照唯一索引进行插入,比如订单号,相同的订单就不可能有两条记录插入,解决了在 insert 场景时幂等问题

但主键的要求不是自增的主键,这样就需要业务生成全局唯一的主键。

如果是分库分表场景下,路由规则要保证相同请求下,落地在同一个数据库和同一表中,要不然数据库主键约束就不起效果了,因为是不同的数据库和表主键不相关。

2 redis set 防重

很多数据需要处理,只能被处理一次,比如我们可以计算数据的 MD5 将其放入 redis 的 set

每次处理数据,先看这个 MD5 是否已经存在,存在就不处理

3 防重表

使用订单号 orderNo 做为去重表的唯一索引,把唯一索引插入去重表,再进行业务操作,且他们在同一个事务中

这个保证了重复请求时,因为去重表有唯一约束,导致请求失败,避免了幂等问题

这里要注意的是,去重表和业务表应该在同一库中,这样就保证了在同一个事务,即使业务操作失败了,也会把去重表的数据回滚。这个很好的保证了数据一致性。

4 全局请求唯一 id

调用接口时,生成一个唯一 id,redis 将数据保存到集合中(去重),存在即处理过

可以使用 nginx 设置每一个请求的唯一 id

proxy_set_header X-Request-Id $request_id

不适用于此处

分布式事务

1、本地事务

1)、事务的基本性质

数据库事务的几个特性:原子性(Atomicity )、一致性( Consistency )、隔离性或独立性( Isolation)、持久性(Durabilily),简称就是 ACID;

  • 原子性:一系列的操作整体不可拆分,要么同时成功,要么同时失败
  • 一致性:数据在事务的前后,业务整体一致。
    • 转账。A:1000;B:1000; 转 200 事务成功; A:800 B:1200
  • 隔离性:事务之间互相隔离。
  • 持久性:一旦事务成功,数据一定会落盘在数据库。

在以往的单体应用中,我们多个业务操作使用同一条连接操作不同的数据表,一旦有异常,我们可以很容易的整体回滚;

谷粒商城(三)

Business:我们具体的业务代码
Storage:库存业务代码;扣库存
Order:订单业务代码;保存订单
Account:账号业务代码;减账户余额

比如买东西业务,扣库存,下订单,账户扣款,是一个整体;必须同时成功或者失败

一个事务开始,代表以下的所有操作都在同一个连接里面;

2)、事务的隔离级别

  • READ UNCOMMITTED(读未提交)
    该隔离级别的事务会读到其它未提交事务的数据,此现象也称之为脏读

  • READ COMMITTED(读提交)
    一个事务可以读取另一个已提交的事务,多次读取会造成不一样的结果,此现象称为不可重
    复读问题,Oracle 和 SQL Server 的默认隔离级别。

  • REPEATABLE READ(可重复读)
    该隔离级别是 MySQL 默认的隔离级别,在同一个事务里,select 的结果是事务开始时时间
    点的状态,因此,同样的 select 操作读到的结果会是一致的,但是,会有幻读现象。MySQL
    的 InnoDB 引擎可以通过 next-key locks 机制(参考下文"行锁的算法"一节)来避免幻读。

  • SERIALIZABLE(序列化)
    在该隔离级别下事务都是串行顺序执行的,MySQL 数据库的 InnoDB 引擎会给读操作隐式
    加一把读共享锁,从而避免了脏读、不可重读复读和幻读问题。

3)、事务的传播行为

  1. PROPAGATION_REQUIRED:如果当前没有事务,就创建一个新事务,如果当前存在事务,就加入该事务,该设置是最常用的设置。
  2. PROPAGATION_SUPPORTS:支持当前事务,如果当前存在事务,就加入该事务,如果当前不存在事务,就以非事务执行。
  3. PROPAGATION_MANDATORY:支持当前事务,如果当前存在事务,就加入该事务,如果当前不存在事务,就抛出异常。
  4. PROPAGATION_REQUIRES_NEW:创建新事务,无论当前存不存在事务,都创建新事务。
  5. PROPAGATION_NOT_SUPPORTED:以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
  6. PROPAGATION_NEVER:以非事务方式执行,如果当前存在事务,则抛出异常。
  7. PROPAGATION_NESTED:如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与 PROPAGATION_REQUIRED 类似的操作。

4)、SpringBoot 事务的坑

同一个类下的方法互相调用,即使设置了不同的事务注解,依然以第一个执行的方法(当前方法)的事务为准

本地事务失效问题:
同一个对象内事务方法互调默认失效,原因是绕过了springboot的代理对象
事务使用代理对象来控制的

解决: 使用代理对象来调用事务方法
1、 导入 spring-boot-starter-aop(其中引入了 aspectJ)
2、使用注解 @EnableAspectJAutoProxy(exposeProxy=true) 开启aspectJ代理功能,以后所有的动态代理都是aspectJ创建的,即使没有接口也可以创建动态代理
exposeProxy=true 对外暴露代理对象
3、本类互相调用方法,使用 AopContext.currentProxy() 可以强转为本类对象,然后调用方法

示例代码:

@Transactional(timeout = 10)
void a() {
    /*
    这样直接调用本类方法,会使b,c事务设置无效,会默认共用a事务设置
    b();
    c();
    */
    GulimallOrderApplicationTests test = (GulimallOrderApplicationTests) AopContext.currentProxy();
    test.b();
    test.c();
}
@Transactional(timeout = 2)
void b() {}
@Transactional(timeout = 3)
void c() { }

2、分布式事务

1)、CAP 定理与 BASE 理论

1 CAP 定理

CAP 原则又称 CAP 定理,指的是在一个分布式系统中

  • 一致性(Consistency):
    • 在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访
      问同一份最新的数据副本)
  • 可用性(Availability)
    • 在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据
      更新具备高可用性)
  • 分区容错性(Partition tolerance)
    • 大多数分布式系统都分布在多个子网络。每个子网络就叫做一个区(partition)。
    • 分区容错的意思是,区间通信可能失败。比如,一台服务器放在中国,另一台服务器放在美国,这就是两个区,它们之间可能无法通信。

C:所有数据请求者同一时刻看到的数据必须是相同的,无论他们连接的是哪个节点
A:接收到的任何请求,都会在合理时间内得到节点的有效响应,而不是错误和超时的响应
P:遇到某个节点或网络分区故障的时候,集群下仍然有节点能够持续提供服务(服务:服务只能在一致性和可用性之间取舍)

CAP 原则指的是,这三个要素最多 只能同时实现两点,不可能三者兼顾

谷粒商城(三)

一般来说,分区容错无法避免,因此可以认为 CAP 的 P 总是成立。CAP 定理告诉我们,剩下的 C 和 A 无法同时做到。

分布式系统中实现一致性的有 raft 算法、paxos 算法

此为动态图展示 raft 算法的实现原理
http://thesecretlivesofdata.com/raft/

对于多数大型互联网应用的场景,主机众多、部署分散,而且现在的集群规模越来越大,所以节点故障、网络故障是常态,而且要保证服务可用性达到 99.99999%(N 个 9),即保证 P 和 A,舍弃 C

2 BASE 理论

是对 CAP 理论的延伸,思想是即使无法做到强一致性(CAP 的一致性就是强一致性),但可以采用适当的采取弱一致性,即最终一致性。

BASE 是指

  • 基本可用(Basically Available)
    • 基本可用是指分布式系统在出现故障的时候,允许损失部分可用性(例如响应时间、功能上的可用性),允许损失部分可用性。需要注意的是,基本可用绝不等价于系统不可用。
    • 响应时间上的损失:正常情况下搜索引擎需要在 0.5 秒之内返回给用户相应的查询结果,但由于出现故障(比如系统部分机房发生断电或断网故障),查询结果的响应时间增加到了 1~2 秒。
    • 功能上的损失:购物网站在购物高峰(如双十一)时,为了保护系统的稳定性,部分消费者可能会被引导到一个降级页面。
  • 软状态( Soft State)
    • 软状态是指允许系统存在中间状态,而该中间状态不会影响系统整体可用性。分布式存储中一般一份数据会有多个副本,允许不同副本同步的延时就是软状态的体现。mysql replication 的异步复制也是一种体现。
  • 最终一致性( Eventual Consistency)
    • 最终一致性是指系统中的所有数据副本经过一定时间后,最终能够达到一致的状态。弱一致性和强一致性相反,最终一致性是弱一致性的一种特殊情况。

2)、强一致性、弱一致性、最终一致性

从客户端角度,多进程并发访问时,更新过的数据在不同进程如何获取的不同策略,决定了不同的一致性:

  1. 对于关系型数据库,要求更新过的数据能被后续的访问都能看到,这是 强一致性
  2. 如果能容忍后续的部分或者全部访问不到,则是 弱一致性
  3. 如果经过一段时间后要求能访问到更新后的数据,则是 最终一致性

3、分布式事务几种方案

1)、2PC 模式

数据库支持的 2PC【2 phase commit 二阶提交】,又叫做 XA Transactions。

MySQL 从 5.5 版本开始支持,SQL Server 2005 开始支持,Oracle 7 开始支持。

其中,XA 是一个两阶段提交协议,该协议分为以下两个阶段:
第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是否可以提交
第二阶段:事务协调器要求每个数据库提交数据。其中,如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚它们在此事务中的那部分信息。

谷粒商城(三)

  • XA 协议比较简单,而且一旦商业数据库实现了 XA 协议,使用分布式事务的成本也比较低。
  • XA 性能不理想,特别是在交易下单链路,往往并发量很高,XA 无法满足高并发场景
  • XA 目前在商业数据库支持的比较理想,在 mysql 数据库中支持的不太理想,mysql 的 XA 实现,没有记录 prepare 阶段日志,主备切换回导致主库与备库数据不一致。
  • 许多 nosql 也没有支持 XA,这让 XA 的应用场景变得非常狭隘。
  • 也有 3PC,引入了超时机制(无论协调者还是参与者,在向对方发送请求后,若长时间未收到回应则做出相应处理)

2)、柔性事务 - TCC 事务补偿型方案

刚性事务:遵循 ACID 原则,强一致性。
柔性事务:遵循 BASE 理论,最终一致性;

与刚性事务不同,柔性事务允许一定时间内,不同节点的数据不一致,但 要求最终一致

谷粒商城(三)

一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
二阶段 commit 行为:调用 自定义 的 commit 逻辑。
二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理

谷粒商城(三)

3)、柔性事务 - 最大努力通知型方案

按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对。这种
方案主要用在与第三方系统通讯时,比如:调用微信或支付宝支付后的支付结果通知。这种
方案也是结合 MQ 进行实现,例如:通过 MQ 发送 http 请求,设置最大通知次数。达到通
知次数后即不再通知。
案例:银行通知、商户通知等(各大交易业务平台间的商户通知:多次通知、查询校对、对
账文件),支付宝的支付成功异步回调

4)、柔性事务 - 可靠消息+最终一致性方案(异步确保型)

实现:

业务处理服务在业务事务提交之前,向实时消息服务请求发送消息,实时消息服务 只记录消息数据,而不是真正的发送

业务处理服务在业务事务提交之后,向实时消息服务确认发送。只有在得到确认发送指令后,实时消息服务才会真正发送。

防止消息丢失:

  • 1、做好消息确认机制(pulisher,consumer【手动 ack】)
  • 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再次发一遍

准备数据库表存储消息发送情况

CREATE TABLE `mq_message` (
	`message_id` char(32) NOT NULL,
	`content` text,
	`to_exchane` varchar(255) DEFAULT NULL,
	`routing_key` varchar(255) DEFAULT NULL,
	`class_type` varchar(255) DEFAULT NULL,
	`message_status` int(1) DEFAULT '0' COMMENT '0-新建 1-已发送 2-错误抵达 3-已抵达',
	`create_time` datetime DEFAULT NULL,
	`update_time` datetime DEFAULT NULL, PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

4、seata 使用

1)、概念

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

此处使用的是旧版的 seata ,即 使用的是自带的配置文件 file.conf、 registry.conf

谷粒商城(三)

  • TC (Transaction Coordinator) - 事务协调者
    维护全局和分支事务的状态,驱动全局事务提交或回滚。

  • TM (Transaction Manager) - 事务管理器
    定义全局事务的范围:开始全局事务、提交或回滚全局事务。

  • RM (Resource Manager) - 资源管理器
    管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

2)、使用

Seata控制分布式事务

  1. 每一个微服务先必须创建 undo_logo 回滚日志表
  2. 安装事务协调器 seata-server 0.7.1
    https://github.com/seata/seata/releases
  3. 导入依赖 spring-cloud-starter-alibaba-seata 2.1.0.RELEASE,会发现同时也导入了 seata-all 0.7.1
  4. 解压并启动 seata-server 0.7.1
    1. 在 conf 文件夹内,修改配置文件
    2. registry.conf 注册中心相关的配置,修改 registry type=nacos,指定的使用nacos
    3. file.conf 配置,因为指定 file 为配置中心
    4. 双击 bin 目录中的 seata-server.bat 启动服务,启动成功后可以在 nacos 中看到 seata 的服务注册
  5. 编写配置类,使用 seata DatasourceProxy代理自己的数据源
  6. 每个微服务,都必须导入 registry.cof、file.conf 两个文件(从上面复制)
    修改 file.conf 的 service.vgroup_mapping 配置
    file.conf vgroup_mapping.{application.name}-fescar-service-group = "default"
  7. 给分布式大事务的路口标注@GlobalTransactional
  8. 每一个远程的小事务用 @Transactional

每个微服务数据库加上undo_log(回滚日志表)
(使用到的服务都要自己添加数据库表)

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

修改 seata-server 的配置文件

谷粒商城(三)

双击 bin 目录中的 seata-server.bat 启动服务,启动成功后可以在 nacos 中看到 seata 的服务注册

谷粒商城(三)

新增依赖(使用到的服务都要自己添加)

自动跟随 boot 版本,即 2.1.0.RELEASE,然后会发现同时也导入了 seata-all 0.7.1

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>

仓库服务、订单服务都需要增加 数据源配置
(使用到的服务都要自己添加配置)

@Configuration
public class MySeataConfig {
    @Autowired
    DataSourceProperties dataSourceProperties;
 
    @Bean
    public DataSource dataSource(DataSourceProperties dataSourceProperties){
        //得到数据源
        HikariDataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
        if (StringUtils.hasText(dataSourceProperties.getName())){
            dataSource.setPoolName(dataSourceProperties.getName());
        }
        return new DataSourceProxy(dataSource);
    }
}

仓库服务、订单服务都需要添加 registry.cof、file.conf 两个文件
(使用到的服务都要自己添加指定的配置文件 )

谷粒商城(三)
谷粒商城(三)

给分布式大事务的路口标注 @GlobalTransactional,每一个远程的小事务用 @Transactional

定时任务

1、语法

此处使用 schedule 定时任务,文档地址

语法:秒 分 时 日 月 周 年(Spring 不支持)

谷粒商城(三)

特殊字符说明:

  • ,:枚举;
    (cron=“7,9,23 * * * * ?”):任意时刻的 7,9,23 秒启动这个任务;
  • -:范围:
    (cron=“7-20 * * * * ?”):任意时刻的 7-20 秒之间,每秒启动一次
  • *:任意;指定位置的任意时刻都可以
  • /:步长;
    (cron=“7/5 * * * * ?”):第 7 秒启动,每 5 秒一次;
    (cron=“*/5 * * * * ?”):任意秒启动,每 5 秒一次;
  • ?:(出现在日和周几的位置):为了防止日和周冲突,在周和日上如果要写通配符使用?
    (cron=“* * * 1 * ?”):每月的 1 号,启动这个任务;
  • L:(出现在日和周的位置)”,last:最后一个
    (cron=“* * * ? * 3L”):每月的最后一个周二
  • W:Work Day:工作日
    (cron=“* * * W * ?”):每个月的工作日触发
    (cron=“* * * LW * ?”):每个月的最后一个工作日触发
  • #:第几个
    (cron=“* * * ? * 5#2”):每个月的第 2 个周 4

2、整合 springBoot

在类上使用注解 @EnableScheduling
方法上使用 @Scheduled 开启一个定时任务文章来源地址https://www.toymoban.com/news/detail-497480.html

@Slf4j
@Component
@EnableAsync
@EnableScheduling
public class HelloSchedule {
 
    /**
     * 1、Spring中6位组成,不允许7位d的年
     * 2、周的位置,1-7代表周一到周日
     * 3、定时任务不应该阻塞(某个定时任务超时,也不应影响其他定时任务执行)。默认是阻塞的,解决方法:
     *      1)、可以让业务运行以异步的方式,自己提交到线程池
     *      2)、支持定时任务线程池:设置 TaskSchedulingProperties 如下
     *              【有的版本不太好使】
     * 				spring.task.scheduling.pool.size=5
     *      3)、让定时任务异步执行,开启异步任务
     *          1、@EnableAsync 开启异步任务功能
     * 			2、 @Async 给希望异步执行的方法上标注
     * 			3、自动配置类 TaskExecutionAutoConfiguration 属性绑定在 TaskExecutionProperties
     * 				spring.task.execution.pool.core-size=8
     *
     *      解决:使用 异步+定时任务 来完成定时任务不阻塞的功能
     */
    @Async
    @Scheduled(cron = "*/5 * * * * ?")
    public void hello() throws InterruptedException {
        log.info("hello......");
        /**
	     * 第一种:异步执行业务方法
	     * 	CompletableFuture.runAsyc(()->{
	     * 		xxService.hello();
	     * 	},executor);
	     */
    }
}

到了这里,关于谷粒商城(三)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • ElasticSearch 实战:ElasticSearch文档全文检索

    Elasticsearch 实战:Elasticsearch 文档全文检索 全文检索是 Elasticsearch 的核心功能之一,它允许用户对文本内容进行高效的模糊搜索、词组匹配、同义词处理、停用词过滤等操作。以下是如何进行文档全文检索的详细步骤: **1. **全文匹配查询(Match Query) 最基础的全文检索查询是

    2024年04月11日
    浏览(52)
  • ES(Elasticsearch 全文检索)

    数据量大的时候 索引失效 =查询性能低 功能比较弱 对文档的内容进行分词,对词条创建索引,记录词条所在的文档信息根据词条查询到文档的id 从而查到文档 文档:每一条数据就是一条文档 词条:文档按照语义分成的词语 正向索引 根据文档的id创建索引 查询词条必须先找

    2024年02月05日
    浏览(59)
  • ElasticSearch全文检索原理及过程

            ElasticSearch的搜索引擎中,每个 文档都有一个对应的文档 ID ,文档内容被表示为一系列的集合。例如文档 1 经过分词,提取了 20 个, 每个都会记录它在文档中出现的次数和出现位置 。那么,倒排索引就是 到文档   ID 的映射 ,每个关键

    2023年04月17日
    浏览(47)
  • 全文检索-Elasticsearch-整合SpringBoot

    前面记录了 Elasticsearch 全文检索的入门篇和进阶检索。这次我们来讲下 Spring Boot 中如何整合 ES,以及如何在 Spring Cloud 微服务项目中使用 ES 来实现全文检索,来达到商品检索的功能。 检索服务单独作为一个服务,就称作 gulimall-search 模块。 点击 Next 勾选 Spring Web 依赖,点击

    2024年02月08日
    浏览(50)
  • 全文检索学习之ElasticSearch学习笔记

    在非关系型数据库中,数据是非结构化的,如果直接去查找效率极低,全文检索将非结构化数据中的一部分信息提取出来,重新组织,使其变得有一定结构,然后对此有一定结构的数据进行搜索,从而达到搜索相对较快的目的。索引就是从非结构化数据中提取出的然后重新组

    2023年04月11日
    浏览(54)
  • 7-Elasticsearch组合查询和全文检索

    Elasticsearch组合查询 组合查询–布尔查询 组合查询中的常用的查询方式:布尔查询。 它将多个查询条件组合在一起,并且将查询的结果和结果的评分组合在一起。 布尔查询是把多个子查询组合成一个布尔表达式,所有子查询之间逻辑关系是and,只有当一个文档满足布尔查询

    2024年02月04日
    浏览(46)
  • ElasticSearch:全文检索及倒排索引原理

    首先介绍一下结构化与非结构化数据: 结构化数据将数据具有的特征事先以结构化的形式定义好,数据有固定的格式或有限的长度。典型的结构化数据就是传统关系型数据库的表结构,数据特征直接体现在表结构的字段上,所以根据某一特征做数据检索很直接,速度也比较快

    2024年02月14日
    浏览(44)
  • 九.全文检索ElasticSearch经典入门-ElasticSearch映射修改

    这篇文章的内容是ElasticSearch映射修改,写这篇文章是有水友公司里面遇到了映射修改问题,我这里做了一个整理,希望对你有所帮助。 在ElasticSearch中一旦创建了映射想要进行修改是不被允许的。比如我这里有一个案例 上面创建了索引employee ,同时为其创建映射,指定了id和

    2024年02月05日
    浏览(60)
  • 全文检索工具elasticsearch:第一章:理论知识

    cluster 整个elasticsearch 默认就是集群状态,整个集群是一份完整、互备的数据。 node 集群中的一个节点,一般只一个进程就是一个node shard 分片,即使是一个节点中的数据也会通过hash算法,分成多个片存放,默认是5片。 index 相当于rdbms的database, 对于用户来说是一个逻辑数据库

    2024年04月16日
    浏览(49)
  • Mysql 实现类似于 ElasticSearch 的全文检索功能

    ​ 一、前言 今天一个同事问我,如何使用 Mysql 实现类似于 ElasticSearch 的全文检索功能,并且对检索跑分?我当时脑子里立马产生了疑问?为啥不直接用es呢?简单好用还贼快。但是听他说,数据量不多,客户给的时间非常有限,根本没时间去搭建es,所以还是看一下

    2024年02月03日
    浏览(46)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包