Tomcat长轮询原理与源码解析

这篇具有很好参考价值的文章主要介绍了Tomcat长轮询原理与源码解析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

系列文章目录和关于我

零丶长轮询的引入

最近在看工作使用到的配置中心原理,发现大多数配置中心在推和拉模型上做的选择出奇的一致选择了基于长轮询的拉模型

  • 基于拉模型的客户端轮询的方案
    客户端通过轮询方式发现服务端的配置变更事件。轮询的频率决定了动态配置获取的实时性。

    • 优点:简单、可靠。
    • 缺点:应用增多时,较高的轮询频率给整个配置中心服务带来巨大的压力。

    另外,从配置中心的应用场景上来看,是一种写少读多的系统,客户端大多数轮询请求都是没有意义的,因此这种方案不够高效。

  • 基于推模型的客户端长轮询的方案

    基于Http长轮询模型,实现了让客户端在没有发生动态配置变更的时候减少轮询。这样减少了无意义的轮询请求量,提高了轮询的效率;也降低了系统负载,提升了整个系统的资源利用率。

一丶何为长轮询

长轮询 本质上是原始轮询技术的一种更有效的形式。

它的出现是为了解决:向服务器发送重复请求会浪费资源,因为必须为每个新传入的请求建立连接,必须解析请求的 HTTP 头部,必须执行对新数据的查询,并且必须生成和交付响应(通常不提供新数据)然后必须关闭连接并清除所有资源。

  • 从tomcat服务器的角度就是客户端不停请求,每次都得解析报文封装成Request,Response对象,并且占用线程池中的一个线程。
  • 并且每次轮询都要进行tcp握手,挥手,网卡发起中断,操作系统处理中断从内核空间拷贝数据到用户空间,一通忙活服务端返回 配置未修改(配置中心没有修改配置,客户端缓存的配置和配置中心一致,所以是白忙活)

长轮询是一种服务器选择尽可能长的时间保持和客户端连接打开的技术仅在数据变得可用或达到超时阙值后才提供响应而不是在给到客户端的新数据可用之前,让每个客户端多次发起重复的请求

Tomcat长轮询原理与源码解析

简而言之,就是服务端并不是立马写回响应,而是hold住一段时间,如果这段时间有数据需要写回(例如配置的修改,新配置需要写回)再写回,然后浏览器再发送一个新请求,从而实现及时性,节省网络开销的作用。

Tomcat长轮询原理与源码解析

二丶使用等待唤醒机制写一个简单的“长轮询”(脱裤子放屁)

package com.cuzzz.springbootlearn.longpull;

import org.springframework.beans.factory.InitializingBean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

@RestController
@RequestMapping("long-pull")
public class MyController implements InitializingBean {

    /**
     * 处理任务的线程
     */
    private ThreadPoolExecutor processExecutor;
    /**
     * 等待唤醒的锁
     */
    private static final ReentrantLock lock = new ReentrantLock();
    /**
     * 当请求获取配置的时候,在此condition上等待一定时间
     * 当修改配置的时候通过这个condition 通知其他获取配置的线程
     */
    private static final Condition condition = lock.newCondition();

    @GetMapping
    public void get(HttpServletRequest request, HttpServletResponse response) throws ExecutionException, InterruptedException {
        //组转成任务
        Task<String> task = new Task<String>(request, response,
                () -> "拿配置" + System.currentTimeMillis());
        //提交到线程池
        Future<?> submit = processExecutor.submit(task);
        //tomcat线程阻塞于此
        submit.get();
    }

    /**
     * 模拟修改配置
     *
     * 唤醒其他获取配置的线程
     */
    @PostMapping
    public String post(HttpServletRequest request, HttpServletResponse response) {
        lock.lock();
        try {
            condition.signalAll();
        }finally {
            lock.unlock();
        }
        return "OK";
    }


    static class Task<T> implements Runnable {
        private HttpServletResponse response;
        /**
         * 等待时长
         */
        private final long timeout;
        private Callable<T> task;

