Sentinel
-
测试软件
- jmeter
-
雪崩问题
- 个微服务往往依赖于多个其它微服务,服务提供者I发生了故障,依赖于当前服务的其它服务随着时间的推移形成级联失败
-
超时处理
- 设定超时时间,请求超过一定时间没有响应就返回错误信息
-
仓壁模式
- 限定每个业务能使用的线程数,避免耗尽整个tomcat的资源,也叫线程隔离
-
断路器
- 由断路器统计业务执行的异常比例,如果超出阈值则会熔断该业务,拦截访问该业务请求
-
限流
- 流量控制,限制业务访问的QPS,避免服务因流量的突增而故障
- Sentinel
-
Sentinel
- 运行
- java -jar sentinel-dashboard-1.8.1.jar
- 配置
- server.port
- sentinel.dashboard.auth.username
- 默认用户名
- sentinel.dashboard.auth.password
- 默认密码
- 运行
java -Dserver.port=8090 -jar sentinel-dashboard-1.8.1.jar
微服务整合
- 依赖
<!--sentinel-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
- 配置
server:
port: 8088
spring:
cloud:
sentinel:
transport:
dashboard: localhost:8080
限流
- 流控模式
- 直接:对当前资源限流
- 关联:高优先级资源触发阈值,对低优先级资源限流。
- 链路:阈值统计时,只统计从指定资源进入当前资源的请求,是对请求来源的限流
- 流控效果
- 快速失败:达到阈值后,新的请求会被立即拒绝并抛出FlowException异常
- 默认的处理方式
- warm up:预热模式,对超出阈值的请求同样是拒绝并抛出异常
- 这种模式阈值会动态变化,从一个较小值逐渐增加到最大阈值
- 排队等待:让所有的请求按照先后次序排队执行
- 两个请求的间隔不能小于指定时长
- 快速失败:达到阈值后,新的请求会被立即拒绝并抛出FlowException异常
隔离和降级
- 线程隔离
- 调用者在调用服务提供者时,给每个调用的请求分配独立线程池
- 出现故障时,最多消耗这个线程池内资源,避免把调用者的所有资源耗尽
- 熔断降级
- 是在调用方这边加入断路器,统计对服务提供者的调用
- 如果调用的失败比例过高,则熔断该业务,不允许访问该服务的提供者
FeignClient整合Sentinel
- 配置
feign:
sentinel:
enabled: true # 开启feign对sentinel的支持
- 实现FallbackFactory
@Slf4j
public class UserClientFallbackFactory implements FallbackFactory<UserClient> {
@Override
public UserClient create(Throwable throwable) {
return new UserClient() {
@Override
public User findById(Long id) {
log.error("查询用户异常", throwable);
return new User();
}
};
}
}
- FallbackFactory注册Bean:
@Bean
public UserClientFallbackFactory userClientFallbackFactory(){
return new UserClientFallbackFactory();
}
- UserClient接口中使用FallbackFactory
@FeignClient(value = "userservice", fallbackFactory = UserClientFallbackFactory.class)
public interface UserClient {
@GetMapping("/user/{id}")
User findById(@PathVariable("id") Long id);
}
线程隔离
- 线程池隔离
- 给每个服务调用业务分配一个线程池
- 利用线程池本身实现隔离效果
- 信号量隔离
- 计数器模式,记录业务使用的线程数量
- 达到信号量上限时,禁止新的请求
熔断降级
- 断路器控制熔断和放行是通过状态机来完成
- 状态机
- closed:关闭状态
- 断路器放行所有请求,并开始统计异常比例、慢请求比例
- 超过阈值则切换到open状态
- open:打开状态
- 服务调用被熔断,访问被熔断服务的请求会被拒绝,快速失败,直接走降级逻辑
- Open状态5秒后会进入half-open状态
- half-open:半开状态
- 放行一次请求,根据执行结果来判断接下来的操作。
- 请求成功:则切换到closed状态
- 请求失败:则切换到open状态
- closed:关闭状态
- 器熔断策略
- 慢调用
- 异常比例
- 异常数
授权规则
-
白名单:来源(origin)在白名单内的调用者允许访问
-
黑名单:来源(origin)在黑名单内的调用者不允许访问
-
定义RequestOriginParser接口,返回不同的origin
@Component
public class HeaderOriginParser implements RequestOriginParser {
@Override
public String parseOrigin(HttpServletRequest request) {
// 1.获取请求头
String origin = request.getHeader("origin");
// 2.非空判断
if (StringUtils.isEmpty(origin)) {
origin = "blank";
}
return origin;
}
}
- 网关的请求添加请求头
spring:
cloud:
gateway:
default-filters:
- AddRequestHeader=origin,gateway
- 实现BlockExceptionHandler接口,自定义异常时的返回结果
- 处理请求被限流、降级、授权拦截时抛出BlockException
@Component
public class SentinelExceptionHandler implements BlockExceptionHandler {
@Override
public void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception {
String msg = "未知异常";
int status = 429;
if (e instanceof FlowException) {
msg = "请求被限流了";
} else if (e instanceof ParamFlowException) {
msg = "请求被热点参数限流";
} else if (e instanceof DegradeException) {
msg = "请求被降级了";
} else if (e instanceof AuthorityException) {
msg = "没有权限访问";
status = 401;
}
response.setContentType("application/json;charset=utf-8");
response.setStatus(status);
response.getWriter().println("{\"msg\": " + msg + ", \"status\": " + status + "}");
}
}
规则持久化
- 规则管理模式
- 原始模式
- Sentinel的默认模式,将规则保存在内存,重启服务会丢失
- pull模式
- 控制台将配置的规则推送到Sentinel客户端
- 客户端会将配置规则保存在本地文件或数据库中
- 定时去本地文件或数据库中查询,更新本地规则
- push模式
- 控制台将配置规则推送到远程配置中心Nacos
- Sentinel客户端监听Nacos,获取配置变更的推送消息,完成本地配置更新
- 原始模式
实现push模式
- 依赖
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
- 配置
spring:
cloud:
sentinel:
datasource:
flow:
nacos:
server-addr: localhost:8848 # nacos地址
dataId: orderservice-flow-rules
groupId: SENTINEL_GROUP
rule-type: flow # 还可以是:degrade、authority、param-flow
- 修改sentinel-dashboard源码
- 重新启动
分布式事务
-
CAP
- Consistency(一致性)
- Availability(可用性)
- Partition tolerance (分区容错性)
-
BASE
- Basically Available (基本可用)
- Soft State(软状态)
- Eventually Consistent(最终一致性)
-
分布式事务
- AP模式
- 各子事务分别执行和提交,允许出现结果不一致
- 采用弥补措施恢复数据即可,实现最终一致
- CP模式
- 各个子事务执行后互相等待,同时提交,同时回滚,达成强一致
- 事务等待过程中,处于弱可用状态
- AP模式
Seata
- Seata的架构
- TC (Transaction Coordinator)
- 事务协调者
- TM (Transaction Manager)
- 事务管理器
- RM (Resource Manager)
- 资源管理器
- TC (Transaction Coordinator)
- 下载
- http://seata.io/zh-cn/blog/download.html
微服务集成
- 依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<!--版本较低,1.3.0,因此排除-->
<exclusion>
<artifactId>seata-spring-boot-starter</artifactId>
<groupId>io.seata</groupId>
</exclusion>
</exclusions>
</dependency>
<!--seata starter 采用1.4.2版本-->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<!--<version>1.4.2</version>-->
<version>${seata.version}</version>
</dependency>
- 配置
seata:
registry: # TC服务注册中心的配置,微服务根据这些信息去注册中心获取tc服务地址
# 参考tc服务自己的registry.conf中的配置
type: nacos
nacos: # tc
server-addr: 127.0.0.1:8848
namespace: ""
group: DEFAULT_GROUP
application: seata-tc-server # tc服务在nacos中的服务名称
cluster: SH
tx-service-group: seata-demo # 事务组,根据这个获取tc服务的cluster名称
service:
vgroup-mapping: # 事务组与TC服务cluster的映射关系
seata-demo: SH
XA模式
-
RM一阶段的工作:
- 注册分支事务到TC
- 执行分支业务sql但不提交
- 报告执行状态到TC
-
TC二阶段的工作:
- TC检测各分支事务执行状态
- 如果都成功,通知所有RM提交事务
- 如果有失败,通知所有RM回滚事务
- TC检测各分支事务执行状态
-
RM二阶段的工作:
- 接收TC指令,提交或回滚事务
-
配置文件
seata:
dtat-source-proxy-mode: XA
- 添加注解
- 全局事务的入口方法添加@GlobalTransactional注解
AT模式
-
阶段一RM的工作:
- 注册分支事务
- 记录undo-log(数据快照)
- 执行业务sql并提交
- 报告事务状态
-
阶段二提交时RM的工作:
- 删除undo-log即可
- 阶段二回滚时RM的工作:
- 根据undo-log恢复数据到更新前
-
配置文件
seata:
dtat-source-proxy-mode: AT
- 添加注解
- 全局事务的入口方法添加@GlobalTransactional注解
TCC模式
- 需要实现三个方法
- Try:资源的检测和预留;
- Confirm:完成资源操作业务;要求 Try 成功 Confirm 一定要能成功。
- Cancel:预留资源释放,可以理解为try的反向操作。
@LocalTCC
public interface AccountTCCService {
@TwoPhaseBusinessAction(name = "deduct", commitMethod = "confirm", rollbackMethod = "cancel")
void deduct(@BusinessActionContextParameter(paramName = "userId") String userId,
@BusinessActionContextParameter(paramName = "money")int money);
boolean confirm(BusinessActionContext ctx);
boolean cancel(BusinessActionContext ctx);
}
SAGA模式
- 一阶段
- 直接提交本地事务
- 二阶段
- 成功则什么都不做;失败则通过编写补偿业务来回滚
Redis
Redis持久化
-
RDB持久化
- Redis Database Backup file
- Redis数据备份文件,也被叫做Redis数据快照。
- 把内存中的所有数据都记录到磁盘中
- 当Redis实例故障重启后,从磁盘读取快照文件,恢复数据
-
RDB执行时机
- Redis停机时会执行一次RDB
- Redis内部有触发RDB的机制
- 可以在redis.conf文件中找到
-
异步持久化bgsave
- fork主进程得到一个子进程,共享内存空间
- 子进程读取内存数据并写入新的RDB文件
- 用新RDB文件替换旧的RDB文件
-
AOF持久化
-
Append Only File(追加文件)
-
Redis处理的每一个写命令都会记录在AOF文件,可以看做是命令日志文件
-
频率
- always
- ererysec
- no
Redis主从
-
搭建主从架构
- 搭建主从集群,实现读写分离
-
主从数据同步原理
- 全量同步
- 第一次同步
- 增量同步
- slave重启后同步
- 全量同步
Redis哨兵
-
Sentinel
- 监控
- 心跳
- 自动故障恢复
- 选举新master
- 通知
- 通知客户端新的master
- 监控
-
心跳检测
- 主观下线
- 客观下线
RedisTemplate哨兵模式
- 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
spring:
redis:
password: 1234 #如果Redis有密码市一定要配置密码
sentinel:
master: mymaster #指定master名称
nodes: # 指定Redis集群信息
- ip地址:27001
- ip地址:27002
- ip地址:27003
- RedisTemplate配置主从读写分离
@Bean
public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer(){
// REPLICA_PREFERRED 优先从slave节点读取
return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}
Redis分片集群(增强存储能力)
- 分片集群
- 每个集群多个master,每个master保存不同数据
- 每个master多个slave
- master直接通过ping监测彼此
- 客户端访问集群任意节点,最终都会被转发到正确的节点
- 散列插槽
- redis会把master节点映射到0~1638插槽上
- 数据的key与插槽绑定
集群伸缩
- 新增
- redis-cli -a 密码 --cluster add-node ip地址:port ip地址:7001
- 分配插槽
- redis-cli -a 密码 --cluster reshard ip地址:port
- 删除
- redis-cli -a 密码 --cluster del-node ip地址:port 节点id
故障转移
-
自动故障转移
- 自动
-
手动故障转移
- cluster failover命令
RedisTemplate访问分片集群
- 引入redis依赖
- 配置分片集群地址
- 配置读写分离
spring:
redis:
cluster:
nodes:
- ip:port
- ip:port
- ip:port
- ip:port
- ip:port
多级缓存
- 多级缓存
- 浏览器&客户端
- nginx本地缓存
- redis缓存
- JVM进程缓存
- 数据库
JVM进程缓存
- 分布式缓存
- redis
- 进程本地缓存
- HashMap
- GuavaCache
- Caffeine
- Spring内部的缓存使用的就是Caffeine
Caffeine
// 构建cache
Cache<String,String> cache = Caffeine.newBuilder().build();
// 存储数据
cache.put("gf","xxx");
// 取值
String gf = cache.getIfPresent("gf");
String defaultGF = cache.get("defaultGF",key->{
return "xxx"
});
-
缓存驱逐策略
- 基于容器
- 数量上限
- 基于时间
- 设置有效时间
- 基于引用
- 不建议使用
- 基于容器
-
基于容器
Cache<String,String> cache = Caffeine.newBuilder().maximumSize(1).build();
- 基于时间
Cache<String,String> cache = Caffeine.newBuilder()
.expireAfterWrite(Duration.ofSeconds(10))
.build();
Lua语法
helloworld
- 新建
touch hello.lua
- 添加内容
print("hello world")
- 运行
lua hello.lua
数据类型
- 数据类型
- nil
- 无效值
- boolean
- number
- string
- function
- table
- 数组key为索引
- local arr = {‘xx’,‘xx’}
- print(arr[1])
- map
- loacl map = {name=‘xx’,age=2}
- map[‘name’]
- map.name
- 数组key为索引
- nil
- 查看数据类型
- type()
- 循环遍历table
local arr = {'ar','xx',ss}
for index,value in ipairs(arr) do
print(index,value)
end
loacl map = {name='xx',age=2}
for index,value in ipairs(map) do
print(index,value)
end
- 函数
function printArr(arr)
for index,value in ipairs(arr) do
print(value)
end
end
- 条件控制
if(布尔表达式)
then
else
end
- 逻辑运算
- and
- or
- not
OpenResty
-
基于nginx的高性能web平台
-
安装
- OpenResty
- opm
-
目录
- /usr/local/openresty
-
获取请求参数
- 路径占位符
- /item/111
- local id=ngx.var[1]
- 请求头
- local headers = ngx.req.get_headers()
- Get
- local getParams = ngx.req.get_uri_args()
- post
- gx.req.read_body()
- local postParams = ngx.req.get_post_args()
- JSON
- gx.req.read_body()
- local jsonBody = ngx.req.get_post_data()
- 路径占位符
-
封装http请求
- /usr/lodal/openresty/lualib/common.lua
--封装函数,发送http请求,并解析响应
local function read http(path,params)
local resp = ngx.location.capture(path,{
method =ngx.HTTP GET,
args =params,
})
if not resp then
--记录错误信息,返回404
ngx.log(ngx.ERR,"http not found,path:",path ,",args:",args)
ngx.exit(404)
end
return resp.body
end
--将方法导出
Local _M = {
read_http = read_http
}
return _M
--导入common函数库
local common=require('common')
local read_http = common.read_http
--导入cjson库
local cjson=require('cjson')
-- 获取路径参数
local id =ngx.var[1]
-- 查询商品信息
local itemJSON =read_http("/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_http("/item/stock/"..id, nil)
-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock =stock.stock
item.sold = stock.sold
-- 把item序列化为ison 返回结果
ngx.say(cjson.encode(item))
nginx本地缓存
-- 导入共享词典,本地缓存
local item_cache = ngx.shared.item_cache
-- 封装查询函数
function read_data(key, expire, path, params)
-- 查询本地缓存
local val = item_cache:get(key)
if not val then
ngx.log(ngx.ERR, "本地缓存查询失败,尝试查询Redis, key: ", key)
-- 查询redis
val = read_redis("127.0.0.1", 6379, key)
-- 判断查询结果
if not val then
ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
-- redis查询失败,去查询http
val = read_http(path, params)
end
end
-- 查询成功,把数据写入本地缓存
item_cache:set(key, val, expire)
-- 返回数据
return val
end
- 设置缓存时间1800/60
-- 查询商品信息
local itemJSON = read_data("item:id:" .. id, 1800, "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, 60, "/item/stock/" .. id, nil)
Redis 缓存预热
-
冷启动
- 服务刚刚启动时,redis中并没有缓存
- 如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力
-
缓存预热
- 在实际开发中,可以利用大数据统计用户访问的热点数据,项目启动时将这些热点数据提前查询并保存redis中
-
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
- 配置
spring:
redis:
host: 192.168.150.101
- 初始化类
@Component
public class RedisHandler implements InitializingBean {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public void afterPropertiesSet() throws Exception {
// 初始化缓存
// 1.查询商品信息
List<Item> itemList = itemService.list();
// 2.放入缓存
for (Item item : itemList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(item);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
}
// 3.查询商品库存信息
List<ItemStock> stockList = stockService.list();
// 4.放入缓存
for (ItemStock stock : stockList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(stock);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
}
}
}
- openResty查询redis
- common.lua
-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)
-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
local pool_size = 100 --连接池大小
local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
if not ok then
ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
end
end
-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
-- 获取一个连接
local ok, err = red:connect(ip, port)
if not ok then
ngx.log(ngx.ERR, "连接redis失败 : ", err)
return nil
end
-- 查询redis
local resp, err = red:get(key)
-- 查询失败处理
if not resp then
ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
end
--得到的数据为空处理
if resp == ngx.null then
resp = nil
ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
end
close_redis(red)
return resp
end
-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)
local resp = ngx.location.capture(path,{
method = ngx.HTTP_GET,
args = params,
})
if not resp then
-- 记录错误信息,返回404
ngx.log(ngx.ERR, "http查询失败, path: ", path , ", args: ", args)
ngx.exit(404)
end
return resp.body
end
-- 将方法导出
local _M = {
read_http = read_http,
read_redis = read_redis
}
return _M
- 先查redis,没有再查服务
- item.lua
-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')
-- 封装查询函数
function read_data(key, path, params)
-- 查询本地缓存
local val = read_redis("127.0.0.1", 6379, key)
-- 判断查询结果
if not val then
ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
-- redis查询失败,去查询http
val = read_http(path, params)
end
-- 返回数据
return val
end
-- 获取路径参数
local id = ngx.var[1]
-- 查询商品信息
local itemJSON = read_data("item:id:" .. id, "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, "/item/stock/" .. id, nil)
-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock = stock.stock
item.sold = stock.sold
-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))
缓存同步
- 同步策略
- 设置有效期:给缓存设置有效期,到期后自动删除,再次查询时更新
- 同步双写:在修改数据库的同时,直接修改缓存
- 异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据
- MQ
- Canal
- 自己伪装成MySQL的一个slave节点,监听master的binary log变化
Canal
- 依赖
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
- 配置
canal:
destination: heima # canal的集群名字,要与安装canal时设置的名称一致
server: 192.168.150.101:11111 # canal服务地址
- domain
@Data
@TableName("tb_item")
public class Item {
@TableId(type = IdType.AUTO)
@Id
private Long id;//商品id
@Column(name = "name")
private String name;//商品名称
private String title;//商品标题
private Long price;//价格(分)
private String image;//商品图片
private String category;//分类名称
private String brand;//品牌名称
private String spec;//规格
private Integer status;//商品状态 1-正常,2-下架
private Date createTime;//创建时间
private Date updateTime;//更新时间
@TableField(exist = false)
@Transient
private Integer stock;
@TableField(exist = false)
@Transient
private Integer sold;
- 监听器
@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {
@Autowired
private RedisHandler redisHandler;
@Autowired
private Cache<Long, Item> itemCache;
@Override
public void insert(Item item) {
// 写数据到JVM进程缓存
itemCache.put(item.getId(), item);
// 写数据到redis
redisHandler.saveItem(item);
}
@Override
public void update(Item before, Item after) {
// 写数据到JVM进程缓存
itemCache.put(after.getId(), after);
// 写数据到redis
redisHandler.saveItem(after);
}
@Override
public void delete(Item item) {
// 删除数据到JVM进程缓存
itemCache.invalidate(item.getId());
// 删除数据到redis
redisHandler.deleteItemById(item.getId());
}
}
- RedisHandler
@Component
public class RedisHandler implements InitializingBean {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public void afterPropertiesSet() throws Exception {
// 初始化缓存
// 1.查询商品信息
List<Item> itemList = itemService.list();
// 2.放入缓存
for (Item item : itemList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(item);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
}
// 3.查询商品库存信息
List<ItemStock> stockList = stockService.list();
// 4.放入缓存
for (ItemStock stock : stockList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(stock);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
}
}
public void saveItem(Item item) {
try {
String json = MAPPER.writeValueAsString(item);
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public void deleteItemById(Long id) {
redisTemplate.delete("item:id:" + id);
}
}
RabbitMQ高级特性
消息可靠性
-
丢失原因
- 发送时丢失
- 生产者发送的消息未送达exchange
- 消息到达exchange后未到达queue
- MQ宕机,queue将消息丢失
- consumer接收到消息后未消费就宕机
- 发送时丢失
-
解决方案
- 生产者确认机制
- mq持久化
- 消费者确认机制
- 失败重试机制
生产者消息确认
-
publisher-confirm,发送者确认
- 消息成功投递到交换机,返回ack
- 消息未投递到交换机,返回nack
-
publisher-return,发送者回执
- 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因
-
配置
spring:
rabbitmq:
# simple:同步等待confirm结果,直到超时
# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-confirm-type: correlated
# 开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
publisher-returns: true
template:
# 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
mandatory: true
- ReturnCallback
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 投递失败,记录日志
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有业务需要,可以重发消息
});
}
}
- ConfirmCallback
// 1.消息体
String message = "hello, spring amqp!";
// 2.全局唯一的消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.添加callback
correlationData.getFuture().addCallback(
result -> {
if(result.isAck()){
// 3.1.ack,消息成功
log.debug("消息发送成功, ID:{}", correlationData.getId());
}else{
// 3.2.nack,消息失败
log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
}
},
ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
// 4.发送消息
rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);
// 休眠一会儿,等待ack回执
Thread.sleep(2000);
消息持久化
-
消息持久化机制
- 交换机持久化
- 队列持久化
- 消息持久化
-
交换机持久化
@Bean
public DirectExchange simpleExchange(){
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new DirectExchange("simple.direct", true, false);
}
- 队列持久化
@Bean
public Queue simpleQueue(){
// 使用QueueBuilder构建队列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}
- 消息持久化
// 准备消息
Message message = MessageBuilder.withBody("hello, spring".getBytes(Standardcharsets.UTF_8))
// 默认就是持久化的
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // durable 持久
.build();
// 发送消息
rabbitTemplate.convertAndSend( "simple.queue", message);
消费者确认
- SpringAMQP三种确认模式:
- manual
- 手动ack,需要在业务代码结束后,调用api发送ack。
- auto
- 自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
- none
- 关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
- manual
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 关闭ack none/auto/manual
消费失败重试机制
- 本地重试
- 利用Spring的retry机制,在消费者出现异常时利用本地重试
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
- 失败策略
- MessageRecovery接口
- RejectAndDontRequeueRecoverer
- 重试耗尽后,直接reject,丢弃消息
- 默认就是这种方式
- ImmediateRequeueMessageRecoverer
- 重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer
- 重试耗尽后,将失败消息投递到指定的交换机
- RejectAndDontRequeueRecoverer
- MessageRecovery接口
@Configuration
public class ErrorMessageConfig {
@Bean
// 交换机
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
// 队列
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
// RepublishMessageRecoverer,关联队列和交换机
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
死信交换机
- 死信(dead letter)
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
- 死信交换机
- 配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中
// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2(){
return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
.deadLetterExchange("dl.direct") // 指定死信交换机
.build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){
return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){
return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){
return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}
TTL
-
ttl
- 队列设置超时时间,配置x-message-ttl属性
-
死信交换机、死信队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.ttl.queue", durable = "true"),
exchange = @Exchange(name = "dl.ttl.direct"),
key = "ttl"
))
public void listenDlQueue(String msg){
log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
}
- 声明一个队列,并且指定TTL
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化
.ttl(10000) // 设置队列的超时时间,10秒
.deadLetterExchange("dl.ttl.direct") // 指定死信交换机
.build();
}
- 将ttl与交换机绑定
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
- 发送消息时,设定TTL
@Test
public void testTTLMsg() {
// 创建消息
Message message = MessageBuilder
.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8))
.setExpiration("5000")
.build();
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
log.debug("发送消息成功");
}
延迟队列
-
插件
- DelayExchange
-
流程
- 接收消息
- 判断消息是否具备x-delay属性
- 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
- 返回routing not found结果给消息发送者
- x-delay时间到期后,重新投递消息到指定队列
-
声明延迟交换机
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}
@Slf4j
@Configuration
public class DelayExchangeConfig {
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置delay的属性为true
.durable(true) // 持久化
.build();
}
@Bean
public Queue delayedQueue(){
return new Queue("delay.queue");
}
@Bean
public Binding delayQueueBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
}
}
- 发送延迟消息
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
惰性队列
-
消息堆积
- 当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限
- 之后发送的消息就会成为死信,可能会被丢弃
-
Lazy Queues文章来源:https://www.toymoban.com/news/detail-858080.html
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
-
通过命令行将一个运行中的队列修改为惰性队列文章来源地址https://www.toymoban.com/news/detail-858080.html
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
- java
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() // 开启Lazy模式
.build();
}
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}
集群
- 普通集群/叫标准集群(classic cluster)
- 会在集群的各个节点间共享部分数据
- 包括:交换机、队列元信息
- 不包含:队列中的消息
- 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
- 队列所在节点宕机,队列中的消息就会丢失
- 会在集群的各个节点间共享部分数据
- 镜像集群/主从模式
- 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份
- 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
- 一个队列的主节点可能是另一个队列的镜像节点
- 所有操作都是主节点完成,然后同步给镜像节点
- 主宕机后,镜像节点会替代成新的主
- 仲裁队列
- 与镜像队列一样,都是主从模式,支持主从数据同步
- 使用非常简单,没有复杂的配置
- 主从同步基于Raft协议,强一致
仲裁队列
- 创建仲裁队列
@Bean
public Queue quorumQueue() {
return QueueBuilder
.durable("quorum.queue") // 持久化
.quorum() // 仲裁队列
.build();
}
- 配置
spring:
rabbitmq:
addresses: ip:port, ip:port, ip:port
username: xx
password: xx
virtual-host: /
到了这里,关于学习笔记-微服务高级(黑马程序员)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!