【Java】javax.websocket

这篇具有很好参考价值的文章主要介绍了【Java】javax.websocket。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

javax.websocket

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndPointExporter() {
        return new ServerEndpointExporter();
    }
}
public enum WebSocketType {
    ON_OPEN,
    ON_MESSAGE,
    ON_ERROR,
    ON_CLOSE;
}
@Getter
@ToString
public class WebSocketEvent extends ApplicationEvent {

    private final WebSocketType state;
    private final String bizId;
    private final Object data;
    private final LocalDateTime dateTime;
    private final String sessionId;

    public WebSocketEvent(Object source, WebSocketType state, String sessionId, String bizId, Object data, LocalDateTime dateTime) {
        super(source);
        this.state = state;
        this.sessionId = sessionId;
        this.bizId = bizId;
        this.data = data;
        this.dateTime = dateTime;
    }

    public WebSocketEvent(Object source, WebSocketType state, String sessionId, String bizId, Object data) {
        this(source, state, sessionId, bizId, data, LocalDateTime.now());
    }

    public WebSocketEvent(Object source, WebSocketType state, String sessionId, String bizId) {
        this(source, state, sessionId, bizId, null, LocalDateTime.now());
    }

    public WebSocketEvent(Object source, WebSocketType state) {
        this(source, state, null, null);
    }

}
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.web.util.UriComponentsBuilder;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;

@Slf4j
@Component
@ServerEndpoint(value = "/websocket")
public class WebSocketServer {