        public Task(HttpServletRequest request, HttpServletResponse response, Callable<T> task) {
            this.response = response;

            String time = request.getHeader("time-out");
            if (time == null){
                //默认等待10秒
                this.timeout = 10;
            }else {
                this.timeout = Long.parseLong(time);
            }
            this.task = task;
        }


        @Override
        public void run() {
            lock.lock();
            try {

                //超市等待
                boolean await = condition.await(timeout, TimeUnit.SECONDS);
                //超时
                if (!await) {
                    throw new TimeoutException();
                }
                //获取配置
                T call = task.call();
                //写回
                ServletOutputStream outputStream = response.getOutputStream();
                outputStream.write(("没超时拿当前配置:" + call).getBytes(StandardCharsets.UTF_8));
            } catch (TimeoutException | InterruptedException exception) {
                //超时或者线程被中断
                try {
                    ServletOutputStream outputStream = response.getOutputStream();
                    T call = task.call();
                    outputStream.write(("超时or中断拿配置:" + call).getBytes(StandardCharsets.UTF_8));
                } catch (Exception ex) {
                    throw new RuntimeException(ex);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
        }
    }


    @Override
    public void afterPropertiesSet() {

        int cpuNums = Runtime.getRuntime().availableProcessors();

        processExecutor
                = new ThreadPoolExecutor(cpuNums, cpuNums * 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
    }
}

使用get方法反问的请求回被提交到线程池进行await等待,使用post方法的请求回唤醒这些线程。

但是这个写法有点脱裤子放屁

Tomcat长轮询原理与源码解析

为什么会出现这种情况,直接提交到线程池异步执行不可以么,加入我们删除上面submit.get方法会发现其实什么结果都不会,这是因为异步提交到线程池后,tomcat已经结束了这次请求,并没有维护这个连接,所以没有办法写回结果。

如果不删除这一行,tomcat线程阻塞住我们可以写回结果,但是其实没有达到配置使用长轮询的初衷——"解放tomcat线程,让配置中心服务端可以处理更多请求"。

Tomcat长轮询原理与源码解析

所以我们现在陷入一个尴尬的境地,怎么解决昵?看下去

三丶Tomcat Servlet 3.0长轮询原理

1.AsyncContext实现长轮询

package com.cuzzz.springbootlearn.longpull;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

@RestController
@RequestMapping("long-pull3")
public class MyController2 {

    private static final ScheduledExecutorService procesExecutor
            = Executors.newSingleThreadScheduledExecutor();
    /**
     * 记录配置改变的map
     */
    private static final ConcurrentHashMap<String, String> configCache
            = new ConcurrentHashMap<>();
    /**
     * 记录长轮询的任务
     */
    private static final ConcurrentLinkedDeque<AsyncTask> interestQueue
            = new ConcurrentLinkedDeque<>();

    static {
        //每2秒看一下释放配置变更,或者任务超时
        procesExecutor.scheduleWithFixedDelay(() -> {
            List<AsyncTask>needRemove  = new ArrayList<>();
            for (AsyncTask asyncTask : interestQueue) {
                if (asyncTask.timeout()) {
                    asyncTask.run();
                    needRemove.add(asyncTask);
                    continue;
                }
                if (configCache.containsKey(asyncTask.configId)) {
                    needRemove.add(asyncTask);
                    asyncTask.run();
                }
            }
            interestQueue.removeAll(needRemove);
        }, 1, 2, TimeUnit.SECONDS);
    }


    static class AsyncTask implements Runnable {
        private final AsyncContext asyncContext;
        private final long timeout;
        private static long startTime;
        private String configId;

        AsyncTask(AsyncContext asyncContext) {
            this.asyncContext = asyncContext;
            HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest();
            String timeStr = request.getHeader("time-out");
            if (timeStr == null) {
                timeout = 10;
            } else {
                timeout = Long.parseLong(timeStr);
            }
        	//关注的配置key,应该getParameter的,无所谓
            this.configId = request.getHeader("config-id");
            if (this.configId == null) {
                this.configId = "default";
            }
            
            //开始时间
            startTime = System.currentTimeMillis();
        }
		
        //是否超时
        public boolean timeout() {
            return (System.currentTimeMillis() - startTime) / 1000 > timeout;
        }

        @Override
        public void run() {
		
            String result = "开始于" + System.currentTimeMillis() + "--";
            try {
                if (timeout()) {
                    result = "超时: " + result;
                } else {
                    result += configCache.get(this.configId);
                }

                result += "--结束于:" + System.currentTimeMillis();
                ServletResponse response = asyncContext.getResponse();
                response.getOutputStream().write(result.getBytes(StandardCharsets.UTF_8));
                
                //后续将交给tomcat线程池处理,将给客户端响应
                asyncContext.complete();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

        }

    }


    @GetMapping
    public void get(HttpServletRequest request, HttpServletResponse response) {
        //打印处理的tomcate线程id
        System.out.println("线程id" + Thread.currentThread().getId());
        //添加一个获取配置的异步任务
        interestQueue.add(new AsyncTask(asyncContext));
        //开启异步
        AsyncContext asyncContext = request.startAsync();
        asyncContext.setTimeout(0);
        //监听器打印最后回调的tomcat线程id
        asyncContext.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent event) throws IOException {
                System.out.println("线程id" + Thread.currentThread().getId());
            }
            //...剩余其他方法
        });
        
        //立马就会释放tomcat线程池资源
        System.out.println("tomcat主线程释放");
    }

