swagger
(1)简介
Swagger 是一个规范和完整的框架,用于生成、描述、调用和可视化 RESTful 风格的 Web 服务(API Documentation & Design Tools for Teams | Swagger)。 它的主要作用是:
-
使得前后端分离开发更加方便,有利于团队协作
-
接口的文档在线自动生成,降低后端开发人员编写接口文档的负担
-
功能测试
Spring已经将Swagger纳入自身的标准,建立了Spring-swagger项目,现在叫Springfox。通过在项目中引入Springfox ,即可非常简单快捷的使用Swagger。
(2)SpringBoot集成Swagger
-
引入依赖,在heima-leadnews-model和heima-leadnews-common模块中引入该依赖
<dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> </dependency>
只需要在heima-leadnews-common中进行配置即可,因为其他微服务工程都直接或间接依赖即可。
-
在heima-leadnews-common工程中添加一个配置类
新增:com.heima.common.swagger.SwaggerConfiguration
package com.heima.common.swagger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class SwaggerConfiguration {
@Bean
public Docket buildDocket() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(buildApiInfo())
.select()
// 要扫描的API(Controller)基础包
.apis(RequestHandlerSelectors.basePackage("com.heima"))
.paths(PathSelectors.any())
.build();
}
private ApiInfo buildApiInfo() {
Contact contact = new Contact("黑马程序员","","");
return new ApiInfoBuilder()
.title("黑马头条-平台管理API文档")
.description("黑马头条后台api")
.contact(contact)
.version("1.0.0").build();
}
}
在heima-leadnews-common模块中的resources目录中新增以下目录和文件
文件:resources/META-INF/Spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.heima.common.swagger.SwaggerConfiguration
(3)Swagger常用注解
在Java类中添加Swagger的注解即可生成Swagger接口文档,常用Swagger注解如下:
@Api:修饰整个类,描述Controller的作用
@ApiOperation:描述一个类的一个方法,或者说一个接口
@ApiParam:单个参数的描述信息
@ApiModel:用对象来接收参数
@ApiModelProperty:用对象接收参数时,描述对象的一个字段
@ApiResponse:HTTP响应其中1个描述
@ApiResponses:HTTP响应整体描述
@ApiIgnore:使用该注解忽略这个API
@ApiError :发生错误返回的信息
@ApiImplicitParam:一个请求参数
@ApiImplicitParams:多个请求参数的描述信息
@ApiImplicitParam属性:
属性 | 取值 | 作用 |
---|---|---|
paramType | 查询参数类型 | |
path | 以地址的形式提交数据 | |
query | 直接跟参数完成自动映射赋值 | |
body | 以流的形式提交 仅支持POST | |
header | 参数在request headers 里边提交 | |
form | 以form表单的形式提交 仅支持POST | |
dataType | 参数的数据类型 只作为标志说明,并没有实际验证 | |
Long | ||
String | ||
name | 接收参数名 | |
value | 接收参数的意义描述 | |
required | 参数是否必填 | |
true | 必填 | |
false | 非必填 | |
defaultValue | 默认值 |
我们在ApUserLoginController中添加Swagger注解,代码如下所示:
@RestController
@RequestMapping("/api/v1/login")
@Api(value = "app端用户登录", tags = "ap_user", description = "app端用户登录API")
public class ApUserLoginController {
@Autowired
private ApUserService apUserService;
@PostMapping("/login_auth")
@ApiOperation("用户登录")
public ResponseResult login(@RequestBody LoginDto dto){
return apUserService.login(dto);
}
}
LoginDto
@Data
public class LoginDto {
/**
* 手机号
*/
@ApiModelProperty(value="手机号",required = true)
private String phone;
/**
* 密码
*/
@ApiModelProperty(value="密码",required = true)
private String password;
}
启动user微服务,访问地址:http://localhost:51801/swagger-ui.html
knife4j
(1)简介
knife4j是为Java MVC框架集成Swagger生成Api文档的增强解决方案,前身是swagger-bootstrap-ui,取名kni4j是希望它能像一把匕首一样小巧,轻量,并且功能强悍!
gitee地址:knife4j: Knife4j是一个集Swagger2 和 OpenAPI3为一体的增强解决方案
官方文档:Knife4j · 集Swagger2及OpenAPI3为一体的增强解决方案. | Knife4j
效果演示:http://knife4j.xiaominfo.com/doc.html
(2)核心功能
该UI增强包主要包括两大核心功能:文档说明 和 在线调试
-
文档说明:根据Swagger的规范说明,详细列出接口文档的说明,包括接口地址、类型、请求示例、请求参数、响应示例、响应参数、响应码等信息,使用swagger-bootstrap-ui能根据该文档说明,对该接口的使用情况一目了然。
-
在线调试:提供在线接口联调的强大功能,自动解析当前接口参数,同时包含表单验证,调用参数可返回接口响应内容、headers、Curl请求命令实例、响应时间、响应状态码等信息,帮助开发者在线调试,而不必通过其他测试工具测试接口是否正确,简介、强大。
-
个性化配置:通过个性化ui配置项,可自定义UI的相关显示信息
-
离线文档:根据标准规范,生成的在线markdown离线文档,开发者可以进行拷贝生成markdown接口文档,通过其他第三方markdown转换工具转换成html或pdf,这样也可以放弃swagger2markdown组件
-
接口排序:自1.8.5后,ui支持了接口排序功能,例如一个注册功能主要包含了多个步骤,可以根据swagger-bootstrap-ui提供的接口排序规则实现接口的排序,step化接口操作,方便其他开发者进行接口对接
(3)快速集成
-
在heima-leadnews-common模块中的
pom.xml
文件中引入knife4j
的依赖,如下:
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>knife4j-spring-boot-starter</artifactId>
</dependency>
-
创建Swagger配置文件
在heima-leadnews-common模块中新建配置类
新建Swagger的配置文件SwaggerConfiguration.java
文件,创建springfox提供的Docket分组对象,代码如下:
package com.heima.common.knife4j;
import com.github.xiaoymin.knife4j.spring.annotations.EnableKnife4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import springfox.bean.validators.configuration.BeanValidatorPluginsConfiguration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
@EnableKnife4j
@Import(BeanValidatorPluginsConfiguration.class)
public class Swagger2Configuration {
@Bean(value = "defaultApi2")
public Docket defaultApi2() {
Docket docket=new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
//分组名称
.groupName("1.0")
.select()
//这里指定Controller扫描包路径
.apis(RequestHandlerSelectors.basePackage("com.heima"))
.paths(PathSelectors.any())
.build();
return docket;
}
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("黑马头条API文档")
.description("黑马头条API文档")
.version("1.0")
.build();
}
}
以上有两个注解需要特别说明,如下表:
注解 | 说明 |
---|---|
@EnableSwagger2 |
该注解是Springfox-swagger框架提供的使用Swagger注解,该注解必须加 |
@EnableKnife4j |
该注解是knife4j 提供的增强注解,Ui提供了例如动态参数、参数过滤、接口排序等增强功能,如果你想使用这些增强功能就必须加该注解,否则可以不用加 |
-
添加配置
在Spring.factories中新增配置
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.heima.common.swagger.Swagger2Configuration, \
com.heima.common.swagger.SwaggerConfiguration
-
访问
在浏览器输入地址:http://host:port/doc.html
HelloController
@Controller
public class HelloController {
@GetMapping("/basic")
public String hello(Model model){
//name
//model.addAttribute("name","freemarker");
//stu
Student student = new Student();
student.setName("小明");
student.setAge(18);
model.addAttribute("stu",student);
return "01-basic";
}
@GetMapping("/list")
public String list(Model model){
//------------------------------------
Student stu1 = new Student();
stu1.setName("小强");
stu1.setAge(18);
stu1.setMoney(1000.86f);
stu1.setBirthday(new Date());
//小红对象模型数据
Student stu2 = new Student();
stu2.setName("小红");
stu2.setMoney(200.1f);
stu2.setAge(19);
//将两个对象模型数据存放到List集合中
List<Student> stus = new ArrayList<>();
stus.add(stu1);
stus.add(stu2);
//向model中存放List集合数据
model.addAttribute("stus",stus);
//------------------------------------
//创建Map数据
HashMap<String,Student> stuMap = new HashMap<>();
stuMap.put("stu1",stu1);
stuMap.put("stu2",stu2);
// 3.1 向model中存放Map数据
model.addAttribute("stuMap", stuMap);
//日期
model.addAttribute("today",new Date());
//长数值
model.addAttribute("point",323213123132312L);
return "02-list";
}
}
01-basic.ftl
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Hello World!</title>
</head>
<body>
<b>普通文本 String 展示:</b><br><br>
Hello ${name!''} <br>
<hr>
<b>对象Student中的数据展示:</b><br/>
姓名:${stu.name}<br/>
年龄:${stu.age}
<hr>
</body>
</html>
02-list.ftl
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Hello World!</title>
</head>
<body>
<#-- list 数据的展示 -->
<b>展示list中的stu数据:</b>
<br>
<br>
<table>
<tr>
<td>序号</td>
<td>姓名</td>
<td>年龄</td>
<td>钱包</td>
</tr>
<#if stus??>
<#list stus as stu>
<#if stu.name='小红'>
<tr style="color: red">
<td>${stu_index+1}</td>
<td>${stu.name}</td>
<td>${stu.age}</td>
<td>${stu.money}</td>
</tr>
<#else>
<tr>
<td>${stu_index+1}</td>
<td>${stu.name}</td>
<td>${stu.age}</td>
<td>${stu.money}</td>
</tr>
</#if>
</#list>
</#if>
stu集合的大小:${stus?size}<br/>
</table>
<hr>
<#-- Map 数据的展示 -->
<b>map数据的展示:</b>
<br/><br/>
<a href="###">方式一:通过map['keyname'].property</a><br/>
输出stu1的学生信息:<br/>
姓名:${stuMap['stu1'].name}<br/>
年龄:${stuMap['stu1'].age}<br/>
<br/>
<a href="###">方式二:通过map.keyname.property</a><br/>
输出stu2的学生信息:<br/>
姓名:${stuMap.stu2.name}<br/>
年龄:${stuMap.stu2.age}<br/>
<br/>
<a href="###">遍历map中两个学生信息:</a><br/>
<table>
<tr>
<td>序号</td>
<td>姓名</td>
<td>年龄</td>
<td>钱包</td>
</tr>
<#list stuMap?keys as key >
<tr>
<td>${key_index+1}</td>
<td>${stuMap[key].name}</td>
<td>${stuMap[key].age}</td>
<td>${stuMap[key].money}</td>
</tr>
</#list>
</table>
<hr>
当前的日期为:${today?datetime}<br/>
当前的日期为:${today?string("yyyy年MM月")}
--------------------------<br>
${point?c}
</body>
</html>
@SpringBootTest(classes = FreemarkerDemoApplication.class)
@RunWith(SpringRunner.class)
public class FreemarkerTest {
@Autowired
private Configuration configuration;
@Test
public void test() throws IOException, TemplateException {
Template template = configuration.getTemplate("02-list.ftl");
/**
* 合成方法
* 两个参数
* 第一个参数:模型参数
* 第二个参数:输出流
*/
template.process(getData(),new FileWriter("c:/list.html"));
}
private Map getData(){
Map<String, Object> map = new HashMap();
Student stu1 = new Student();
stu1.setName("小强");
stu1.setAge(18);
stu1.setMoney(1000.86f);
stu1.setBirthday(new Date());
//小红对象模型数据
Student stu2 = new Student();
stu2.setName("小红");
stu2.setMoney(200.1f);
stu2.setAge(19);
//将两个对象模型数据存放到List集合中
List<Student> stus = new ArrayList<>();
stus.add(stu1);
stus.add(stu2);
//向model中存放List集合数据
map.put("stus",stus);
//model.addAttribute("stus",stus);
//------------------------------------
//创建Map数据
HashMap<String,Student> stuMap = new HashMap<>();
stuMap.put("stu1",stu1);
stuMap.put("stu2",stu2);
// 3.1 向model中存放Map数据
map.put("stuMap", stuMap);
//model.addAttribute("stuMap", stuMap);
//日期
map.put("today",new Date());
//model.addAttribute("today",new Date());
//长数值
map.put("point",323213123132312L);
//model.addAttribute("point",323213123132312L);
return map;
}
}
新版本MinlO用以下方式启动:
docker run -d \-p 9000:9000 \-p 9001:9001 \--name minio1 \-v /home/minio/data:/data \-e "MINIO_ROOT_USER=minio" \-e "MINIO_ROOT_PASSWORD=minio123" \minio/minio server /data --console-address ":9001"
假设我们的服务器地址为http://192.168.200.130:9000,我们在地址栏输入:http://http://192.168.200.130:9000/ 即可进入登录界面。
Access Key为minio Secret_key 为minio123 进入系统后可以看到主界面
点击右下角的“+”号 ,点击下面的图标,创建一个桶
创建minio-demo,对应pom如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>heima-leadnews-test</artifactId>
<groupId>com.heima</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>minio-demo</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>7.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
引导类:
package com.heima.minio;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MinIOApplication {
public static void main(String[] args) {
SpringApplication.run(MinIOApplication.class,args);
}
}
创建测试类,上传html文件
package com.heima.minio.test;
import io.minio.MinioClient;
import io.minio.PutObjectArgs;
import java.io.FileInputStream;
public class MinIOTest {
public static void main(String[] args) {
FileInputStream fileInputStream = null;
try {
fileInputStream = new FileInputStream("D:\\list.html");;//文件读取的位置
//1.创建minio链接客户端
MinioClient minioClient = MinioClient.builder().credentials("minio", "minio123").endpoint("http://192.168.200.130:9000").build();
//2.上传
PutObjectArgs putObjectArgs = PutObjectArgs.builder()
.object("list.html")//文件名
.contentType("text/html")//文件类型
.bucket("leadnews")//桶名词 与minio创建的名词一致
.stream(fileInputStream, fileInputStream.available(), -1) //文件流,-1上传所有的文件
.build();
minioClient.putObject(putObjectArgs);
System.out.println("http://192.168.200.130:9000/leadnews/ak47.jpg");
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
@SpringBootTest(classes = ArticleApplication.class)
@RunWith(SpringRunner.class)
public class ArticleFreemarkerTest {
@Autowired
private ApArticleContentMapper apArticleContentMapper;
@Autowired
private ApArticleService apArticleService;
@Autowired
private Configuration configuration;
@Autowired
private FileStorageService fileStorageService;
@Test
public void createStaticUrlTest() throws Exception {
//1.获取文章内容
//已知文章的id
ApArticleContent apArticleContent = apArticleContentMapper.selectOne(Wrappers.<ApArticleContent>lambdaQuery().eq(ApArticleContent::getArticleId,1302862387124125698L));
if(apArticleContent !=null && StringUtils.isNotBlank(apArticleContent.getContent())){
//2.文章内容通过freemarker生成html文件
Template template = configuration.getTemplate("article.ftl");
//数据模型
Map content = new HashMap();
content.put("content", JSONArray.parseArray(apArticleContent.getContent()));
StringWriter out = new StringWriter();
//合成
template.process(content,out);
//3.把html文件上传到minio中
InputStream in = new ByteArrayInputStream(out.toString().getBytes());
String path = fileStorageService.uploadHtmlFile("",apArticleContent.getArticleId()+".html",in);
//4.修改ap_article表,保存static_utl字段
apArticleService.update(Wrappers.<ApArticle>lambdaUpdate().eq(ApArticle::getId,apArticleContent.getArticleId())
.set(ApArticle::getStaticUrl,path));
}
}
}
网关,token解析为用户存入header
Claims claimsBody = AppJwtUtil.getClaimsBody(token);
//是否是过期
int result = AppJwtUtil.verifyToken(claimsBody);
if(result == 1 || result == 2){
response.setStatusCode(HttpStatus.UNAUTHORIZED);
return response.setComplete();
}
//获取用户信息
Object userId = claimsBody.get("id");
//存入header中
ServerHttpRequest serverHttpRequest = request.mutate().headers(httpHeaders -> {
httpHeaders.add("userId",userId+"");
}).build();
//重置请求
exchange.mutate().request(serverHttpRequest);
拦截器
public class WmTokenInterceptor implements HandlerInterceptor {
/**
* 得到header中的用户信息,并且存入到当前线程中
* @param request
* @param response
* @param handler
* @return
* @throws Exception
*/
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
String userId = request.getHeader("userId");
if(userId != null){
WmUser wmUser = new WmUser();
wmUser.setApUserId(Integer.valueOf(userId));
//存入到当前线程中
WmThreadLocalUtil.setUser(wmUser);
}
return true;
}
/**
* 清理线程中的数据
* @param request
* @param response
* @param handler
* @param modelAndView
* @throws Exception
*/
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
WmThreadLocalUtil.clear();
}
}
注册拦截器
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new WmTokenInterceptor()).addPathPatterns("/**");
}
}
mybatis-plus已经集成了雪花算法,完成以下两步即可在项目中集成雪花算法
第一:在实体类中的id上加入如下配置,指定类型为id_worker
@TableId(value = "id",type = IdType.ID_WORKER)
private Long id;
第二:在application.yml文件中配置数据中心id和机器id
mybatis-plus:
mapper-locations: classpath*:mapper/*.xml
# 设置别名包扫描路径,通过该属性可以给包中的类注册别名
type-aliases-package: com.heima.model.article.pojos
global-config:
datacenter-id: 1
workerId: 1
feign基本使用
①:在heima-leadnews-feign-api中新增接口
先导入feign的依赖(在feign模块中导入)
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
定义文章端的接口(在feign模块中定义接口)
package com.heima.apis.article;
import com.heima.model.article.dtos.ArticleDto;
import com.heima.model.common.dtos.ResponseResult;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import java.io.IOException;
@FeignClient(value = "leadnews-article")//服务提供者的服务名称
public interface IArticleClient {
@PostMapping("/api/v1/article/save")//服务提供者的请求路径
public ResponseResult saveArticle(@RequestBody ArticleDto dto) ;
}
@RestController
public class ArticleClient implements IArticleClient {
@PostMapping("/api/v1/article/save")
@Override
public ResponseResult saveArticle(@RequestBody ArticleDto dto) {
return null;
}
}
在heima-leadnews-wemedia服务(服务消费者)中已经依赖了heima-leadnews-feign-apis(feign模块)工程,只需要在自媒体的引导类中开启feign的远程调用即可
<dependency>
<groupId>com.heima</groupId>
<artifactId>heima-leadnews-feign-api</artifactId>
</dependency>
注解为:@EnableFeignClients(basePackages = "com.heima.apis")
需要指向apis这个包
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.heima.wemedia.mapper")
@EnableFeignClients(basePackages = "com.heima.apis")
public class WemediaApplication {
public static void main(String[] args) {
SpringApplication.run(WemediaApplication.class,args);
}
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
return interceptor;
}
}
然后直接注入即可使用
@Autowired
private IArticleClient iArticleClient;
实现步骤:
①:在heima-leadnews-feign-api编写降级逻辑
package com.heima.apis.article.fallback;
import com.heima.apis.article.IArticleClient;
import com.heima.model.article.dtos.ArticleDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import org.springframework.stereotype.Component;
/**
* feign失败配置
* @author itheima
*/
@Component
public class IArticleClientFallback implements IArticleClient {
@Override
public ResponseResult saveArticle(ArticleDto dto) {
return ResponseResult.errorResult(AppHttpCodeEnum.SERVER_ERROR,"获取数据失败");
}
}
在自媒体微服务中添加类,扫描降级代码类的包
package com.heima.wemedia.config;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan("com.heima.apis.article.fallback")
public class InitConfig {
}
②:远程接口中指向降级代码
package com.heima.apis.article;
import com.heima.apis.article.fallback.IArticleClientFallback;
import com.heima.model.article.dtos.ArticleDto;
import com.heima.model.common.dtos.ResponseResult;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@FeignClient(value = "leadnews-article",fallback = IArticleClientFallback.class)
public interface IArticleClient {
@PostMapping("/api/v1/article/save")
public ResponseResult saveArticle(@RequestBody ArticleDto dto);
}
③:客户端开启降级heima-leadnews-wemedia
在wemedia的nacos配置中心里添加如下内容,开启服务降级,也可以指定服务响应的超时的时间
hystrix 可以在服务器端配置降级,也可以在客户端配置降级
feign:
# 开启feign对hystrix熔断降级的支持
hystrix:
enabled: true
# 修改调用超时时间
client:
config:
default:
connectTimeout: 2000
readTimeout: 2000
目前的降级逻辑并未生效,还需要在heima-wemedia中开启包的扫描,使得该降级逻辑生效
@Configuration
@ComponentScan("com.heima.apis.article.fallback")
public class InitConfig {
}
④:测试
在ApArticleServiceImpl类中saveArticle方法添加代码
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
在自媒体端进行审核测试,会出现服务降级的现象。
/**
* 自媒体文章审核
* @param id 自媒体文章id
*/
@Override
@Async //标明当前方法是一个异步方法
public void autoScanWmNews(Integer id) {......}
/**
* 自媒体发布,修改,保存草稿
* @param dto
* @return
*/
@Override
public ResponseResult submit(WmNewsDto dto) {
//0.条件判断
if(dto == null || dto.getContent() == null){
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
//1.保存或修改文章
WmNews wmNews = new WmNews();
//属性拷贝 属性名称和类型相同才能拷贝
BeanUtils.copyProperties(dto,wmNews);
//封面图片 list ---> string
if(dto.getImages() != null && dto.getImages().size() > 0){
//[1.jpg,2.jpg]-->1.jpg,2.jpg
String imagStr = StringUtils.join(dto.getImages(),",");
wmNews.setImages(imagStr);
}
//如果当前封面类型为自动 -1
if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){
wmNews.setType(null);
}
saveOrUpdateWmNews(wmNews);
//2.判断是否为草稿
if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
//3.不是草稿,保存文章内容图片与素材的关系
//获取到文章内容中的图片信息
List<String> materials = extractUrlInfo(dto.getContent());
saveRelativeInfoForContent(materials,wmNews.getId());
//4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片
saveRelativeInfoForCover(dto,wmNews,materials);
//审核文章
wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.heima.wemedia.mapper")
@EnableFeignClients(basePackages = "com.heima.apis")
@EnableAsync //开启异步调用
public class WemediaApplication {
public static void main(String[] args) {
SpringApplication.run(WemediaApplication.class,args);
}
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
return interceptor;
}
}
package com.heima.utils.common;
import java.util.*;
public class SensitiveWordUtil {
public static Map<String, Object> dictionaryMap = new HashMap<>();
/**
* 生成关键词字典库
* @param words
* @return
*/
public static void initMap(Collection<String> words) {
if (words == null) {
System.out.println("敏感词列表不能为空");
return ;
}
// map初始长度words.size(),整个字典库的入口字数(小于words.size(),因为不同的词可能会有相同的首字)
Map<String, Object> map = new HashMap<>(words.size());
// 遍历过程中当前层次的数据
Map<String, Object> curMap = null;
Iterator<String> iterator = words.iterator();
while (iterator.hasNext()) {
String word = iterator.next();
curMap = map;
int len = word.length();
for (int i =0; i < len; i++) {
// 遍历每个词的字
String key = String.valueOf(word.charAt(i));
// 当前字在当前层是否存在, 不存在则新建, 当前层数据指向下一个节点, 继续判断是否存在数据
Map<String, Object> wordMap = (Map<String, Object>) curMap.get(key);
if (wordMap == null) {
// 每个节点存在两个数据: 下一个节点和isEnd(是否结束标志)
wordMap = new HashMap<>(2);
wordMap.put("isEnd", "0");
curMap.put(key, wordMap);
}
curMap = wordMap;
// 如果当前字是词的最后一个字,则将isEnd标志置1
if (i == len -1) {
curMap.put("isEnd", "1");
}
}
}
dictionaryMap = map;
}
/**
* 搜索文本中某个文字是否匹配关键词
* @param text
* @param beginIndex
* @return
*/
private static int checkWord(String text, int beginIndex) {
if (dictionaryMap == null) {
throw new RuntimeException("字典不能为空");
}
boolean isEnd = false;
int wordLength = 0;
Map<String, Object> curMap = dictionaryMap;
int len = text.length();
// 从文本的第beginIndex开始匹配
for (int i = beginIndex; i < len; i++) {
String key = String.valueOf(text.charAt(i));
// 获取当前key的下一个节点
curMap = (Map<String, Object>) curMap.get(key);
if (curMap == null) {
break;
} else {
wordLength ++;
if ("1".equals(curMap.get("isEnd"))) {
isEnd = true;
}
}
}
if (!isEnd) {
wordLength = 0;
}
return wordLength;
}
/**
* 获取匹配的关键词和命中次数
* @param text
* @return
*/
public static Map<String, Integer> matchWords(String text) {
Map<String, Integer> wordMap = new HashMap<>();
int len = text.length();
for (int i = 0; i < len; i++) {
int wordLength = checkWord(text, i);
if (wordLength > 0) {
String word = text.substring(i, i + wordLength);
// 添加关键词匹配次数
if (wordMap.containsKey(word)) {
wordMap.put(word, wordMap.get(word) + 1);
} else {
wordMap.put(word, 1);
}
i += wordLength - 1;
}
}
return wordMap;
}
public static void main(String[] args) {
List<String> list = new ArrayList<>();
list.add("法轮");
list.add("法轮功");
list.add("冰毒");
//初始化敏感词库
initMap(list);
String content="我是一个好人,并不会卖冰毒,也不操练法轮功,我真的不卖冰毒";
//文本中查找是否存在敏感词
Map<String, Integer> map = matchWords(content);
System.out.println(map);
}
}
Tess4j案例
①:创建项目导入tess4j对应的依赖
<dependency>
<groupId>net.sourceforge.tess4j</groupId>
<artifactId>tess4j</artifactId>
<version>4.1.1</version>
</dependency>
②:导入中文字体库, 把资料中的tessdata文件夹拷贝到自己的工作空间下
③:编写测试类进行测试
package com.heima.tess4j;
import net.sourceforge.tess4j.ITesseract;
import net.sourceforge.tess4j.Tesseract;
import java.io.File;
public class Application {
/**
* 识别图片中的文字
* @param args
*/
public static void main(String[] args) {
try {
//创建Tesseract对象
ITesseract tesseract = new Tesseract();
//设置字体库路径
tesseract.setDatapath("C:\\Users\\83825\\Desktop");
//设置语言 简体中文
tesseract.setLanguage("chi_sim");
//获取本地图片
File file = new File("C:\\Users\\83825\\Desktop\\test6.png");
//执行ocr识别
String result = tesseract.doOCR(file);
//替换回车和tal键 使结果为一行
result = result.replaceAll("\\r|\\n","-").replaceAll(" ","");
System.out.println("识别的结果为:"+result);
} catch (Exception e) {
e.printStackTrace();
}
}
}
注意要用全英文路径
package com.heima.model.schedule.pojos;
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
* <p>
*
* </p>
*
* @author itheima
*/
@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 任务id
*/
@TableId(type = IdType.ID_WORKER)
private Long taskId;
/**
* 执行时间
*/
@TableField("execute_time")
private Date executeTime;
/**
* 参数
*/
@TableField("parameters")
private byte[] parameters;
/**
* 优先级
*/
@TableField("priority")
private Integer priority;
/**
* 任务类型
*/
@TableField("task_type")
private Integer taskType;
/**
* 版本号,用乐观锁
*/
@Version
private Integer version;
/**
* 状态 0=int 1=EXECUTED 2=CANCELLED
*/
@TableField("status")
private Integer status;
}
@Service
@Transactional
@Slf4j
public class TaskServiceImpl implements TaskService {
@Override
public long addTask(Task task) {
//1.添加任务到数据库中
boolean success = addTaskToDb(task);
if(success){
//2.添加任务到redis
addTaskToCache(task);
}
return task.getTaskId();
}
@Autowired
private CacheService cacheService;
/**
* 把任务添加到redis中
* @param task
*/
private void addTaskToCache(Task task) {
String key = task.getTaskType() + "_" + task.getPriority();
//获取5分钟之后的时间,毫秒值
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE,5);
long nextScheduleTime = calendar.getTimeInMillis();
if(task.getExecuteTime() <= System.currentTimeMillis()){
//2.1 如果任务的执行时间小于等于当前时间,存入List
cacheService.lLeftPush(ScheduleConstants.TOPIC+key, JSON.toJSONString(task));
}else if(task.getExecuteTime() <= nextScheduleTime){
//2.2 如果任意的执行时间大于当前时间 && 小于等于预设时间(未来5分钟)存入zset中
cacheService.zAdd(ScheduleConstants.FUTURE+key,JSON.toJSONString(task),task.getExecuteTime());
}
}
@Autowired
private TaskinfoMapper taskinfoMapper;
@Autowired
private TaskinfoLogsMapper taskinfoLogsMapper;
/**
* 添加任务到数据库中
* @param task
* @return
*/
private boolean addTaskToDb(Task task) {
boolean flag = false;
try{
//保存任务表
Taskinfo taskinfo = new Taskinfo();
BeanUtils.copyProperties(task,taskinfo);
taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
taskinfoMapper.insert(taskinfo);
//设置taskID
task.setTaskId(taskinfo.getTaskId());
//保存任务日志数据
TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
BeanUtils.copyProperties(taskinfo,taskinfoLogs);
taskinfoLogs.setVersion(1);
taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
taskinfoLogsMapper.insert(taskinfoLogs);
flag = true;
}catch (Exception e){
e.printStackTrace();;
}
return flag;
}
}
/**
* 取消任务
* @param taskId
* @return
*/
@Override
public boolean cancelTask(long taskId) {
boolean flag = false;
//删除任务,更新任务日志
Task task = updateDb(taskId,ScheduleConstants.CANCELLED);
//删除redis的数据
if (task != null){
removeTaskFromCache(task);
flag = true;
}
return flag;
}
/**
* 删除redis中的数据
* @param task
*/
private void removeTaskFromCache(Task task) {
String key = task.getTaskType() + "_" + task.getPriority();
if(task.getExecuteTime() <= System.currentTimeMillis()){
cacheService.lRemove(ScheduleConstants.TOPIC + key,0,JSON.toJSONString(task));
}else{
cacheService.zRemove(ScheduleConstants.FUTURE + key,JSON.toJSONString(task));
}
}
/**
* 删除任务,更新任务日志
* @param taskId
* @param status
* @return
*/
private Task updateDb(long taskId, int status) {
Task task = new Task();
try{
//删除任务
taskinfoMapper.deleteById(taskId);
//更新任务日志
TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
taskinfoLogs.setStatus(status);
taskinfoLogsMapper.updateById(taskinfoLogs);
BeanUtils.copyProperties(taskinfoLogs,task);
task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
}catch (Exception e){
log.error("task cancel exception taskId={}",taskId);
}
return task;
}
/**
* 按照类型和优先级拉取任务
* @param type
* @param priority
* @return
*/
@Override
public Task poil(int type, int priority) {
Task task = new Task();
try{
String key = type + "_" + priority;
//从redis中拉取数据 pop
String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
if(StringUtils.isNotBlank(task_json)){
task = JSON.parseObject(task_json, Task.class);
//修改数据库信息
updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
}
}catch (Exception e){
e.printStackTrace();
log.error("poll task exception");
}
return task;
}
@Test
public void testKeys(){
Set<String> keys = cacheService.keys("future_*");
System.out.println(keys);
Set<String> scan = cacheService.scan("future_*");
System.out.println(scan);
}
//耗时6151
@Test
public void testPiple1(){
long start =System.currentTimeMillis();
for (int i = 0; i <10000 ; i++) {
Task task = new Task();
task.setTaskType(1001);
task.setPriority(1);
task.setExecuteTime(new Date().getTime());
cacheService.lLeftPush("1001_1", JSON.toJSONString(task));
}
System.out.println("耗时"+(System.currentTimeMillis()- start));
}
//1472毫秒
@Test
public void testPiple2(){
long start = System.currentTimeMillis();
//使用管道技术
List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {
@Nullable
@Override
public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
for (int i = 0; i <10000 ; i++) {
Task task = new Task();
task.setTaskType(1001);
task.setPriority(1);
task.setExecuteTime(new Date().getTime());
redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());
}
return null;
}
});
System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒");
}
/**
* 未来数据定时刷新
*/
@Scheduled(cron = "0 */1 * * * ?")
public void refresh(){
log.info("未来数据定时刷新---定时任务");
//获取所有未来数据的集合key
Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
for (String futureKey : futureKeys) {//future_100_50
//获取当前数据的key topic
String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1];
//按照key和分值查询符合条件的数据
Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
//同步数据
if(!tasks.isEmpty()){
cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
log.info("成功的将"+futureKey+"刷新到了"+topicKey);
}
}
}
public List<Object> refreshWithPipeline(String future_key,String topic_key,Collection<String> values){
List<Object> objects = stringRedisTemplate.executePipelined(new RedisCallback<Object>() {
@Nullable
@Override
public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
StringRedisConnection stringRedisConnection = (StringRedisConnection)redisConnection;
String[] strings = values.toArray(new String[values.size()]);
stringRedisConnection.rPush(topic_key,strings);
stringRedisConnection.zRem(future_key,strings);
return null;
}
});
return objects;
}
@SpringBootApplication
@MapperScan("com.heima.schedule.mapper")
@EnableScheduling //开启定时任务注解
public class ScheduleApplication {
public static void main(String[] args) {
SpringApplication.run(ScheduleApplication.class,args);
}
/**
* mybatis-plus乐观锁支持
* @return
*/
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
return interceptor;
}
}
/**
* 加锁
*
* @param name
* @param expire 过期的时间
* @return
*/
public String tryLock(String name, long expire) {
name = name + "_lock";
String token = UUID.randomUUID().toString();
RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
RedisConnection conn = factory.getConnection();
try {
//参考redis命令:
//set key value [EX seconds] [PX milliseconds] [NX|XX]
Boolean result = conn.set(
name.getBytes(),
token.getBytes(),
Expiration.from(expire, TimeUnit.MILLISECONDS),
RedisStringCommands.SetOption.SET_IF_ABSENT //NX
);
if (result != null && result)
return token;
} finally {
RedisConnectionUtils.releaseConnection(conn, factory,false);
}
return null;
}
/**
* 未来数据定时刷新
*/
@Scheduled(cron = "0 */1 * * * ?")
public void refresh(){
String token = cacheService.tryLock("FUTRUE_TASK_SYNC",1000 * 30);
if(StringUtils.isNotBlank(token)){
log.info("未来数据定时刷新---定时任务");
//获取所有未来数据的集合key
Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
for (String futureKey : futureKeys) {//future_100_50
//获取当前数据的key topic
String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1];
//按照key和分值查询符合条件的数据
Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
//同步数据
if(!tasks.isEmpty()){
cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
log.info("成功的将"+futureKey+"刷新到了"+topicKey);
}
}
}
}
/**
* 数据库任务定时同步到redis中
*/
@PostConstruct
@Scheduled(cron = "0 */5 * * * ?")
public void reloadData(){
//清理缓存中的数据 list zset
clearCache();
//查询小于未来5分钟的所有任务
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE,5);
List<Taskinfo> taskinfoList = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
//新增任务到redis
if(taskinfoList != null&&taskinfoList.size() > 0) {
for (Taskinfo taskinfo : taskinfoList) {
Task task = new Task();
BeanUtils.copyProperties(taskinfo, task);
task.setExecuteTime(taskinfo.getExecuteTime().getTime());
addTaskToCache(task);
}
}
log.info("数据库的任务同步到了redis");
}
/**
* 清理缓存中的数据
*/
public void clearCache(){
Set<String> topicKeys = cacheService.scan(ScheduleConstants.TOPIC + "*");
Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
cacheService.delete(topicKeys);
cacheService.delete(futureKeys);
}
package com.heima.utils.common;
import com.heima.model.wemedia.pojos.WmNews;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
public class ProtostuffUtil {
/**
* 序列化
* @param t
* @param <T>
* @return
*/
public static <T> byte[] serialize(T t){
Schema schema = RuntimeSchema.getSchema(t.getClass());
return ProtostuffIOUtil.toByteArray(t,schema,
LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));
}
/**
* 反序列化
* @param bytes
* @param c
* @param <T>
* @return
*/
public static <T> T deserialize(byte []bytes,Class<T> c) {
T t = null;
try {
t = c.newInstance();
Schema schema = RuntimeSchema.getSchema(t.getClass());
ProtostuffIOUtil.mergeFrom(bytes,t,schema);
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
return t;
}
/**
* jdk序列化与protostuff序列化对比
* @param args
*/
public static void main(String[] args) {
long start =System.currentTimeMillis();
for (int i = 0; i <1000000 ; i++) {
WmNews wmNews =new WmNews();
JdkSerializeUtil.serialize(wmNews);
}
System.out.println(" jdk 花费 "+(System.currentTimeMillis()-start));
start =System.currentTimeMillis();
for (int i = 0; i <1000000 ; i++) {
WmNews wmNews =new WmNews();
ProtostuffUtil.serialize(wmNews);
}
System.out.println(" protostuff 花费 "+(System.currentTimeMillis()-start));
}
}
WmNewsServiceImpl
@Override
public ResponseResult submit(WmNewsDto dto) {
......
//审核文章
//wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
WmNewsTaskServiceImpl
@Service
@Slf4j
public class WmNewsTaskServiceImpl implements WmNewsTaskService {
@Autowired
private IScheduleClient scheduleClient;
/**
* 添加任务到延迟队列中
* @param id 文章id
* @param publishTime 发布的时间 可以作为任务的执行时间
*/
@Override
@Async
public void addNewsToTask(Integer id, Date publishTime) {
log.info("添加任务到延迟服务中------begin");
Task task = new Task();
task.setExecuteTime(publishTime.getTime());
task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
WmNews wmNews = new WmNews();
wmNews.setId(id);
task.setParameters(ProtostuffUtil.serialize(wmNews));
scheduleClient.addTask(task);
log.info("添加任务到延迟服务中------end");
}
@Autowired
private WmNewsAutoScanService wmNewsAutoScanService;
/**
* 消费任务,审核文章
*/
@Scheduled(fixedRate = 1000)
@Override
public void scanNewsByTask() {
log.info("消费任务,审核文章");
ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
if(responseResult.getCode().equals(200) && responseResult.getData() != null){
Task task = JSON.parseObject(JSON.toJSONString(responseResult.getData()),Task.class);
WmNews wmNews = ProtostuffUtil.deserialize(task.getParameters(),WmNews.class);
wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
}
}
}
遇到问题:
java: Annotation processing is not supported for module cycles. Please ensure that all modules from cycle [heima-leadnews-schedule,heima-leadnews-feign-api] are excluded from annotation processing
原因:heima-leadnews-feign-api需要heima-leadnews-schedule里面的一个task类,heima-leadnews-schedule需要实现heima-leadnews-feign-api的接口,从而导致形成了循环依赖,即heima-leadnews-feign-api依赖heima-leadnews-schedule,heima-leadnews-schedule也依赖了heima-leadnews-feign-api。
解决:将heima-leadnews-schedule里面的这个task类抽出来放到第三个专门管理类的模块,然后两个模块都去引用它,引用一个第三方模块来专门管理类可以避免循环依赖。
kafka安装配置
Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
-
Docker安装zookeeper
下载镜像:
docker pull zookeeper:3.4.14
创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
-
Docker安装kafka
下载镜像:
docker pull wurstmeister/kafka:2.12-2.3.1
创建容器
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.136.152 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.136.152:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.136.152:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1
消息生产者
package com.heima.kafka.sample;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* 生产者
*/
public class ProducerQuickStart {
public static void main(String[] args) {
//1.kafka连接配置信息
Properties properties = new Properties();
//kafka的连接地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.152:9092");
//key和value的序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//2.创建生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
//3.发送消息
/**
* 第一个参数:topic
* 第二个参数:消息的key
* 第三个参数:消息的value
*/
ProducerRecord<String,String> kvProducerRecord = new ProducerRecord<String,String>("topic-first","key-001","hello kafka");
producer.send(kvProducerRecord);
System.out.println("消息发送成功");
//4.关闭消息通道 必须要关闭,否则消息发送不成功
producer.close();
}
}
消息消费者
package com.heima.kafka.sample;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* 消费者
*/
public class ConsumerQuickStart {
public static void main(String[] args) {
//1.kafka的配置信息
Properties properties = new Properties();
//连接地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.152:9092");
//反序列化的key和value
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
//设置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"gruop1");
//2.创建消费者对象
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(properties);
//3.订阅主题
consumer.subscribe(Collections.singletonList("topic-first"));
//4.拉取消息 每一秒钟拉取一次
while(true){ //模拟正在监听状态
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key());
System.out.println(consumerRecord.value());
}
}
}
}
遇到问题:如果没有传输成功,需要关闭服务器的防火墙,将kafka的访问端口暴露出来
package com.heima.kafka.sample;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* 生产者
*/
public class ProducerQuickStart {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.kafka连接配置信息
Properties properties = new Properties();
//kafka的连接地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.152:9092");
//key和value的序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//ack配置,消息确认机制
properties.put(ProducerConfig.ACKS_CONFIG,"all");
//设置重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,10);
//消息压缩
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
//2.创建生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
//3.发送消息
/**
* 第一个参数:topic
* 第二个参数:消息的key
* 第三个参数:消息的value
*/
ProducerRecord<String,String> kvProducerRecord = new ProducerRecord<String,String>("topic-first","hello kafka");
//同步发送消息
//RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
//System.out.println(recordMetadata.offset());//获取偏移量
//异步发送消息
producer.send(kvProducerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e!=null){
System.out.println("记录异常信息到日志表中");
}
System.out.println(recordMetadata.offset());
}
});
System.out.println("消息发送成功");
//4.关闭消息通道 必须要关闭,否则消息发送不成功
producer.close();
}
}
package com.heima.kafka.sample;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
/**
* 消费者
*/
public class ConsumerQuickStart {
public static void main(String[] args) {
//1.kafka的配置信息
Properties properties = new Properties();
//连接地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.152:9092");
//反序列化的key和value
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
//设置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"gruop1");
//手动提交偏移量
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
//2.创建消费者对象
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(properties);
//3.订阅主题
consumer.subscribe(Collections.singletonList("topic-first"));
//4.拉取消息 每一秒钟拉取一次
//同步提交和异步提交偏移量
try{
while(true) { //模拟正在监听状态
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key());
System.out.println(consumerRecord.value());
System.out.println(consumerRecord.offset());
System.out.println(consumerRecord.partition());
}
//异步提交偏移量
consumer.commitAsync();
}
}catch (Exception e){
e.printStackTrace();
System.out.println("记录错误的信息:"+e);
}finally {
//同步
consumer.commitSync();
}
//while(true){ //模拟正在监听状态
// ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
// for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// System.out.println(consumerRecord.key());
// System.out.println(consumerRecord.value());
// System.out.println(consumerRecord.offset());
// //System.out.println(consumerRecord.partition());
// //try{
// // //同步提交偏移量
// // consumer.commitSync();
// //}catch (CommitFailedException e){
// // System.out.println("记录提交失败的异常:"+ e);
// //}
// //异步提交当前最新的偏移量
// //consumer.commitAsync(new OffsetCommitCallback() {
// // @Override
// // public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
// // if(e!=null){
// // System.out.println("记录错误的提交偏移量:"+map+",异常信息"+e);
// // }
// // }
// //});
// }
//}
}
}
搭建ElasticSearch环境
拉取镜像
docker pull elasticsearch:7.4.0
创建容器
docker run -id --name elasticsearch -d --restart=always -p 9200:9200 -p 9300:9300 -v /usr/share/elasticsearch/plugins:/usr/share/elasticsearch/plugins -e "discovery.type=single-node" elasticsearch:7.4.0
配置中文分词器 ik
因为在创建elasticsearch容器的时候,映射了目录,所以可以在宿主机上进行配置ik中文分词器
在去选择ik分词器的时候,需要与elasticsearch的版本好对应上
把资料中的elasticsearch-analysis-ik-7.4.0.zip
上传到服务器上,放到对应目录(plugins)解压
#切换目录
cd /usr/share/elasticsearch/plugins
#新建目录
mkdir analysis-ik
cd analysis-ik
#root根目录中拷贝文件
mv elasticsearch-analysis-ik-7.4.0.zip /usr/share/elasticsearch/plugins/analysis-ik
#解压文件
cd /usr/share/elasticsearch/plugins/analysis-ik
unzip elasticsearch-analysis-ik-7.4.0.zip
2.4) 使用postman测试
创建索引和映射
使用postman添加映射
put请求 : http://192.168.200.152:9200/app_info_article
{
"mappings":{
"properties":{
"id":{
"type":"long"
},
"publishTime":{
"type":"date"
},
"layout":{
"type":"integer"
},
"images":{
"type":"keyword",
"index": false
},
"staticUrl":{
"type":"keyword",
"index": false
},
"authorId": {
"type": "long"
},
"authorName": {
"type": "text"
},
"title":{
"type":"text",
"analyzer":"ik_smart"
},
"content":{
"type":"text",
"analyzer":"ik_smart"
}
}
}
}
GET请求查询映射:http://192.168.200.130:9200/app_info_article
DELETE请求,删除索引及映射:http://192.168.200.130:9200/app_info_article
GET请求,查询所有文档:http://192.168.200.130:9200/app_info_article/_search
搭建搜索微服务
在heima-leadnews-service的pom中添加依赖
<!--elasticsearch-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.4.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.4.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.4.0</version>
</dependency>
(3)nacos配置中心leadnews-search
spring:
autoconfigure:
exclude: org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
elasticsearch:
host: 192.168.136.152
port: 9200
@SpringBootTest
@RunWith(SpringRunner.class)
public class ApArticleTest {
@Autowired
private ApArticleMapper apArticleMapper;
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
* 注意:数据量的导入,如果数据量过大,需要分页导入
* @throws Exception
*/
@Test
public void init() throws Exception {
//1.查询所有符合条件的文章
List<SearchArticleVo> searchArticleVos = apArticleMapper.loadArticleList();
//2.批量导入到es索引库
BulkRequest bulkRequest = new BulkRequest("app_info_article");
for (SearchArticleVo searchArticleVo : searchArticleVos) {
IndexRequest indexRequest = new IndexRequest().id(searchArticleVo.getId().toString())
.source(JSON.toJSONString(searchArticleVo), XContentType.JSON);
//批量添加对象
bulkRequest.add(indexRequest);
}
BulkResponse response = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
System.out.println("插入结果:" + response.status());
}
}
ArticleSearchServiceImpl
@Service
@Slf4j
public class ArticleSearchServiceImpl implements ArticleSearchService {
@Autowired
private RestHighLevelClient restHighLevelClient;
@Override
public ResponseResult search(UserSearchDto dto) throws IOException {
//1.检查参数
if(dto == null || StringUtils.isBlank(dto.getSearchWords())){
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
//2.构建查询执行查询
SearchRequest searchRequest = new SearchRequest("app_info_article");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
//布尔查询(条件不只有一个)
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
//关键词的分词之后查询
QueryStringQueryBuilder queryStringQueryBuilder = QueryBuilders.queryStringQuery(dto.getSearchWords())
.field("title").field("content").defaultOperator(Operator.OR);
boolQueryBuilder.must(queryStringQueryBuilder);
//查询小于mindate的数据
if(dto.getMinBehotTime()!=null){
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("publishTime")
.lt(dto.getMinBehotTime().getTime());
boolQueryBuilder.filter(rangeQueryBuilder);
}
//分页查询
searchSourceBuilder.from(0);
searchSourceBuilder.size(dto.getPageSize());
//按照发布时间倒序查询
searchSourceBuilder.sort("publishTime", SortOrder.DESC);
//设置高亮 title
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.field("title");
highlightBuilder.preTags("<font style='color: red; font-size: inherit;'>");
highlightBuilder.postTags("</font>");
searchSourceBuilder.highlighter(highlightBuilder);
searchSourceBuilder.query(boolQueryBuilder);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
//3.结果封装返回
List<Map> list = new ArrayList<>();
SearchHit[] hits = searchResponse.getHits().getHits();
for (SearchHit hit : hits) {
String json = hit.getSourceAsString();
Map map = JSON.parseObject(json, Map.class);
//处理高亮
if(hit.getHighlightFields() != null && hit.getHighlightFields().size() > 0){
Text[] titles = hit.getHighlightFields().get("title").getFragments();
String title = StringUtils.join(titles);
//高亮标题
map.put("h_title",title);
}else{
//原始标题
map.put("h_title",map.get("title"));
}
list.add(map);
}
return ResponseResult.okResult(list);
}
}
MongoDB安装及集成
4.3.1)安装MongoDB
拉取镜像
docker pull mongo
创建容器
docker run -di --name mongo-service --restart=always -p 27017:27017 -v ~/data/mongodata:/data mongo
4.3.2)导入资料中的mongo-demo项目到heima-leadnews-test中
其中有三项配置比较关键:
第一:mongo依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
第二:mongo配置
server:
port: 9998
spring:
data:
mongodb:
host: 192.168.200.130
port: 27017
database: leadnews-history
第三:映射
package com.itheima.mongo.pojo;
import lombok.Data;
import org.springframework.data.mongodb.core.mapping.Document;
import java.io.Serializable;
import java.util.Date;
/**
* <p>
* 联想词表
* </p>
*
* @author itheima
*/
@Data
@Document("ap_associate_words")
public class ApAssociateWords implements Serializable {
private static final long serialVersionUID = 1L;
private String id;
/**
* 联想词
*/
private String associateWords;
/**
* 创建时间
*/
private Date createdTime;
}
@Async的作用就是异步处理任务
1.在方法上添加@Async,表示此方法是异步方法
2.在类上添加@Async,表示类中的所有方法都是异步方法
3.使用此注解的类,必须是Spring管理的类
4.需要在启动类或配置类中加入@EnableAsync注解,@Async才会生效;
创建xxljob-demo项目,导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--xxl-job-->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
</dependencies>
application.yml配置
server:
port: 8881
xxl:
job:
admin:
addresses: http://192.168.200.130:8888/xxl-job-admin
executor:
appname: xxl-job-executor-sample
port: 9999
通过业务取模的方式将多个业务分别交给不同的分片去执行
引入依赖
在之前的kafka-demo工程的pom文件中引入
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<exclusions>
<exclusion>
<artifactId>connect-json</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
package com.heima.kafka.sample;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* 流式处理
*/
public class KafkaStreamQuickStart {
public static void main(String[] args) {
//kafka配置
Properties prop = new Properties();
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.152:9092");
prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-sample");
//stream构建器
StreamsBuilder streamsBuilder = new StreamsBuilder();
//流式计算
streamProcessor(streamsBuilder);
//创建kafkaStream对象
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
//开启kafka流式计算
kafkaStreams.start();
}
/**
* 流式计算
* 消息的内容:hello kafka
* @param streamsBuilder
*/
private static void streamProcessor(StreamsBuilder streamsBuilder) {
//创建kstream对象,同时指定从哪个topic中接收消息
KStream<String,String> stream = streamsBuilder.stream("itcast-topic-input");
//处理消息的value
stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.split(" "));
}
}) //按照value进行聚合处理
.groupBy((key,value)->value)
//聚合计算时间间隔
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
//聚合查询:求单词总个数
.count()
//转成KStream
.toStream()
//处理后结果key和value转成string
.map((key,value)->{
System.out.println("key:"+key+",value:"+value);
return new KeyValue<>(key.key().toString(),value.toString());
})
//发送消息
.to("itcast-topic-out");
}
}
遇到问题:Kafka的consumer,producer或kafkaStream不起作用
原因:consumer,producer或kafkaStream虽然启动了但是没有注册到kafka。
解决方案:可能需要在服务器中重启甚至重新配置zookeeper和kafka,然后查看kafka的日志,启动consumer,producer或kafkaStream是否会实时打印出对应的注册信息。
heima-leadnews-behavior->ApLikesBehaviorServiceImpl
package com.heima.behavior.service.impl;
import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApLikesBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.LikesBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Transactional
@Slf4j
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {
@Autowired
private CacheService cacheService;
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@Override
public ResponseResult like(LikesBehaviorDto dto) {
//1.检查参数
if (dto == null || dto.getArticleId() == null || checkParam(dto)) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
}
//2.是否登录
ApUser user = AppThreadLocalUtil.getUser();
if (user == null) {
return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
}
UpdateArticleMess mess = new UpdateArticleMess();
mess.setArticleId(dto.getArticleId());
mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);
//3.点赞 保存数据
if (dto.getOperation() == 0) {
Object obj = cacheService.hGet(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
if (obj != null) {
return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "已点赞");
}
// 保存当前key
log.info("保存当前key:{} ,{}, {}", dto.getArticleId(), user.getId(), dto);
cacheService.hPut(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));
mess.setAdd(1);
} else {
// 删除当前key
log.info("删除当前key:{}, {}", dto.getArticleId(), user.getId());
cacheService.hDelete(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
mess.setAdd(-1);
}
//发送消息,数据聚合
kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));
return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
}
/**
* 检查参数
*
* @return
*/
private boolean checkParam(LikesBehaviorDto dto) {
if (dto.getType() > 2 || dto.getType() < 0 || dto.getOperation() > 1 || dto.getOperation() < 0) {
return true;
}
return false;
}
}
heima-leadnews-article->config
package com.heima.article.config;
import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import java.util.HashMap;
import java.util.Map;
/**
* 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
*/
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
private String hosts;
private String group;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
props.put(StreamsConfig.RETRIES_CONFIG, 10);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return new KafkaStreamsConfiguration(props);
}
}
heima-leadnews-article->HotArticleStreamHandler
package com.heima.article.stream;
import com.alibaba.fastjson.JSON;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import com.heima.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.time.Duration;
@Configuration
@Slf4j
public class HotArticleStreamHandler {
@Bean
public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
//接收消息
KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
//聚合流式处理
stream.map((key,value)->{
UpdateArticleMess mess = JSON.parseObject(value,UpdateArticleMess.class);
//重置消息的key:文章id和value:行为类型:数量 LIKE:0
return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
})
//按照文章id进行聚合
.groupBy((key,value)->key)
//时间窗口
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
/**
* 自行的完成聚合的计算
*/
.aggregate(new Initializer<String>() {
/**
* 初始方法,返回值是消息的value
* @return
*/
@Override
public String apply() {
return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
}
/**
* 真正的聚合操作,返回值是消息的value
*/
}, new Aggregator<String, String, String>() {
@Override
public String apply(String key, String value, String aggValue) {
if(StringUtils.isBlank(value)){
return aggValue;
}
String[] aggAry = aggValue.split(",");
int col = 0,com = 0,lik = 0,vie = 0;
for (String agg : aggAry) {
String[] split = agg.split(":");
/**
* 获得初始值,也是时间窗口内计算之后的值
*/
switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
case COLLECTION:
col = Integer.parseInt(split[1]);
break;
case COMMENT:
com = Integer.parseInt(split[1]);
break;
case LIKES:
lik = Integer.parseInt(split[1]);
break;
case VIEWS:
vie = Integer.parseInt(split[1]);
break;
}
}
/**
* 累加操作
*/
String[] valAry = value.split(":");
switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){
case COLLECTION:
col += Integer.parseInt(valAry[1]);
break;
case COMMENT:
com += Integer.parseInt(valAry[1]);
break;
case LIKES:
lik += Integer.parseInt(valAry[1]);
break;
case VIEWS:
vie += Integer.parseInt(valAry[1]);
break;
}
String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d",col,com,lik,vie);
System.out.println("文章的id:"+key);
System.out.println("当前时间窗口内的消息处理结果:"+formatStr);
return formatStr;
}//当前流式处理的状态,如果有多个流式处理,保证不一样即可
}, Materialized.as("hot-article-stream-count-001"))
.toStream()
.map((key,value)->{
return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
})
//发送消息
.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);
return stream;
}
/**
* 格式化消息的value数据
* @param articleId
* @param value
* @return
*/
private String formatObj(String articleId, String value) {
ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
mess.setArticleId(Long.valueOf(articleId));
//COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d
String[] valAry = value.split(",");
for (String val : valAry) {
String[] split = val.split(":");
switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){
case COLLECTION:
mess.setCollect(Integer.parseInt(split[1]));
break;
case COMMENT:
mess.setComment(Integer.parseInt(split[1]));
break;
case LIKES:
mess.setLike(Integer.parseInt(split[1]));
break;
case VIEWS:
mess.setView(Integer.parseInt(split[1]));
break;
}
}
log.info("聚合消息处理之后和结果为:{}",JSON.toJSONString(mess));
return JSON.toJSONString(mess);
}
}
heima-leadnews-article->ArticleIncrHandleListener
@Component
@Slf4j
public class ArticleIncrHandleListener {
@Autowired
private ApArticleService apArticleService;
@KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC )
public void onMessage(String mess){
if(StringUtils.isNotBlank(mess)){
ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
apArticleService.updateScore(articleVisitStreamMess);
}
}
}
遇到问题:将文章ApArticle列表数据返回给前端后,到达前端的ApArticle列表的文章id前后不一致
原因:ApArticle的id类型为long,而long类型数据在网络传输时会丢失精度,所以末尾出现0
解决方案:jackson进行序列化和反序列化解决
-
当后端响应给前端的数据中包含了id或者特殊标识(可自定义)的时候,把当前数据进行转换为String类型
-
当前端传递后后端的dto中有id或者特殊标识(可自定义)的时候,把当前数据转为Integer或Long类型。
特殊标识类说明:
IdEncrypt 自定义注解 作用在需要转换类型的字段属性上,用于非id的属性上 在model包下
package com.heima.model.common.annotation;
import com.fasterxml.jackson.annotation.JacksonAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@JacksonAnnotation
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER})
public @interface IdEncrypt {
}
序列化和反序列化类说明:以下类理解为主,可直接在资料文件夹下拷贝到leadnews-common模块中使用。
-
ConfusionSerializer 用于序列化自增数字的混淆
public class ConfusionSerializer extends JsonSerializer<Object> {
@Override
public void serialize(Object value, JsonGenerator jsonGenerator, SerializerProvider serializers) throws IOException {
try {
if (value != null) {
jsonGenerator.writeString(value.toString());
return;
}
}catch (Exception e){
e.printStackTrace();
}
serializers.defaultSerializeValue(value, jsonGenerator);
}
}
-
ConfusionDeserializer 用于反序列化自增数字的混淆解密
public class ConfusionDeserializer extends JsonDeserializer<Object> { JsonDeserializer<Object> deserializer = null; JavaType type =null; public ConfusionDeserializer(JsonDeserializer<Object> deserializer, JavaType type){ this.deserializer = deserializer; this.type = type; } @Override public Object deserialize(JsonParser p, DeserializationContext ctxt) throws IOException{ try { if(type!=null){ if(type.getTypeName().contains("Long")){ return Long.valueOf(p.getValueAsString()); } if(type.getTypeName().contains("Integer")){ return Integer.valueOf(p.getValueAsString()); } } return IdsUtils.decryptLong(p.getValueAsString()); }catch (Exception e){ if(deserializer!=null){ return deserializer.deserialize(p,ctxt); }else { return p.getCurrentValue(); } } } }
-
ConfusionSerializerModifier 用于过滤序列化时处理的字段
public class ConfusionSerializerModifier extends BeanSerializerModifier {
@Override
public List<BeanPropertyWriter> changeProperties(SerializationConfig config,
BeanDescription beanDesc, List<BeanPropertyWriter> beanProperties) {
List<BeanPropertyWriter> newWriter = new ArrayList<>();
for(BeanPropertyWriter writer : beanProperties){
String name = writer.getType().getTypeName();
if(null == writer.getAnnotation(IdEncrypt.class) && !writer.getName().equalsIgnoreCase("id")){
newWriter.add(writer);
} else {
writer.assignSerializer(new ConfusionSerializer());
newWriter.add(writer);
}
}
return newWriter;
}
}
-
ConfusionDeserializerModifier 用于过滤反序列化时处理的字段
public class ConfusionDeserializerModifier extends BeanDeserializerModifier { @Override public BeanDeserializerBuilder updateBuilder(final DeserializationConfig config, final BeanDescription beanDescription, final BeanDeserializerBuilder builder) { Iterator it = builder.getProperties(); while (it.hasNext()) { SettableBeanProperty p = (SettableBeanProperty) it.next(); if ((null != p.getAnnotation(IdEncrypt.class)||p.getName().equalsIgnoreCase("id"))) { JsonDeserializer<Object> current = p.getValueDeserializer(); builder.addOrReplaceProperty(p.withValueDeserializer(new ConfusionDeserializer(p.getValueDeserializer(),p.getType())), true); } } return builder; } }
-
ConfusionModule 用于注册模块和修改器
public class ConfusionModule extends Module { public final static String MODULE_NAME = "jackson-confusion-encryption"; public final static Version VERSION = new Version(1,0,0,null,"heima",MODULE_NAME); @Override public String getModuleName() { return MODULE_NAME; } @Override public Version version() { return VERSION; } @Override public void setupModule(SetupContext context) { context.addBeanSerializerModifier(new ConfusionSerializerModifier()); context.addBeanDeserializerModifier(new ConfusionDeserializerModifier()); } /** * 注册当前模块 * @return */ public static ObjectMapper registerModule(ObjectMapper objectMapper){ //CamelCase策略,Java对象属性:personId,序列化后属性:persionId //PascalCase策略,Java对象属性:personId,序列化后属性:PersonId //SnakeCase策略,Java对象属性:personId,序列化后属性:person_id //KebabCase策略,Java对象属性:personId,序列化后属性:person-id // 忽略多余字段,抛错 objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); return objectMapper.registerModule(new ConfusionModule()); } }
-
InitJacksonConfig 提供自动化配置默认ObjectMapper,让整个框架自动处理id混淆
@Configuration public class InitJacksonConfig { @Bean public ObjectMapper objectMapper() { ObjectMapper objectMapper = new ObjectMapper(); objectMapper = ConfusionModule.registerModule(objectMapper); return objectMapper; } }
在common模块中的自动配置的spring.factories中添加如下内容
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.heima.common.swagger.SwaggerConfiguration,\
com.heima.common.swagger.Swagger2Configuration,\
com.heima.common.exception.ExceptionCatch,\
com.heima.common.aliyun.GreenTextScan,\
com.heima.common.aliyun.GreenImageScan,\
com.heima.common.jackson.InitJacksonConfig
在dto中传递参数的时候如果想要把数值类型转为json,可以使用@IdEncrypt
标识字段进行转换,如下:
@Data
public class ArticleInfoDto {
// 文章ID
@IdEncrypt
Long articleId;
}
Jenkins介绍
Jenkins 是一款流行的开源持续集成(Continuous Integration)工具,广泛用于项目开发,具有自动化构建、测试和部署等功能。官网: Jenkins。
Jenkins的特征:
-
开源的 Java语言开发持续集成工具,支持持续集成,持续部署。
-
易于安装部署配置:可通过 yum安装,或下载war包以及通过docker容器等快速实现安装部署,可方便web界面配置管理。
-
消息通知及测试报告:集成 RSS/E-mail通过RSS发布构建结果或当构建完成时通过e-mail通知,生成JUnit/TestNG测试报告。
-
分布式构建:支持 Jenkins能够让多台计算机一起构建/测试。
-
文件识别: Jenkins能够跟踪哪次构建生成哪些jar,哪次构建使用哪个版本的jar等。
-
丰富的插件支持:支持扩展插件,你可以开发适合自己团队使用的工具,如 git,svn,maven,docker等。
Jenkins安装配置
- 首先服务器需要安装java环境(JDK11以上)
- 去Jenkins官网Jenkins下载Jenkins.war包
- 将jenkins.war包上传到服务器后通过Java命令运行jenkins.war包,启动jenkins服务,这里顺便指定了端口号为16060。
- 如果在Centos7 上Open JDK 11 安装启动Jenkins的时候报了一个错误:
hudson.util.AWTProblem at hudson.WebAppMain.contextInitialized(WebAppMain.java:251)
- 解决方案:
yum install fontconfig
- 再次启动查看成功启动的日志,提醒管理员账户已经自动帮我们创建成功了,并生成了密钥。
- 浏览器首次访问Jenkins服务,就需要将这个密钥进行输入才能登录
按默认设置,把建议的插件都安装上
这一步等待时间较长, 安装完成之后, 创建管理员用户:
配置访问地址:
配置完成之后, 会进行重启, 之后可以看到管理后台:
Jenkins工具配置
1.进入【系统管理】--> 【全局工具配置】
2.MAVEN配置全局设置
3. 指定JDK配置
4.指定MAVEN 目录
5.指定DOCKER目录
如果不清楚docker的安装的目录,可以使用whereis docker
命令查看docker的安装的目录
每个微服务都引入该依赖,以heima-leadnews-user微服务为例
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>heima-leadnews-service</artifactId>
<groupId>com.heima</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>heima-leadnews-user</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<docker.image>docker_storage</docker.image>
</properties>
<build>
<finalName>heima-leadnews-user</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>com.spotify</groupId>
<artifactId>dockerfile-maven-plugin</artifactId>
<version>1.3.6</version>
<configuration>
<repository>${docker.image}/${project.artifactId}</repository>
<buildArgs>
<JAR_FILE>target/${project.build.finalName}.jar</JAR_FILE>
</buildArgs>
</configuration>
</plugin>
</plugins>
</build>
</project>
服务集成Dockerfile文件
# 设置JAVA版本
FROM java:8
# 指定存储卷, 任何向/tmp写入的信息都不会记录到容器存储层
VOLUME /tmp
# 拷贝运行JAR包
ARG JAR_FILE
COPY ${JAR_FILE} app.jar
# 设置JVM运行参数, 这里限定下内存大小,减少开销
ENV JAVA_OPTS="\
-server \
-Xms256m \
-Xmx512m \
-XX:MetaspaceSize=256m \
-XX:MaxMetaspaceSize=512m"
#空参数,方便创建容器时传参
ENV PARAMS=""
# 入口点, 执行JAVA运行命令
ENTRYPOINT ["sh","-c","java -jar $JAVA_OPTS /app.jar $PARAMS"]
利用Jenkins编译打包时可能会因为镜像源失效而导致拉取镜像不成功的编译失败问题,更新 Dockerfile 中的基础镜像,将 FROM java:8
中的 java:8
替换为所需的镜像版本。例如,如果要使用 JDK 8,可以将其更改为 FROM openjdk:8-jdk
。
遇到问题:Jenkins项目编译不成功。
原因:在Jenkins服务器上,Jenkins通过maven编译打包到服务器本地仓库的依赖会有可能损坏或不完整,就算删除重新编译打包也会有相同的问题。
解决方案:将本地maven仓库对应的依赖包替换掉Jenkins服务器上maven本地仓库的可能损坏或不完整的依赖包,甚至替换掉依赖相关的整个文件夹。因为本地maven仓库的项目测试是没有问题的。
clean install -Dmaven.test.skip=true dockerfile:build -f heima-leadnews/heima-leadnews-service/heima-leadnews-user/pom.xml
if [ -n "$(docker ps -a -f name=$JOB_NAME --format '{{.ID}}' )" ]
then
#删除之前的容器
docker rm -f $(docker ps -a -f name=$JOB_NAME --format '{{.ID}}' )
fi
# 清理镜像
docker image prune -f
# 启动docker服务
docker run -d --net=host -e PARAMS="--spring.profiles.active=prod" --name $JOB_NAME docker_storage/$JOB_NAME
构建镜像
清理容器,创建新的容器
可以发现服务已经通过docker容器的方式启动
安装配置私有仓库
对于持续集成环境的配置,Jenkins会发布大量的微服务, 要与多台机器进行交互, 可以采用docker镜像的保存与导出功能结合SSH实现, 但这样交互繁琐,稳定性差, 而且不便管理, 这里我们通过搭建Docker的私有仓库来实现, 这个有点类似GIT仓库, 集中统一管理资源, 由客户端拉取或更新。
-
下载最新Registry镜像
docker pull registry:latest
-
启动Registry镜像服务
docker run -d -p 5000:5000 --name registry -v /usr/local/docker/registry:/var/lib/registry registry:latest
映射5000端口; -v是将Registry内的镜像数据卷与本地文件关联, 便于管理和维护Registry内的数据。
-
查看仓库资源
访问地址:http://192.168.200.100:5000/v2/_catalog
启动正常, 可以看到返回:
{"repositories":[]}
目前并没有上传镜像, 显示空数据。
如果上传成功, 可以看到数据:
jenkins中安装插件
jenkins系统配置远程服务器链接
位置:Manage Jenkins-->Configure System
需要添加凭证
位置:Manage Jenkins-->Manage CreDentials
添加链接到130服务器的用户名和密码
maven命令
clean install -Dmaven.test.skip=true dockerfile:build -f heima-leadnews/heima-leadnews-service/heima-leadnews-article/pom.xml
shell脚本
image_tag=$docker_registry/docker_storage/$JOB_NAME
echo '================docker镜像清理================'
if [ -n "$(docker ps -a -f name=$JOB_NAME --format '{{.ID}}' )" ]
then
#删除之前的容器
docker rm -f $(docker ps -a -f name=$JOB_NAME --format '{{.ID}}' )
fi
# 清理镜像
docker image prune -f
# 创建TAG
docker tag docker_storage/$JOB_NAME $image_tag
echo '================docker镜像推送================'
# 推送镜像
docker push $image_tag
# 删除TAG
docker rmi $image_tag
echo '================docker tag 清理 ================'
远程服务器执行的shell脚本
echo '================拉取最新镜像================'
docker pull $docker_registry/docker_storage/$JOB_NAME
echo '================删除清理容器镜像================'
if [ -n "$(docker ps -a -f name=$JOB_NAME --format '{{.ID}}' )" ]
then
#删除之前的容器
docker rm -f $(docker ps -a -f name=$JOB_NAME --format '{{.ID}}' )
fi
# 清理镜像
docker image prune -f
echo '===============启动容器================'
docker run -d --net=host -e PARAMS="--spring.profiles.active=prod" --name $JOB_NAME $docker_registry/docker_storage/$JOB_NAME
遇到问题:镜像推送dockers仓库失败
received unexpected HTTP status: 500 Internal Server Error
Build step 'Execute shell' marked build as failure
Finished: FAILURE
原因:是由于selinux未关闭导致docker出现异常情况
解决方案:
关闭selinux
临时关闭
[root@ip-10-0-1-46 ~]# setenforce 0
或永久关闭
[root@ip-10-0-1-46 ~]# sed -i 's/SELINUX=enforcing/SELINUX=disabled/g' /etc/selinux/config
重新push
遇到问题:SSH拉取docker仓库镜像失败
echo '===============启动容器================'
docker run -d --net=host -e PARAMS="--spring.profiles.active=prod" --name $JOB_NAME $docker_registry/docker_storage/$JOB_NAME
[SSH] executing...
Error response from daemon: Get "https://192.168.136.153:5000/v2/": http: server gave HTTP response to HTTPS client
Unable to find image '192.168.136.153:5000/docker_storage/heima-leadnews-article:latest' locally
docker: Error response from daemon: Get "https://192.168.136.153:5000/v2/": http: server gave HTTP response to HTTPS client.
See 'docker run --help'.
================拉取最新镜像================
Using default tag: latest
================删除清理容器镜像================
Total reclaimed space: 0B
===============启动容器================
[SSH] completed
[SSH] exit-status: 125
Build step 'Execute shell script on remote host using ssh' marked build as failure
Finished: FAILURE
原因: 拉取端服务器的docker没有配置指向安装Registry的服务IP与端口,也就是说,不管是推送服务器的docker或者拉取服务器的docker都需要配置指向安装Registry的服务IP与端口。
解决方案:
先确保持续集成环境的机器已安装好Docker客户端, 然后做以下修改:
vi /lib/systemd/system/docker.service
修改内容:
ExecStart=/usr/bin/dockerd --insecure-registry=192.168.136.153:5000
指向安装Registry的服务IP与端口。
重启生效:
systemctl daemon-reload
systemctl restart docker.service
遇到问题:
javax.net.ssl.SSLHandshakeException: No appropriate protocol (protocol is disabled or cipher suites are inappropriate)
at sun.security.ssl.HandshakeContext.<init>(HandshakeContext.java:171) ~[na:1.8.0_292]
at sun.security.ssl.ClientHandshakeContext.<init>(ClientHandshakeContext.java:98) ~[na:1.8.0_292]
at sun.security.ssl.TransportContext.kickstart(TransportContext.java:220) ~[na:1.8.0_292]
at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:428) ~[na:1.8.0_292]
at com.mysql.cj.protocol.ExportControlled.performTlsHandshake(ExportControlled.java:316) ~[mysql-connector-java-8.0.17.jar:8.0.17]
at com.mysql.cj.protocol.StandardSocketFactory.performTlsHandshake(StandardSocketFactory.java:188) ~[mysql-connector-java-8.0.17.jar:8.0.17]
at com.mysql.cj.protocol.a.NativeSocketConnection.performTlsHandshake(NativeSocketConnection.java:99) ~[mysql-connector-java-8.0.17.jar:8.0.17]
at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:331) ~[mysql-connector-java-8.0.17.jar:8.0.17]
... 68 common frames omitted
原因:在Java8及高版本以上的版本在调用ssl时会出现javax.net.ssl.SSLHandshakeException: No appropriate protocol
的异常。
解决方案一:
jdbc-url中添加配置useSSL=false
spring:
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://192.168.136.152:3306/leadnews_article?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false
解决方案二:
Java\jre里面的lib\security 下面有个java.security。找到对应的SSLv3,删除掉,重启项目就好了。(删掉SSLv3就是允许SSL调用)
然后重启java服务
文章来源:https://www.toymoban.com/news/detail-432443.html
文章来源地址https://www.toymoban.com/news/detail-432443.html
到了这里,关于新黑马头条项目经验(黑马)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!