    private static final long MAX_IDLE_TIMEOUT = TimeUnit.SECONDS.toMillis(45);
    private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);
    private static final ConcurrentHashMap<String, SessionCache> SESSION_MAP = new ConcurrentHashMap<>();

    private static ApplicationContext context;

    @Autowired
    public void setApplicationContext(ApplicationContext context) {
        WebSocketServer.context = context;
    }

    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) {
        final String id = session.getId();
        final List<String> allow = getQueryParams(session).get("allow");
        if (CollectionUtils.isNotEmpty(allow)) {
            final String bizId = allow.get(0);
            if (StringUtils.isNotBlank(bizId)) {
                final SessionCache put = SESSION_MAP.put(id, new SessionCache(session, bizId));
                int cnt = isNull(put) ? ONLINE_COUNT.incrementAndGet() : ONLINE_COUNT.get();
                log.info("连接 [{} ({})] 已加入,当前连接数 : {}", id, bizId, cnt);
                session.setMaxIdleTimeout(MAX_IDLE_TIMEOUT);
                context.publishEvent(new WebSocketEvent(this, WebSocketType.ON_OPEN, id, bizId));
                return;
            }
        }
        closeQuietly(session, CloseReason.CloseCodes.CANNOT_ACCEPT);
        log.warn("连接 [{}] 被拒绝,没有设置业务标识", id);
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(Session session) {
        final String id = session.getId();
        SessionCache remove = SESSION_MAP.remove(id);
        final boolean nonNull = nonNull(remove);
        final String bizId = nonNull ? remove.bizId : null;
        final int cnt = nonNull ? ONLINE_COUNT.decrementAndGet() : ONLINE_COUNT.get();
        log.info("连接 [{} ({})] 已断开,当前连接数 : {}", id, bizId, cnt);
        context.publishEvent(new WebSocketEvent(this, WebSocketType.ON_CLOSE, id, bizId));
    }

    /**
     * 出现错误
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        final String id = session.getId();
        final SessionCache sessionCache = SESSION_MAP.get(id);
        final String bizId = nonNull(sessionCache) ? sessionCache.bizId : null;
        log.warn("连接 [{} ({})] 有错误 : {}\n{}", id, bizId, error.getMessage(), error.getStackTrace());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message
     * @param session
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        final String id = session.getId();
        final SessionCache sessionCache = SESSION_MAP.get(id);
        final String bizId = nonNull(sessionCache) ? sessionCache.bizId : null;
        log.info("连接 [{} ({})] 有消息 : {}", id, bizId, message);
        if (StringUtils.isNotBlank(message)) {
            context.publishEvent(new WebSocketEvent(this, WebSocketType.ON_MESSAGE, id, bizId, message));
        }
    }


    public static boolean isOnline(SessionCache cache, long timestamp) {
        return nonNull(cache) && cache.session.isOpen() && !cache.isTimeout(timestamp);
    }

    public static boolean isOnline(String sessionId, long timestamp) {
        if (StringUtils.isNotBlank(sessionId)) {
            return isOnline(SESSION_MAP.get(sessionId), timestamp);
        }
        return false;
    }

    public static void removeTimeout() {
        final long now = System.currentTimeMillis();
        SESSION_MAP.forEach((key, value) -> {
            if (value.isTimeout(now) || !value.session.isOpen()) {
                closeQuietly(value.session, CloseReason.CloseCodes.GOING_AWAY);
                SESSION_MAP.remove(key);
                log.warn("主动断开 Timeout 连接 [{} ({})]", key, value.bizId);
            }
        });
    }

    public static void refreshTimestamp(String sessionId) {
        if (StringUtils.isNotBlank(sessionId)) {
            final SessionCache cache = SESSION_MAP.get(sessionId);
            if (nonNull(cache)) {
                cache.refreshTimestamp();
            }
        }
    }

    public static void removeUnAuthorization(String sessionId) {
        final SessionCache cache = SESSION_MAP.get(sessionId);
        if (nonNull(cache)) {
            closeQuietly(cache.session, CloseReason.CloseCodes.CANNOT_ACCEPT);
            SESSION_MAP.remove(sessionId);
            log.warn("主动断开 UnAuthorization 连接 [{} ({})]", sessionId, cache.bizId);
        }
    }

    /**
     * 发送消息,实践表明,每次浏览器刷新,session会发生变化。
     *
     * @param sessionId
     * @param message
     */
    public static void send(String sessionId, String message) {
        send(sessionId, message, null);
    }

    public static void send(String sessionId, String message, Consumer<SendResult> callback) {
        final long now = System.currentTimeMillis();
        if (StringUtils.isNotBlank(sessionId)) {
            final SessionCache cache = SESSION_MAP.get(sessionId);
            if (isOnline(cache, now)) {
                cache.session.getAsyncRemote().sendText(message, sendResult -> {
                    if (!sendResult.isOK()) {
                        Throwable ex = sendResult.getException();
                        final String bizId = cache.bizId;
                        log.error("向 连接 [{} ({})] 发送数据 出错 : {}\n{}", sessionId, bizId, ex.getMessage(), ex.getStackTrace());
                    }
                    Optional.ofNullable(callback).ifPresent(x -> x.accept(sendResult));
                });
            }
        }
    }

    public static void broadcast(String message) {
        SESSION_MAP.forEach((key, value) -> send(key, message));
    }

    public static void broadcast(String message, Consumer<SendResult> callback) {
        SESSION_MAP.forEach((key, value) -> send(key, message, callback));
    }

    public static void broadcast(String bizId, String message) {
        broadcast(bizId, message, null);
    }

    public static void broadcast(String bizId, String message, Consumer<SendResult> callback) {
        if (StringUtils.isNotBlank(bizId)) {
            final long now = System.currentTimeMillis();
            SESSION_MAP.forEach((key, value) -> {
                if (bizId.equals(value.bizId) && isOnline(value, now)) {
                    value.session.getAsyncRemote().sendText(message, sendResult -> {
                        if (!sendResult.isOK()) {
                            Throwable ex = sendResult.getException();
                            final String sessionId = value.session.getId();
                            log.error("向 连接 [{} ({})] 发送数据 出错 : {}\n{}", sessionId, bizId, ex.getMessage(), ex.getStackTrace());
                        }
                        Optional.ofNullable(callback).ifPresent(x -> x.accept(sendResult));
                    });
                }
            });
        }
    }

    private static Map<String, List<String>> getQueryParams(Session session) {
//        return session.getRequestParameterMap();
        return UriComponentsBuilder.fromUri(session.getRequestURI()).build().getQueryParams();
    }

    private static void closeQuietly(Session session) {
        closeQuietly(session, CloseReason.CloseCodes.NO_STATUS_CODE);
    }

    private static void closeQuietly(Session session, CloseReason.CloseCodes closeCode) {
        try {
            if (session.isOpen()) {
                session.close(new CloseReason(closeCode, ""));
            }
        } catch (Exception ignored) {
        }
    }

    private static final class SessionCache {
        private Session session;
        private String bizId;
        private long timestamp;

        public SessionCache(Session session) {
            this(session, "");
        }

        public SessionCache(Session session, String bizId) {
            this.session = session;
            this.bizId = bizId;
            this.timestamp = System.currentTimeMillis();
        }

        public Session getSession() {
            return session;
        }

        public void setSession(Session session) {
            this.session = session;
        }

        public String getBizId() {
            return bizId;
        }

        public void setBizId(String bizId) {
            this.bizId = bizId;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public void setTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }

        public void refreshTimestamp() {
            setTimestamp(System.currentTimeMillis());
        }

        public boolean isTimeout(long now) {
            return now - getTimestamp() > MAX_IDLE_TIMEOUT;
        }

    }

}

文章来源地址https://www.toymoban.com/news/detail-675192.html