    @PostMapping
    public void post(HttpServletRequest request) {
        String c = String.valueOf(request.getParameter("config-id"));
        if (c.equals("null")){
            c = "default";
        }
        String v = String.valueOf(request.getParameter("value"));
        configCache.put(c, v);
    }
}

Tomcat长轮询原理与源码解析

Tomcat长轮询原理与源码解析

上面演示利用AsyncContext tomcat是如何实现长轮询

这种方式的优势在于:解放了tomcat线程,其实tomcat的线程只是运行了get方法中的代码,然后立马可以去其他请求,真正获取配置更改的是我们的单线程定时2秒去轮询。

Tomcat长轮询原理与源码解析

2.实现原理

2.1 tomcat处理一个请求的流程

Tomcat长轮询原理与源码解析

  • Connector是客户端连接到Tomcat容器的服务点,它提供协议服务来将引擎与客户端各种协议隔离开来

    在Connector组件中创建了Http11NioProtocol组件,Http11NioProtocol默认持有NioEndpoin,NioEndpoint中持有Acceptor和Poller,并且启动的时候会启动一个线程运行Acceptor

  • Acceptor服务器端监听客户端的连接,会启动线程一直执行

    Tomcat长轮询原理与源码解析

    每接收一个客户端连接就轮询一个Poller组件,添加到Poller组件的事件队列中。,每接收一个客户端连接就轮询一个Poller组件,添加到Poller组件的事件队列中。

  • Poller组件持有多路复用器selector,poller组件不停从自身的事件队列中将事件取出注册到自身的多路复用器上,同时多路复用器会不停的轮询检查是否有通道准备就绪,准备就绪的通道就可以扔给tomcat线程池处理了。

    Tomcat长轮询原理与源码解析

    Tomcat长轮询原理与源码解析

  • tomcat线程池处理请求

    • 这里会根据协议创建不同的Processor处理,这里创建的是Http11Processor,Http11Processor会使用CoyoteAdapter去解析报文随后交给Container去处理请求

    • CoyoteAdapter解析报文随后交给Container去处理请求

      Tomcat长轮询原理与源码解析

    • Container会将Filter和Servlet组装成FilterChain依次调用

      Tomcat长轮询原理与源码解析

    • FilterChain会依次调用Filter#doFilter,然后调用Servlet#service方法

