一、背景
接到一个需求,实现方案时需要提供一个HTTP接口,接口需要hold住5-8秒,轮询查询数据库,一旦数据库中值有变化,取出变化的值进行处理,处理完成后返回响应。这不就是长轮询吗,如何优雅的实现呢?
在这之前先简单介绍下长连接和短连接
HTTP长链接(Keep-Alive)
-
概念: HTTP长链接是指客户端与服务器在一次TCP连接上可以传输多个HTTP请求和响应。在请求完成后,连接不会立即关闭,而是保持开放状态,等待可能的后续请求。
-
优势:
- 减少延迟: 长链接避免了每次请求都需要重新建立TCP连接的开销,降低了通信延迟。
- 减少资源占用: 不需要频繁地打开和关闭连接,减少了服务器资源的占用。
-
应用场景:
- 实时性要求高的应用: 长链接适用于需要实时性响应的应用,例如即时通讯、实时更新等。
- 资源加载优化: 在Web开发中,适用于多个资源(如图片、样式表、脚本)的同时加载。
HTTP短连接
-
概念: HTTP短连接是指每个HTTP请求都需要建立一个新的TCP连接,请求完成后立即关闭连接。
-
优势:
- 简单: 短连接模式相对简单,易于理解和实现。
- 更好的控制: 对于某些资源密集型的应用,短连接可以更好地控制资源的释放。
-
应用场景:
- 低并发场景: 当并发请求数较低时,短连接可能更适用,因为它避免了长链接的开销。
- 资源密集型应用: 对于服务器资源消耗较大的应用,短连接可能更容易控制资源的释放。
何为轮询
所谓轮询,即是在一个循环周期内不断发起请求来得到数据的机制。只要有请求的的地方,都可以实现轮询,譬如各种事件驱动模型。它的长短是在于某次请求的返回周期。
1. 短轮询
短轮询指的是在循环周期内,不断发起请求,每一次请求都立即返回结果,根据新1日数据对比决定是否使用这个结果。
2. 长轮询
而长轮询及是在请求的过程中,若是服务器端数据并没有更新,那么则将这个连接挂起,直到服务器推送新的数据,再返回,然后再进入循环周期。
长短轮询和长短连接的区别
1. 第一个区别是决定的方式,
一个TCP连接是否为长连接,是通过设置HTTP的Connection Header来决定的,而且是需要两边都设置才有效。而一种轮询方式是否为长轮询,是根据服务端的处理方式来决定的,与客户端没有关系。
2. 第二个区别就是实现的方式
连接的长短是通过协议来规定和实现的。而轮询的长短,是服务器通过编程的方式手动挂起请求来实现的。
二、方案设计
在 Spring 中,AsyncContext 是用于支持异步处理的一个重要的特性。它允许我们在 servlet 请求处理过程中,将长时间运行的操作放在一个单独的线程中执行,而不会阻塞其他请求的处理。
AsyncContext 在以下两种情况下特别有用:
-
长时间运行的操作:当我们需要执行一些耗时的操作,例如网络请求、数据库查询或其他 I/O 操作时,通过将这些操作放在一个新的线程中,可以避免阻塞 servlet 容器中的线程,提高应用的并发性能。
-
推送异步响应:有时候,我们可能需要推送异步产生的响应,而不是等到所有操作都完成后再下发响应。通过 AsyncContext,我们可以在任何时间点上触发异步响应,将结果返回给客户端。
使用 AsyncContext 的步骤如下:
- 在 servlet 中启用异步模式:在 servlet 中,通过调用
startAsync()
方法,可以获取到当前请求的 AsyncContext 对象,从而启用异步处理模式。
HttpServletRequest request = ...;
AsyncContext asyncContext = request.startAsync();
- 指定异步任务:通过调用 AsyncContext 对象的
start()
方法,在新的线程中执行需要异步处理的任务。
asyncContext.start(() -> {
// 异步任务逻辑
});
- 提交响应:在异步任务完成后,可以调用 AsyncContext 对象的
complete()
方法,以表示异步操作完成。
asyncContext.complete();
需要注意的是,我们在使用 AsyncContext 时需要特别注意线程安全。由于异步任务在单独的线程中执行,所以可能存在并发问题。因此,在编写异步任务逻辑时,需要注意线程安全性,使用合适的同步措施。
另外,AsyncContext 也支持超时设置、错误处理、事件监听等功能,这些可以通过相应的方法和回调进行配置。可以根据具体的需求使用这些功能来优化异步处理的逻辑。文章来源:https://www.toymoban.com/news/detail-702762.html
总结来说,Spring 的 AsyncContext 提供了方便的异步处理机制,可以提高应用的并发性能,并支持推送异步响应,使得应用更具有响应性和可伸缩性。文章来源地址https://www.toymoban.com/news/detail-702762.html
三、方案1
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.*;
@RestController
@RequestMapping("/api/test")
@Slf4j
public class AsyncTestController {
@Resource
private RedisTemplate<String, String> redisTemplate;
private final ExecutorService timeoutChecker = new ThreadPoolExecutor(1,1,1000,TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d").build();
// private static boolean result = false;
@PostMapping("/async")
public void async(HttpServletRequest request, HttpServletResponse response) {
// 创建AsyncContext
AsyncContext asyncContext = request.startAsync(request, response);
// 设置处理超时时间8s
asyncContext.setTimeout(8000L);
// asyncContext监听
AsyncTestListener asyncListener = new AsyncTestListener(redisTemplate,asyncContext);
asyncContext.addListener(asyncListener);
// 定时处理业务,处理成功后asyncContext.complete();完成异步请求
asyncContext.start(asyncListener);
}
// 模拟业务处理完成
@PostMapping("/set")
public ResultModel notify(String key, String value) {
redisTemplate.opsForValue().set(key, value);
return ResultModel.success();
}
@PostMapping("/get")
public ResultModel get(String key) {
String s = redisTemplate.opsForValue().get(key);
return ResultModel.success(s);
}
@PostMapping("/del")
public ResultModel del(String key) {
redisTemplate.delete(key);
return ResultModel.success();
}
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import java.io.IOException;
@Slf4j
public class AsyncTestListener implements AsyncListener,Runnable {
boolean isComplete;
private RedisTemplate<String, String> redisTemplate;
private AsyncContext asyncContext;
public JdAsyncTestListener(RedisTemplate<String, String> redisTemplate, AsyncContext asyncContext) {
this.redisTemplate = redisTemplate;
this.asyncContext = asyncContext;
}
@Override
public void run() {
try {
while(true){
if(isComplete){
log.info("已经退出");
break;
}
boolean b = redisTemplate.opsForValue().get(1) != null;
log.info("获取标志位:"+b);
Thread.sleep(300);
if (b) {
asyncContext.getResponse().getWriter().print(1);
asyncContext.complete();
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public void onComplete(AsyncEvent asyncEvent) throws IOException {
log.info("结束了");
isComplete = true;
}
@Override
public void onTimeout(AsyncEvent asyncEvent) throws IOException {
log.info("超时了");
isComplete = true;
}
@Override
public void onError(AsyncEvent asyncEvent) throws IOException {
}
@Override
public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
}
}
四、方案2
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.dubbo.config.annotation.Reference;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@Validated
@RestController
@RequestMapping("/api/test")
@Slf4j
public class TestController {
@Resource
private RedisTemplate<String, String> redisTemplate;
private final ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(10, threadFactory);
private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d").build();
private static boolean result = false;
private final boolean isTimeout = false;
/**
* 消息
*
* @return
*/
@PostMapping("/test")
public void callback(@RequestBody TestLongPollRequest testLongPollRequest, HttpServletRequest request, HttpServletResponse response) {
// 创建AsyncContext
AsyncContext asyncContext = request.startAsync(request, response);
String test = LongPollRequest.getctomerId();
// 设置处理超时时间8s
asyncContext.setTimeout(8000L);
// asyncContext监听
asyncContext.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent asyncEvent) throws IOException {
log.info("onComplete={}", asyncEvent);
}
@Override
public void onTimeout(AsyncEvent asyncEvent) throws IOException {
log.info("onTimeout={}", asyncEvent);
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("code", "500"); asyncContext.getResponse().getWriter().print(JSON.toJSONString(map));
asyncContext.complete();
}
@Override
public void onError(AsyncEvent asyncEvent) throws IOException {
log.info("onError={}", asyncEvent);
}
@Override
public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
log.info("onStartAsync={}", asyncEvent);
}
});
// 定时处理业务,处理成功后asyncContext.complete();完成异步请求
timeoutChecker.scheduleAtFixedRate(() -> {
try {
String redisKey = getRedisKey(customerId);
String redisValue = redisTemplate.opsForValue().get(redisKey);
result = StringUtils.isNotBlank(redisValue);
if (result) {
//todo 长轮询查询数据库。通过customerId查询
send(test, redisValue);
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("code", "200");
map.put("msg", redisValue);
asyncContext.getResponse().getWriter().print(JSON.toJSONString(map));
asyncContext.complete();
}
} catch (IOException e) {
e.printStackTrace();
}
}, 0, 100L, TimeUnit.MILLISECONDS);
}
/**
* 发送消息
*/
private void send(String test, String content) {
}
}
到了这里,关于AsyncContext优雅实现HTTP长轮询接口的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!