最近的项目中有实时对话的需求。这里也是查阅了很多的资料。使用webSocket实现一对一对话是最多的。
链接: https://pan.baidu.com/s/1Vn1e1qw7nRnU1_4R-4fcGg
提取码: qwer
逻辑讲解:
现在我们要给张三丰发一个你好,我们要解决下面两个问题
· 这个你好怎么展示在我的窗口
· 这个你好怎么展示在张三丰的窗口
第一个问题很好解决,我们的消息都是存在一个数组里面的,而我们前端是使用vue实现的,我们只需要在发消息的时候把新的消息push到这个数组里面去就行了。
我们要推送消息,我们只要知道要推送的session就好了,我们可以在连接的时候,把每一个session装进Map里面去,key使用userId。
前端拿到数据后,在websocketonmessage里面push到数组里面去就好了。
这样就实现了两个用户实时的聊天了。
当前会话存在就显示消息,当前会话不存在就显示消息+1
我们知道一个人可能和多个用户聊天,如果当前消息是该会话用户发的,那么我们的消息要显示出来(上面说了直接push到list里面),如果不是当前会话,我们就要显示消息+1。
引入依赖:此依赖封装了对实时对话的创建到取消链接
<!-- websocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
项目所需要的表:
消息表
用来存每一个的对话消息,通过当前用户id,和点击会话列表,我们就和查询到当前会话的消息了。
CREATE TABLE `msg_info` (
`id` int(6) NOT NULL AUTO_INCREMENT COMMENT '消息id',
`from_user_id` int(6) NOT NULL COMMENT '消息发送者id',
`from_user_name` varchar(50) NOT NULL COMMENT '消息发送者名称',
`to_user_id` int(6) NOT NULL COMMENT '消息接收者id',
`to_user_name` varchar(50) NOT NULL COMMENT '消息接收者名称',
`content` varchar(200) NOT NULL COMMENT '消息内容',
`create_time` datetime NOT NULL COMMENT '消息发送时间',
`un_read_flag` int(1) NOT NULL COMMENT '是否已读(1 已读)',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=273 DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT COMMENT='消息表';
会话列表
A和B、A和C、A和D、B和A,都可以建立会话,我们需要把存在的会话都存在起来,这样用户一登陆就可以看到自己之前的会话列表了。
CREATE TABLE `session_list` (
`id` int(6) NOT NULL AUTO_INCREMENT COMMENT 'id',
`user_id` int(6) NOT NULL COMMENT '所属用户',
`to_user_id` int(6) NOT NULL COMMENT '到用户',
`list_name` varchar(50) NOT NULL COMMENT '会话名称',
`un_read_count` int(6) NOT NULL COMMENT '未读消息数',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=51 DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT COMMENT='会话列表';
用户表(一般只需要id和用户名建立通话即可,其他的如有需求自行添加)
CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`name` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '用户名',
`password` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '密码',
`nick_name` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '昵称',
`age` int(11) DEFAULT NULL COMMENT '年龄',
`sex` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '性别',
`address` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '地址',
`avatar` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '头像',
`account` decimal(10,2) DEFAULT 0.00 COMMENT '账户余额',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=20 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ROW_FORMAT=DYNAMIC COMMENT='用户信息表';
配置WebSocketConfig
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
/**
* 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
配置跨域
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
import org.springframework.web.filter.CorsFilter;
@Configuration
public class CorsConfig {
// 当前跨域请求最大有效时长。这里默认1天
private static final long MAX_AGE = 24 * 60 * 60;
private CorsConfiguration buildConfig() {
CorsConfiguration corsConfiguration = new CorsConfiguration();
corsConfiguration.addAllowedOrigin("*"); // 1 设置访问源地址
corsConfiguration.addAllowedHeader("*"); // 2 设置访问源请求头
corsConfiguration.addAllowedMethod("*"); // 3 设置访问源请求方法
corsConfiguration.setMaxAge(MAX_AGE);
return corsConfiguration;
}
@Bean
public CorsFilter corsFilter() {
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
source.registerCorsConfiguration("/**", buildConfig()); // 4 对接口配置跨域设置
return new CorsFilter(source);
}
}
mybatis-plus配置 分页插件
import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* mybatis-plus 分页插件
*/
@Configuration
@MapperScan("com.jt.mapper")
public class MybatisPlusConfig {
/**
* 分页插件
*/
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
return interceptor;
}
}
Util工具类
CurPool 统一管理session、websocket、curUser(存储当前登录的用户)
package com.jt.util;
import com.jt.pojo.User;
import com.jt.websocket.WebSocket;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 统一管理session、websocket、curUser
*/
public class CurPool {
// public static CopyOnWriteArraySet<WebSocket> webSockets =new CopyOnWriteArraySet<>();
public static Map<Integer, WebSocket> webSockets = new ConcurrentHashMap<>();
// list 里面第一个存sessionId,第二个存session
public static Map<Integer, List<Object>> sessionPool = new ConcurrentHashMap<>();
// 当前登录用户x
public static Map<String, User> curUserPool = new ConcurrentHashMap<>();
}
JsonUtils JSON转换工具类,其实可以用其他的
例如:alibba的JSON gson的JSON
package com.jt.util;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* json 相关格式化
*/
public class JsonUtils {
// 定义jackson对象
private static ObjectMapper MAPPER = new ObjectMapper();
{
MAPPER.setSerializationInclusion(Include.ALWAYS);
}
/**
* 将对象转换成json字符串。
* <p>Title: pojoToJson</p >
* <p>Description: </p >
* @param data
* @return
*/
public static String objectToJson(Object data) {
try {
String string = MAPPER.writeValueAsString(data);
return string;
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
}
public static JsonNode stringToJsonNode(String data) {
try {
JsonNode jsonNode = MAPPER.readTree(data);
return jsonNode;
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}
/**
* 将json结果集转化为对象
*
* @param jsonData json数据
* @param beanType 对象中的object类型
* @return
* @throws Exception
*/
public static <T> T objectToPojo(Object jsonData, Class<T> beanType) throws Exception {
T t = MAPPER.readValue(MAPPER.writeValueAsString(jsonData), beanType);
return t;
}
/**
* 将json结果集转化为对象
*
* @param jsonData json数据
* @param beanType 对象中的object类型
* @return
* @throws Exception
*/
public static <T> T jsonToPojo(String jsonData, Class<T> beanType) throws Exception {
T t = MAPPER.readValue(jsonData, beanType);
return t;
}
/**
* 将json数据转换成pojo对象list
* <p>Title: jsonToList</p >
* <p>Description: </p >
* @param jsonData
* @param beanType
* @return
*/
public static <T>List<T> jsonToList(String jsonData, Class<T> beanType) {
JavaType javaType = MAPPER.getTypeFactory().constructParametricType(List.class, beanType);
try {
List<T> list = MAPPER.readValue(jsonData, javaType);
return list;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 将object数据转换成pojo对象list
* <p>Title: jsonToList</p >
* <p>Description: </p >
* @param jsonData
* @param beanType
* @return
*/
public static <T>List<T> objectToList(Object jsonData, Class<T> beanType) {
JavaType javaType = MAPPER.getTypeFactory().constructParametricType(List.class, beanType);
try {
List<T> list = MAPPER.readValue(MAPPER.writeValueAsString(jsonData), javaType);
return list;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 将JSON数据转换成Map
* @param jsonData
* @return
*/
public static Map<String,Object> jsonToMap(String jsonData) {
try {
Map map = MAPPER.readValue(jsonData, Map.class);
return map;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
SpringContextUtil 重要:获取spring中的bean对象工具类,因为websocket中无法直接使用mapper
package com.jt.util;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
* 获取spring中的bean对象工具类
*/
@Component
public class SpringContextUtil implements ApplicationContextAware {
/**
* Spring应用上下文环境
*/
private static ApplicationContext applicationContext;
/**
* 实现ApplicationContextAware接口的回调方法,设置上下文环境
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
SpringContextUtil.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
/**
* 获取对象 这里重写了bean方法,起主要作用
*/
public static Object getBean(String beanId) throws BeansException {
return applicationContext.getBean(beanId);
}
}
Controller
MsgInfoController 获取消息列表
package com.jt.controller;
import com.jt.mapper.MsgInfoMapper;
import com.jt.mapper.SessionListMapper;
import com.jt.pojo.MsgInfo;
import com.jt.pojo.SessionList;
import com.jt.vo.SysResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;
@RestController
public class MsgInfoController {
@Resource
private MsgInfoMapper msgInfoMapper;
@Autowired
private SessionListMapper sessionListMapper;
// 消息列表
@GetMapping("/msgList")
public SysResult msgList(@RequestParam Integer sessionId){
SessionList sessionList = sessionListMapper.selectByPrimaryKey(sessionId);
if(sessionList == null){
return SysResult.success();
}
Integer fromUserId = sessionList.getUserId();
Integer toUserId = sessionList.getToUserId();
List<MsgInfo> msgInfoList = msgInfoMapper.selectMsgList(fromUserId,toUserId);
// 更新消息已读
msgInfoMapper.msgRead(fromUserId, toUserId);
// 更新会话里面的未读消息
sessionListMapper.delUnReadCount(fromUserId, toUserId);
return SysResult.success(msgInfoList);
}
}
SessionController 创建会话、查询可建立的会话、查询已建立的会话
package com.jt.controller;
import com.jt.mapper.SessionListMapper;
import com.jt.mapper.UserMapper;
import com.jt.pojo.SessionList;
import com.jt.pojo.User;
import com.jt.vo.SysResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController
public class SessionController {
@Autowired
private UserMapper userMapper;
@Autowired
private SessionListMapper seesionListMapper;
/**
* 已建立会话
*/
@GetMapping("/sessionList/already")
public SysResult sessionListAlready(@RequestParam Integer id){
List<SessionList> sessionLists = seesionListMapper.selectByUserId(id);
return SysResult.success(sessionLists);
}
// 可建立会话的用户
@GetMapping("/sessionList/not")
public SysResult sessionListNot(@RequestParam Integer id){
List<Integer> list = seesionListMapper.selectUserIdByUserId(id);
list.add(id);
List<User> cloudList = userMapper.getCloudList(list);
return SysResult.success(cloudList);
}
// 创建会话
@GetMapping("/createSession")
public SysResult createSession(@RequestParam Integer id,@RequestParam Integer toUserId,@RequestParam String toUserName){
SessionList sessionList = new SessionList();
sessionList.setUserId(id);
sessionList.setUnReadCount(0);
sessionList.setListName(toUserName);
sessionList.setToUserId(toUserId);
seesionListMapper.insert(sessionList);
// 判断对方和我建立会话没有? 没有也要建立
Integer integer = seesionListMapper.selectIdByUser(toUserId, id);
if (integer == null || integer <= 0){
User user = userMapper.selectByPrimaryKey(id);
sessionList.setUserId(toUserId);
sessionList.setToUserId(id);
sessionList.setListName(user.getName());
seesionListMapper.insert(sessionList);
}
return SysResult.success();
}
// 删除会话
@GetMapping("/delSession")
public SysResult delSession(@RequestParam Integer sessionId){
seesionListMapper.deleteByPrimaryKey(sessionId);
return SysResult.success();
}
}
UserController 用户注册、登录(登录时将用户存入池中)、注销(用户退出时,在池中清除)
package com.jt.controller;
import com.jt.mapper.UserMapper;
import com.jt.pojo.User;
import com.jt.util.CurPool;
import com.jt.vo.SysResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class UserController {
@Autowired
private UserMapper userMapper;
// 注册用户
@GetMapping("/register")
public SysResult register(@RequestParam String name) {
String s = userMapper.selectByName(name);
if (s != null || "".equals(s)){
return SysResult.fail().setMsg("改用户已存在");
}
User user = new User();
user.setName(name);
userMapper.insert(user);
return SysResult.success();
}
// 登录
@GetMapping("/login")
public SysResult login(@RequestParam String name){
if (CurPool.curUserPool.get(name) != null){
return SysResult.fail().setMsg("该用户已登录");
}
User user = userMapper.selectUserByName(name);
if (user == null || user.getId() == null){
return SysResult.fail().setMsg("该用户不存在");
}
CurPool.curUserPool.put(user.getName(), user);
return SysResult.success(user);
}
// 注销
@GetMapping("/loginOut")
public SysResult loginOut(@RequestParam String name){
if (name != null && !"".equals(name)){
CurPool.curUserPool.remove(name);
User user = userMapper.selectUserByName(name);
CurPool.sessionPool.remove(user.getId());
CurPool.webSockets.remove(user.getId());
System.out.println("【websocket消息】连接断开,总数为:"+CurPool.webSockets.size());
}
return SysResult.success();
}
}
Mapper
MsgInfoMapper
package com.jt.mapper;
import com.jt.pojo.MsgInfo;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
@Mapper
public interface MsgInfoMapper {
int insert(MsgInfo msgInfo);
void msgRead(@Param("fromUserId") Integer fromUserId, @Param("toUserId") Integer toUserId);
List<MsgInfo> selectMsgList(@Param("fromUserId") Integer fromUserId, @Param("toUserId") Integer toUserId);
}
MsgInfoMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.jt.mapper.MsgInfoMapper">
<resultMap id="BaseResultMap" type="com.jt.pojo.MsgInfo">
<id column="id" jdbcType="INTEGER" property="id" />
<result column="from_user_id" jdbcType="INTEGER" property="fromUserId" />
<result column="from_user_name" jdbcType="VARCHAR" property="fromUserName" />
<result column="to_user_id" jdbcType="INTEGER" property="toUserId" />
<result column="to_user_name" jdbcType="VARCHAR" property="toUserName" />
<result column="content" jdbcType="VARCHAR" property="content" />
<result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
<result column="un_read_flag" jdbcType="INTEGER" property="unReadFlag" />
</resultMap>
<sql id="Base_Column_List">
id, from_user_id, from_user_name, to_user_id,to_user_name, content, create_time, un_read_flag
</sql>
<insert id="insert" parameterType="com.jt.pojo.MsgInfo" useGeneratedKeys="true" keyProperty="id">
insert into msg_info (id, from_user_id, from_user_name,
to_user_id, to_user_name, content, create_time,
un_read_flag)
values (#{id,jdbcType=INTEGER}, #{fromUserId,jdbcType=INTEGER}, #{fromUserName,jdbcType=VARCHAR},
#{toUserId,jdbcType=INTEGER},#{toUserName,jdbcType=VARCHAR}, #{content,jdbcType=VARCHAR}, #{createTime,jdbcType=TIMESTAMP},
#{unReadFlag,jdbcType=INTEGER})
</insert>
<update id="msgRead">
UPDATE msg_info SET un_read_flag = 1 WHERE from_user_id = #{fromUserId} AND to_user_id = #{toUserId}
</update>
<select id="selectMsgList" resultMap="BaseResultMap">
SELECT * FROM msg_info
WHERE (
(to_user_id = #{toUserId} AND from_user_id = #{fromUserId})
OR (to_user_id = #{fromUserId} AND from_user_id = #{toUserId})
)
</select>
</mapper>
SessionListMapper
package com.jt.mapper;
import com.jt.pojo.SessionList;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
@Mapper
public interface SessionListMapper {
int deleteByPrimaryKey(Integer id);
int insert(SessionList sessionList);
SessionList selectByPrimaryKey(Integer id);
List<Integer> selectUserIdByUserId(Integer id);
List<SessionList> selectByUserId(Integer id);
Integer selectIdByUser(@Param("fromId") Integer fromId,@Param("toId") Integer toId);
void addUnReadCount(@Param("userId") Integer userId,@Param("toUserId") Integer toUserId);
void delUnReadCount(@Param("fromUserId") Integer fromUserId,@Param("toUserId") Integer toUserId);
}
SessionListMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.jt.mapper.SessionListMapper">
<resultMap id="BaseResultMap" type="com.jt.pojo.SessionList">
<id column="id" jdbcType="INTEGER" property="id" />
<result column="user_id" jdbcType="INTEGER" property="userId" />
<result column="to_user_id" jdbcType="INTEGER" property="toUserId" />
<result column="list_name" jdbcType="VARCHAR" property="listName" />
<result column="un_read_count" jdbcType="INTEGER" property="unReadCount" />
</resultMap>
<sql id="Base_Column_List">
id, user_id,to_user_id, list_name, un_read_count
</sql>
<select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
select
<include refid="Base_Column_List" />
from session_list
where id = #{id,jdbcType=INTEGER}
</select>
<delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
delete from session_list
where id = #{id,jdbcType=INTEGER}
</delete>
<insert id="insert" parameterType="com.jt.pojo.SessionList">
insert into session_list (id, user_id, to_user_id,list_name,
un_read_count)
values (#{id,jdbcType=INTEGER}, #{userId,jdbcType=INTEGER}, #{toUserId,jdbcType=INTEGER}, #{listName,jdbcType=VARCHAR},
#{unReadCount,jdbcType=INTEGER})
</insert>
<select id="selectUserIdByUserId" resultType="java.lang.Integer">
SELECT to_user_id FROM session_list WHERE user_id = #{id}
</select>
<select id="selectByUserId" resultMap="BaseResultMap">
SELECT * FROM session_list WHERE user_id = #{id}
</select>
<select id="selectIdByUser" resultType="java.lang.Integer">
SELECT id FROM session_list WHERE user_id = #{fromId} AND to_user_id = #{toId}
</select>
<update id="addUnReadCount">
UPDATE session_list SET un_read_count = un_read_count + 1 WHERE user_id = #{userId} AND to_user_id = #{toUserId}
</update>
<update id="delUnReadCount">
UPDATE session_list SET un_read_count = 0 WHERE user_id = #{fromUserId} AND to_user_id = #{toUserId}
</update>
</mapper>
UserMapper
package com.jt.mapper;
import cn.hutool.db.Page;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jt.pojo.User;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
@Mapper
public interface UserMapper {
int insert(User user);
User selectByPrimaryKey(Integer id);
String selectByName(String name);
List<User> getCloudList(@Param("list") List<Integer> list);
User selectUserByName(@Param("name") String name);
}
UserMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.jt.mapper.UserMapper">
<resultMap id="BaseResultMap" type="com.jt.pojo.User">
<id column="id" jdbcType="INTEGER" property="id" />
<result column="name" jdbcType="VARCHAR" property="name" />
</resultMap>
<sql id="Base_Column_List">
id, `name`
</sql>
<select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
select
<include refid="Base_Column_List" />
from user
where id = #{id,jdbcType=INTEGER}
</select>
<insert id="insert" parameterType="com.jt.pojo.User">
insert into user (id, `name`)
values (#{id,jdbcType=INTEGER}, #{name,jdbcType=VARCHAR})
</insert>
<select id="selectByName" resultType="java.lang.String">
SELECT name FROM user WHERE name = #{name}
</select>
<select id="getCloudList" resultMap="BaseResultMap">
SELECT id,name FROM user
<if test="list != null">
WHERE id not in
<foreach collection="list" open="(" close=")" separator="," item="i">
#{i}
</foreach>
</if>
</select>
<select id="selectUserByName" resultMap="BaseResultMap" parameterType="java.lang.String">
SELECT id,name FROM user WHERE name = #{name}
</select>
</mapper>
连接建立成功调用的方法 onOpen
连接关闭调用的方法 onClose
收到客户端消息后调用的方法 onMessage
发生错误时调用 onError
package com.jt.websocket;
import com.jt.mapper.MsgInfoMapper;
import com.jt.mapper.SessionListMapper;
import com.jt.mapper.UserMapper;
import com.jt.pojo.MsgInfo;
import com.jt.pojo.SessionList;
import com.jt.pojo.User;
import com.jt.util.CurPool;
import com.jt.util.JsonUtils;
import com.jt.util.SpringContextUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@Slf4j
@Component
@ServerEndpoint("/websocket/{userId}/{sessionId}")
//此注解相当于设置访问URL
public class WebSocket {
@Autowired
private SessionListMapper sessionListMapper;
@Autowired
private UserMapper userMapper;
@Autowired
private MsgInfoMapper msgInfoMapper;
private Session session;
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session,@PathParam(value="userId")Integer userId, @PathParam(value="sessionId")String sessionId) {
this.session = session;
CurPool.webSockets.put(userId,this);
List<Object> list = new ArrayList<>();
list.add(sessionId);
list.add(session);
CurPool.sessionPool.put(userId , list);
System.out.println("【websocket消息】有新的连接,总数为:"+CurPool.webSockets.size());
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
// 断开连接删除用户删除session
Integer userId = Integer.parseInt(this.session.getRequestParameterMap().get("userId").get(0));
CurPool.sessionPool.remove(userId);
CurPool.webSockets.remove(userId);
if (userMapper == null){
this.userMapper = (UserMapper) SpringContextUtil.getBean("userMapper");
}
User user = userMapper.selectByPrimaryKey(userId);
CurPool.curUserPool.remove(user.getName());
System.out.println("【websocket消息】连接断开,总数为:"+CurPool.webSockets.size());
}
/**
* 收到客户端消息后调用的方法
* 后台收到客户端发送过来的消息
* onMessage 是一个消息的中转站
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message) {
String sessionId = this.session.getRequestParameterMap().get("sessionId").get(0);
if (sessionId == null){
System.out.println("sessionId 错误");
}
// 在这里无法注入Mapper所以使用这种方式注入Mapper
if (sessionListMapper == null){
this.sessionListMapper = (SessionListMapper)SpringContextUtil.getBean("sessionListMapper");
}
if (userMapper == null){
this.userMapper = (UserMapper)SpringContextUtil.getBean("userMapper");
}
if (msgInfoMapper == null){
this.msgInfoMapper = (MsgInfoMapper)SpringContextUtil.getBean("msgInfoMapper");
}
SessionList sessionList = sessionListMapper.selectByPrimaryKey(Integer.parseInt(sessionId));
User user = userMapper.selectByPrimaryKey(sessionList.getUserId());
MsgInfo msgInfo = new MsgInfo();
msgInfo.setContent(message);
msgInfo.setCreateTime(new Date());
msgInfo.setFromUserId(sessionList.getUserId());
msgInfo.setFromUserName(user.getName());
msgInfo.setToUserId(sessionList.getToUserId());
msgInfo.setToUserName(sessionList.getListName());
msgInfo.setUnReadFlag(0);
// 消息持久化
msgInfoMapper.insert(msgInfo);
// 判断用户是否存在,不存在就结束
List<Object> list = CurPool.sessionPool.get(sessionList.getToUserId());
if (list == null || list.isEmpty()){
// 用户不存在,更新未读数
sessionListMapper.addUnReadCount(sessionList.getToUserId(),sessionList.getUserId());
}else{
// 用户存在,判断会话是否存在
String id = sessionListMapper.selectIdByUser(sessionList.getToUserId(), sessionList.getUserId())+"";
String o = list.get(0) + "";
if (id.equals(o)){
// 会话存在直接发送消息
sendTextMessage(sessionList.getToUserId(), JsonUtils.objectToJson(msgInfo));
}else {
// 判断会话列表是否存在
if (id == null || "".equals(id) || "null".equals(id)){
// 新增会话列表
SessionList tmpSessionList = new SessionList();
tmpSessionList.setUserId(sessionList.getToUserId());
tmpSessionList.setToUserId(sessionList.getUserId());
tmpSessionList.setListName(user.getName());
tmpSessionList.setUnReadCount(1);
sessionListMapper.insert(tmpSessionList);
}else {
// 更新未读消息数量
sessionListMapper.addUnReadCount(sessionList.getToUserId(),sessionList.getUserId());
}
// 会话不存在发送列表消息
List<SessionList> sessionLists = sessionListMapper.selectByUserId(sessionList.getToUserId());
sendTextMessage(sessionList.getToUserId() ,JsonUtils.objectToJson(sessionLists));
}
}
System.out.println("【websocket消息】收到客户端消息:"+message);
}
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
// 此为广播消息
// public void sendAllMessage(String message) {
// for(WebSocket webSocket : webSockets) {
// System.out.println("【websocket消息】广播消息:"+message);
// try {
// webSocket.session.getAsyncRemote().sendText(message);
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
// }
// 此为单点消息 (发送文本)
public void sendTextMessage(Integer userId, String message) {
Session session = (Session)CurPool.sessionPool.get(userId).get(1);
if (session != null) {
try {
session.getBasicRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 此为单点消息 (发送对象)
// public void sendObjMessage(String sessionId, Object message) {
// Session session = CurPool.sessionPool.get(sessionId);
// if (session != null) {
// try {
session.getAsyncRemote().sendObject(message);
// session.getBasicRemote().sendText(JsonUtils.objectToJson(message));
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
// }
}
前端vue调用:
<template>
<div class="main">
<div class="main_up">
<div v-if="curUserName == ''">
<el-button @click="openDialog('登录')">登录</el-button>
<el-button @click="openDialog('注册')">注册</el-button>
</div>
<div v-else>
{{this.curUserName}}
<el-button type="info" @click="loginOut">退出登录</el-button>
</div>
</div>
<div class="main_down">
<div class="left">
<div class="left_up">
<div class="label_title">
已建立会话
</div>
<div :class="curSessionId == item.id ? 'box_select' : 'box'" v-for="item in sessionList_already" :key="item.id">
<div class="box_left" @click="startSession(item.id)">
{{item.listName}}
</div>
<div class="right_left">
<div class="right_left_count">
{{item.unReadCount}}
</div>
<div class="right_left_del">
<i class="el-icon-close" @click="delSession(item.id)"></i>
</div>
</div>
</div>
</div>
<div class="left_down">
<div class="label_title">
可建立会话
</div>
<div v-for="item in sessionList_not" :key="item.id" class="box" @click="createSession(item.id, item.name)">
<div class="box_left">
{{item.name}}
</div>
</div>
</div>
</div>
<div class="right">
<div class="up" ref="element" id="msg_end">
<div v-for="(item,i) in list" :key="i" :class="item.fromUserId === curUserId ? 'msg_right' : 'msg_left'">
<div class="msg_right_up">
{{item.fromUserName}}
</div>
<div :class="item.fromUserId === curUserId ? 'msg_right_down' : 'msg_left_down'">
{{item.content}}
</div>
</div>
</div>
<div class="down">
<el-input
type="textarea"
:rows="9"
placeholder="请输入内容,回车发送!"
@keyup.enter.native = "sendMsg"
v-model="textarea">
</el-input>
<!-- <el-button @click="sendMsg">发送</el-button> -->
</div>
</div>
</div>
<el-dialog
:title="dialogTitle"
:visible.sync="dialogVisible"
width="30%"
>
<el-input v-model="loginName" placeholder="请输入用户名..."></el-input>
<span slot="footer" class="dialog-footer">
<el-button @click="dialogVisible = false">取 消</el-button>
<el-button type="primary" @click="loginOrRegister">确 定</el-button>
</span>
</el-dialog>
</div>
</template>
<script>
import axios from 'axios';
export default {
name: 'HelloWorld',
data(){
return{
dialogVisible: false,
dialogTitle: '',
loginName: '',
textarea: "",
list: [],
curUserId: "",
curUserName: "",
curSessionId: '',
sessionList_already:[],
sessionList_not:[],
}
},
created() { // 页面创建生命周期函数
},
updated(){
// 解决每次发消息到最后面
var elmnt = document.getElementById("msg_end");
elmnt.scrollTop = elmnt.scrollHeight;
},
destroyed: function () { // 离开页面生命周期函数
this.websocketclose();
},
methods: {
initWebSocket: function (userId,sessionId) {
// WebSocket与普通的请求所用协议有所不同,ws等同于http,wss等同于https
this.websock = new WebSocket("ws://127.0.0.1:9090/websocket/"+userId+"/"+sessionId);
this.websock.onopen = this.websocketonopen;
this.websock.onerror = this.websocketonerror;
this.websock.onmessage = this.websocketonmessage;
this.websock.onclose = this.websocketclose;
},
websocketonopen: function () {
console.log("WebSocket连接成功");
},
websocketonerror: function (e) {
console.log("WebSocket连接发生错误",e);
},
websocketonmessage: function (e) {
let data = JSON.parse(e.data);
if(data instanceof Array){
// 列表数据
this.sessionList_already = data
}else{
// 消息数据
this.list.push(data)
}
},
websocketclose: function (e) {
if(this.curUserId != null){
if(this.curSessionId != null){
this.initWebSocket(this.curUserId, this.curSessionId)
}else{
this.initWebSocket(this.curUserId, 99999999)
}
}
console.log("connection closed",e);
console.log(e);
},
// 消息发送
sendMsg(){
if(this.curSessionId == ''){
return this.$message.error("请选择左边的对话框开始聊天!")
}
let data = {
"fromUserId": this.curUserId,
"fromUserName": this.curUserName,
"content": this.textarea
}
this.list.push(data)
this.websock.send(this.textarea)
this.textarea = ''
},
openDialog(openName){
this.dialogTitle = openName
this.dialogVisible = true
},
// 登录or注册
loginOrRegister(){
let thus = this
if(this.dialogTitle == "注册"){
axios.get('http://127.0.0.1:9090/register?name=' + this.loginName)
.then(function (response) {
if(response.data.code == -1){
return thus.$message.error(response.data.errDesc);
}
thus.$message.success("注册成功");
thus.dialogVisible = false
})
.catch(function (error) {
console.log(error);
});
}else if(this.dialogTitle == '登录'){
axios.get('http://127.0.0.1:9090/login?name=' + this.loginName)
.then(function (response) {
console.log(response.data);
if(response.data.code == -1){
return thus.$message.error(response.data.errDesc);
}
thus.curUserId = response.data.data.id
thus.curUserName = response.data.data.name
thus.dialogVisible = false
thus.getSessionListNot()
thus.sessionListAlready()
thus.startSession(99999999)
})
.catch(function (error) {
console.log(error);
});
}
},
// 获取可建立会话列表
getSessionListNot(){
let thus = this
axios.get('http://127.0.0.1:9090/sessionList/not?id=' + this.curUserId)
.then(function (response) {
if(response.data.code == -1){
return thus.$message.error(response.data.errDesc);
}
thus.sessionList_not = response.data.data
})
.catch(function (error) {
console.log(error);
});
},
// 获取已存在的会话列表
sessionListAlready(){
let thus = this
axios.get('http://127.0.0.1:9090/sessionList/already?id=' + this.curUserId)
.then(function (response) {
if(response.data.code == -1){
return thus.$message.error(response.data.errDesc);
}
thus.sessionList_already = response.data.data
})
.catch(function (error) {
console.log(error);
});
},
// 创建会话
createSession(toUserId, toUserName){
let thus = this
axios.get('http://127.0.0.1:9090/createSession?id=' + this.curUserId + "&toUserId=" + toUserId + "&toUserName=" + toUserName)
.then(function (response) {
if(response.data.code == -1){
return thus.$message.error(response.data.errDesc);
}
thus.getSessionListNot()
thus.sessionListAlready()
})
.catch(function (error) {
console.log(error);
});
},
// 开始会话
startSession(sessionId){
console.log(this.websock);
if(this.websock != undefined){
this.websock.close()
}
this.curSessionId = sessionId
this.initWebSocket(this.curUserId, sessionId)
this.msgList(sessionId)
},
// 删除会话
delSession(sessionId){
let thus = this
axios.get('http://127.0.0.1:9090/delSession?sessionId=' + sessionId)
.then(function (response) {
if(response.data.code == -1){
return thus.$message.error(response.data.errDesc);
}
thus.getSessionListNot()
thus.sessionListAlready()
})
.catch(function (error) {
console.log(error);
});
},
// 退出登录
loginOut(){
let thus = this
axios.get('http://127.0.0.1:9090/loginOut?name=' + this.curUserName)
.then(function (response) {
if(response.data.code == -1){
return thus.$message.error(response.data.errDesc);
}
thus.curUserId = ''
thus.curUserName = ''
return thus.$message.success("退出登录成功");
})
.catch(function (error) {
console.log(error);
});
},
// 获取消息数据
msgList(sessionId){
let thus = this
axios.get('http://127.0.0.1:9090/msgList?sessionId=' + sessionId)
.then(function (response) {
if(response.data.code == -1){
return thus.$message.error(response.data.errDesc);
}
thus.list = response.data.data
// 从新获取列表
thus.sessionListAlready()
})
.catch(function (error) {
console.log(error);
});
}
}
}
</script>
<!-- Add "scoped" attribute to limit CSS to this component only -->
<style scoped>
.main{
width: 980px;
/* border: 1px #1890ff solid; */
margin-top: 50px;
height: 790px;
}
.main_up{
width: 980px;
height: 40px;
/* border:1px red solid; */
}
.main_down{
width: 981px;
height: 750px;
border: 1px #1890ff solid;
display: flex;
justify-self: space-between;
}
.left{
width: 300px;
height: 750px;
border-right: 1px #1890ff solid;
}
.left_up{
width: 300px;
height: 450px;
overflow-y: auto;
/* border: 1px red solid; */
}
.label_title{
width: 282px;
height: 25px;
background-color: #f8f8f8;
font-weight: 600;
padding: 8px;
}
.left_down{
width: 300px;
height: 300px;
overflow-y: auto;
/* border: 1px green solid; */
}
.right{
width: 680px;
height: 750px;
/* border-right: 1px #1890ff solid; */
}
.box{
width: 250px;
height: 22px;
padding: 10px 25px 10px 25px;
display: flex;
justify-self: flex-end;
/* border: 1px red solid; */
}
.box:hover{
background-color: gainsboro;
cursor: pointer;
}
.box_select{
width: 250px;
height: 22px;
padding: 10px 25px 10px 25px;
display: flex;
justify-self: flex-end;
background-color: gainsboro;
}
.box_left{
width: 230px;
height: 22px;
}
.right_left{
width: 50px;
height: 22px;
display: flex;
justify-content: space-between;
/* border: 1px red solid; */
}
.right_left_count{
/* border: 1px blue solid; */
padding-left: 10px;
width: 20px;
}
.right_left_del{
width: 20px;
padding-left: 10px;
}
.up{
width: 680px;
height: 550px;
overflow-y: scroll;
overflow-x: hidden;
/* padding-bottom: 40px; */
border-bottom: 1px #1890ff solid;
}
.msg_left{
width: 675px;
/* padding-left: 5px; */
margin-top: 5px;
/* border: 1px #1890ff solid; */
}
.msg_left_up{
height: 25px;
margin-top: 5px;
}
.msg_left_down{
height: 25px;
/* border: 1px #1890ff solid; */
padding-left: 10px;
}
.msg_right{
width: 660px;
/* padding-right: 20px; */
margin-top: 5px;
/* border: 1px #1890ff solid; */
text-align: right;
}
.msg_right_up{
height: 25px;
}
.msg_right_down{
height: 25px;
/* border: 1px #1890ff solid; */
padding-right: 10px;
}
.down{
width: 680px;
height: 200px;
/* border: 1px red solid; */
}
</style>
实现发送语音
生成音频文件,并进行保存上传到服务器中。然后将文件名字保存到数据库中,聊天时前端判断返回的数据是否为语音,如果为语音则直接根据名字调用音频文件获得语音内容。
1.用户点击发送语音,保存语音到服务器上,并保存名字到数据库中
2.接收语音时获得音频文件即可
3.前端根据获得的音频文件进行展示即可。
文章来源:https://www.toymoban.com/news/detail-407445.html
文章来源地址https://www.toymoban.com/news/detail-407445.html
package com.jt.controller;
import com.jt.util.FtpUtil;
import javax.sound.sampled.*;
import java.io.*;
public class EngineeCore {
String filePath = timeNumber()+".mp3";//要存储文件的位置
AudioFormat audioFormat;
TargetDataLine targetDataLine;
boolean flag = true;
public static void main(String[] args) {
EngineeCore engineeCore = new EngineeCore(); //调用麦克风获取音频
engineeCore.startRecognize();
}
private void stopRecognize() {
flag = false;
targetDataLine.stop();
targetDataLine.close();
}private AudioFormat getAudioFormat() {
float sampleRate = 16000;
// 8000,11025,16000,22050,44100
int sampleSizeInBits = 16;
// 8,16
int channels = 1;
// 1,2
boolean signed = true;
// true,false
boolean bigEndian = false;
// true,false
return new AudioFormat(sampleRate, sampleSizeInBits, channels, signed, bigEndian);
}// end getAudioFormat
private void startRecognize() {
try {
// 获得指定的音频格式
audioFormat = getAudioFormat();
DataLine.Info dataLineInfo = new DataLine.Info(TargetDataLine.class, audioFormat);
targetDataLine = (TargetDataLine) AudioSystem.getLine(dataLineInfo);
// Create a thread to capture the microphone
// data into an audio file and start the
// thread running. It will run until the
// Stop button is clicked. This method
// will return after starting the thread.
flag = true;
new CaptureThread().start();
} catch (Exception e) {
e.printStackTrace();
} // end catch
}// end captureAudio method
class CaptureThread extends Thread {
public void run() {
AudioFileFormat.Type fileType = null;
File audioFile = new File(filePath);
fileType = AudioFileFormat.Type.WAVE;
//声音录入的权值
int weight = 2;
//判断是否停止的计数
int downSum = 0;
ByteArrayInputStream bais = null;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
AudioInputStream ais = null;
try {
targetDataLine.open(audioFormat);
targetDataLine.start();
byte[] fragment = new byte[1024];
ais = new AudioInputStream(targetDataLine);
while (flag) {
targetDataLine.read(fragment, 0, fragment.length);
//当数组末位大于weight时开始存储字节(有声音传入),一旦开始不再需要判断末位
if (Math.abs(fragment[fragment.length-1]) > weight || baos.size() > 0) {
baos.write(fragment);
System.out.println("守卫:"+fragment[0]+",末尾:"+fragment[fragment.length-1]+",lenght"+fragment.length);
//判断语音是否停止
if(Math.abs(fragment[fragment.length-1])<=weight){
downSum++;
}else{
System.out.println("重置奇数");
downSum=0;
}//计数超过20说明此段时间没有声音传入(值也可更改)
if(downSum>20){
System.out.println("停止录入");
break;
}
}
}
//取得录音输入流
audioFormat = getAudioFormat();
byte audioData[] = baos.toByteArray();
bais = new ByteArrayInputStream(audioData);
ais = new AudioInputStream(bais, audioFormat, audioData.length / audioFormat.getFrameSize());
//定义最终保存的文件名
System.out.println("开始生成语音文件");
AudioSystem.write(ais, AudioFileFormat.Type.WAVE, audioFile);
downSum = 0;
stopRecognize();
//上传到服务器中
BufferedInputStream buffer = new BufferedInputStream(new FileInputStream(filePath));
boolean b = FtpUtil.uploadFileFTP("111.111.111.111", 21, "ftpuser", "ftpuser", "/www/wwwroot/ftpuser", timeNumber()+".mp3", buffer);
System.out.println("b = " + b);
boolean delete = audioFile.delete();//删除本地文件
System.out.println("delete = " + delete);
buffer.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
//关闭流
try {
ais.close();
bais.close();
baos.reset();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
//获得当前时间戳
public Long timeNumber(){
long l = System.currentTimeMillis();
return l;
}
}
到了这里,关于java使用WebSocket实现一对一实时对话的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!