      至此会调用到Servlete#service方法,SpringMVC中的DispatcherServlet会反射调用我们controller的方法

2.2 AsyncContext 如何实现异步

2.2.1 request.startAsync() 修改异步状态机状态为Starting

AsycContext内部持有一个AsyncStateMachine来管理异步请求的状态(有点状态模式的意思)

状态机的初始状态是AsyncState.DISPATCHED,通过setStarted将状态机的状态更新成STARTING

Tomcat长轮询原理与源码解析

2.2.2 AbstractProtocol启动定时任务处理超时异步请求

Connector启动的时候触发ProtocolHandler的start方法,如下

Tomcat长轮询原理与源码解析

其中startAsyncTimeout方法会遍历waitingProcessors中每一个Processor的timeoutAsync方法,这里的Processor就是Http11Processor

Tomcat长轮询原理与源码解析

那么waitProcessors中的Http11Processor是谁塞进去的昵?

tomcat线程在执行完我们的Servlet代码后,Http11NioProtocol会判断请求状态,如果为Long那么会塞到waitProcessors集合中。

如果发现请求超时,那么会调用Http11Processor#doTimeoutAsycn然后由封装的socket通道socketWrapper以TIMEOUT的事件类型重新提交到tomcat线程池中。

Tomcat长轮询原理与源码解析

2.2.3 AsyncContext#complete触发OPEN_READ事件

Tomcat长轮询原理与源码解析

可以看到其实和超时一样,只不过超时是由定时任务线程轮询来判断,而AsyncContext#complete则是我们业务线程触发processSocketEvent将后续处理提交到tomcat线程池中。

四丶长轮询的优点和缺点

本文学习了长轮询和tomcat长轮询的原理,可以看到这种方式的优点

  • 浏览器长轮询的过程中,请求并没有立即响应,而是等到超时或者有需要返回的数据(比如配置中心在这个超时事件内发送配置的变更)才返回,解决了短轮询频繁进行请求网络开销的问题,减少了读多写少业务情景下无意义请求。
  • 真是通过这种方式,减少了无意义的请求,而且释放了tomcat线程池中的线程,使得我们服务端可以支持更多的客户端(因为业务逻辑是放在其他的线程池执行的,而且对于配置中心来说,可以让多个客户端的长轮询请求由一个线程去处理,原本是一个请求一个tomcat线程处理,从而可以支持更多的请求)

当然这种方式也是有缺点的

  • hold住请求也是会消耗资源的,如果1w个请求同时到来,我们都需要hold住(封装成任务塞到队列)这写任务也是会占用内存的,而短轮询则会立马返回,从而时间资源的释放

  • 请求先后顺序无法保证,比如轮询第五个客户端的请求的时候,出现了配置的变更,这时候第五个请求会被提交到tomcat线程池中,从而早于前面四个请求得到响应,这对于需要严格有序的业务场景是有影响的

  • 多台实例监听配置中心实例,出现不一致的情况

    比如配置中心四台实例监听配置变更,前三台可能响应了得到V1的配置,但是轮询到第四台实例的请求的时候又发生了变更可能就得到了v2的配置,这时候这四台配置不一致了。需要保证这种一致性需要我们采取其他的策略,比如配置中心服务端主动udp推,或者加上版本号保证这四台配置一致。文章来源地址https://www.toymoban.com/news/detail-415529.html

到了这里,关于Tomcat长轮询原理与源码解析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 即时通讯:短轮询、长轮询、SSE 和 WebSocket 间的区别

    在现代 Web 开发中,即时通讯已经成为许多应用程序的重要组成部分。为了实现即时通讯,开发人员通常使用不同的技术和协议。本文将介绍四种常见的即时通讯实现方法:短轮询、长轮询、SSE(服务器发送事件)和 WebSocket,并探讨它们之间的区别。 短轮询是最简单的即时通

    2024年02月12日
    浏览(29)
  • IM通信技术快速入门:短轮询、长轮询、SSE、WebSocket

    🎉IM通信技术快速入门:短轮询、长轮询、SSE、WebSocket ☆* o(≧▽≦)o *☆嗨~我是IT·陈寒🍹 ✨博客主页:IT·陈寒的博客 🎈该系列文章专栏:Java面试技巧 📜其他专栏:Java学习路线 Java面试技巧 Java实战项目 AIGC人工智能 数据结构学习 🍹文章作者技术和水平有限,如果文中

    2024年02月05日
    浏览(31)
  • AsyncContext优雅实现HTTP长轮询接口