到了这里,关于【Java】javax.websocket的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 解决 IDEA出现 java: 程序包javax.servlet不存在 问题

            1.点击File → Project Strcture 2.转到如下页面:第一步:点击Libraries,第二步:点击+   3.出现New Project Library,点击Java   4.弹出如下页面:在Tomcat解压文件夹下选择lib文件找到servlet-api.jar,点击ok   5.应用该包,如图所示,点击Apply即可  完成以上步骤Tomcat就可以运行啦

    2024年02月04日
    浏览(57)
  • Java调用Azure证书错误javax.net.ssl.SSLHandshakeException

    一、背景 Azure作为微软的公有云平台,提供了非常丰富的SDK和API让开发人员可以非常方便的调用的各项服务。公司业务需要,我们需要访问Azure上注册的应用程序,需要访问https地址 https://login.microsoftonline.com/​your-​​tenant-id 。 二、错误信息 简短报错信息:javax.net.ssl.SSLHa

    2024年02月06日
    浏览(52)
  • Java: javax.net.ssl.SSLPeerUnverifiedException: peer not authenticated

    我们在平时练习的时候一般使用低版本的jdk来练习,以便了解不同版本jdk的区别,下面是我们练习中遇到的问题    此代码在本地开发环境正常运行,但是在服务器上mHttpClient.execute会报错:  Exception in thread \\\"main\\\" javax.NET.ssl.SSLPeerUnverifiedException: peer not authenticated 本地开发环境

    2024年02月22日
    浏览(43)
  • 解决异常 java.lang.NoClassDefFoundError: javax/xml/bind/JAXBException

    一、问题现象 用Tomcat启动Web工程的时候,catalina.out 日志文件中报错如下: 二、问题原因 服务器上的Java 的JDK版本是 11.0.4 ,版本过高。 JAXB API是java EE 的API,因此在Java SE 9.0 中不再包含这个 Jar 包。 Java 9 中引入了模块的概念,默认情况下,Java SE中将不再包含java EE 的Jar包。

    2024年02月07日
    浏览(42)
  • javax.websocket.server.ServerContainer not available

    当springboot项目集成了websocket时,此时单元测试类启动后就会报: 这是因为springbootTest启动时不会启动服务器,所以websocket就会报错,这个时候需要在注解中添加webEnvironment,给wevsocket提供测试环境:

    2024年02月11日
    浏览(63)
  • 解决java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter三更博客系统

    这个错误通常发生在使用JDK 9或更高版本的Java应用程序中,因为在这些版本中,JDK已经移除了JAXB默认实现的某些部分。 要解决这个问题,您可以尝试以下几个步骤: 升级您的应用程序以使用JAXB的最新实现。可以从Maven仓库或JAXB的官方网站中获取JAXB的最新版本。 1. 如果您使

    2024年02月15日
    浏览(80)
  • springboot整合websocket后启动报错:javax.websocket.server.ServerContainer not available

    Springboot使用@ServerEndpoint来建立websocket链接。引入依赖。 配置Websocket springboot项目添加websocket依赖后运行测试类报如下错误: 报的错误是创建ServerEndpointExporterBean失败,原因是ServerContainer不可用,那么我们就去看到ServerContainer在ServerEndpointExporter中是怎么注入的。 点进去 Serve

    2024年01月18日
    浏览(42)
  • Java调用ssl异常,javax.net.ssl.SSLHandshakeException: No appropriate protocol

      现象:sqlserver 2017 安装在docker里,系统是mac 13,java 1.8.371运行java程序提示上面ssl错误,根据百度提供的方法,修改文件,重启程序搞定。 解决办法:java.security 找到这个文件修改保存 发现是jdk1.8版本导致SSL调用权限上有问题,新电脑装的jdk是jdk1.8.0_291,版本比较高。搜到

    2024年02月13日
    浏览(44)
  • Exception in thread “main“ java.lang.NoClassDefFoundError: javax/servlet/Servlet

    缺少jar包 jar包冲突 先查看pom文件中是否引入了对应的依赖,如果没有的话,需要引入: 如果是从 Maven Repository 上面获取的依赖记得要 删除scop 这一行,我就是网上找了几百种方法,才发现是这个问题!

    2024年02月13日
    浏览(49)
  • java.lang.NoSuchMethodError: javax.servlet.http.HttpServletResponse.setContentLengthLong(J)V

    先说原因,其实都是你的jar有问题 1.jar冲突 2.少依赖了包 网上很多说springmvc和springwebmvc,版本高于5.3.0导致的,会有这个问题,那是因为他们没看过源码,高版本的因为在这里,多了一行代码,这里设置的时候,会到实现类里面写一个东西,this.servletResponse.setContentLengthLong(

    2024年02月11日
    浏览(47)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包