    接到一个需求,实现方案时需要提供一个HTTP接口,接口需要hold住5-8秒,轮询查询数据库,一旦数据库中值有变化,取出变化的值进行处理,处理完成后返回响应。这不就是长轮询吗,如何优雅的实现呢? 在这之前先简单介绍下长连接和短连接 HTTP长链接(Keep-Alive) 概念:

    2024年02月09日
    浏览(28)
  • RocketMQ的长轮询(Long Polling)实现分析

    目录 前言 长轮询 1.实现步骤 1.1客户端轮询发送请求 1.2服务端处理数据 1.3客户端接收数据 2.实现实例 RocketMQ长轮询 1.PullMessage服务 2.PullMessageProcessor服务 3.PullCallback回调 总结 消息队列一般在消费端都会提供push和pull两种模式,RocketMQ同样实现了这两种模式,分别提供了两个实

    2024年02月07日
    浏览(28)
  • WebSocket和HTTP协议有什么区别?&& 连环问:WebSocket和HTTP长轮询的区别?

    什么是WebSocket? 答:WebSocket是一种网络传输协议,可在单个TCP连接上进行全双工通信,位于OSI模型的应用层。 特点: 1. TCP连接,与HTTP协议兼容 2. 双向通信,主动推送(服务端向客户端) 3. 无同源限制,协议标识符是ws(加密wss) WebSocket: 1. 支持端对端通讯 2. 可以由client发起

    2024年02月11日
    浏览(31)
  • vscode 系列文章目录 - ctrl+鼠标左键无效

    vscode 中有时会遇到 “Alt + 鼠标点击” 有效,但 “Ctrl + 鼠标点击” 无效,这时可以通过 Ctrl + , 进行系统配置。 进入VScode的首选项,选择设置(快捷键 Ctrl + , ),输入Go to definition,找到如下两个设置。 Editor: Multi Cursor Modifier 设置成 alt “editor.gotoLocation.multipleDefinitions” 设置

    2024年04月23日
    浏览(41)
  • 【vim 学习系列文章 5 - cscope 过滤掉某些目录】

    上篇文章:【vim 学习系列文章 4 - vim与系统剪切板之间的交互】 下篇文章:【vim 学习系列文章 6 – vim 如何从上次退出的位置打开文件】 第一步 创建自己的 cscope 脚本 ~/.local/bin/cscope.sh ,如下: 我的这个脚本首先去区分当前执行 cscope 命令的目录是 rt-thread 目录还是 linux 目

    2024年02月12日
    浏览(73)
  • Git系列文章目录 - Git 子模块git submodule使用

    项目中有时会遇到会涉及子模块的使用,比如 flatpak 项目包含多个子模块。 进入需要添加子模块的目录,一般是项目根目录。 删除子模块目录及源码: 删除项目目录下.gitmodules文件中子模块相关条目: 删除配置项中子模块相关条目: 删除模块下的子模块目录: 清除子模块

    2024年01月20日
    浏览(42)
  • AIGC系列文章目录 第一章 AIGC 与AI对话,如何写好prompt?

    生成式人工智能AIGC(Artificial Intelligence Generated Content)是人工智能1.0时代进入2.0时代的重要标志。 AIGC对于人类社会、人工智能的意义是里程碑式的。 短期来看 AIGC改变了基础的生产力工具, 中期来看 会改变社会的生产关系, 长期来看 促使整个社会生产力发生质的突破,在

    2024年02月06日
    浏览(40)
  • openCV实战-系列教程7:轮廓检测2与模板匹配(轮廓检测/轮廓特征/轮廓近似/轮廓边界矩阵/轮廓边界圆/模版匹配)、原理解析、源码解读

    打印一个图片可以做出一个函数: 前面我们计算了这个图片的轮廓:  它的轮廓信息保存在了contours中,取出第一个轮廓,计算相关参数: 打印结果: 8500.5  437.9482651948929 这是分别求出了周长和面积,这里的True表示的是否是闭合的。    如图,第一个图是原图,如果将它的

    2024年02月10日
    浏览(29)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包