零丶长轮询的引入
最近在看工作使用到的diamond配置中心原理,发现大多数配置中心在推和拉模型上做的选择出奇的一致选择了基于长轮询的拉模型
- 优点:简单、可靠。
- 缺点:应用增多时,较高的轮询频率给整个配置中心服务带来巨大的压力。
-
基于推模型的客户端长轮询的方案
客户端通过轮询方式发现服务端的配置变更事件。轮询的频率决定了动态配置获取的实时性。
一丶何为长轮询
长轮询
本质上是原始轮询技术的一种更有效的形式。
- 从tomcat服务器的角度就是客户端不停请求,每次都得解析报文封装成Request,Response对象,并且占用线程池中的一个线程。
- 并且每次轮询都要进行tcp握手,挥手,网卡发起中断,操作系统处理中断从内核空间拷贝数据到用户空间,一通忙活服务端返回
配置未修改(配置中心没有修改配置,客户端缓存的配置和配置中心一致,所以是白忙活
长轮询是一种服务器选择尽可能长的时间保持和客户端连接打开的技术
,仅在数据变得可用或达到超时阙值后才提供响应
,而不是在给到客户端的新数据可用之前,让每个客户端多次发起重复的请求
及时性,节省网络开销
的作用。
二丶使用等待唤醒机制写一个简单的“长轮询”(脱裤子放屁)
package com.cuzzz.springbootlearn.longpull;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@RestController
@RequestMapping("long-pull"
public class MyController implements InitializingBean {
/**
* 处理任务的线程
*/
private ThreadPoolExecutor processExecutor;
/**
* 等待唤醒的锁
*/
private static final ReentrantLock lock = new ReentrantLock(;
/**
* 当请求获取配置的时候,在此condition上等待一定时间
* 当修改配置的时候通过这个condition 通知其他获取配置的线程
*/
private static final Condition condition = lock.newCondition(;
@GetMapping
public void get(HttpServletRequest request, HttpServletResponse response throws ExecutionException, InterruptedException {
//组转成任务
Task<String> task = new Task<String>(request, response,
( -> "拿配置" + System.currentTimeMillis(;
//提交到线程池
Future<?> submit = processExecutor.submit(task;
//tomcat线程阻塞于此
submit.get(;
}
/**
* 模拟修改配置
*
* 唤醒其他获取配置的线程
*/
@PostMapping
public String post(HttpServletRequest request, HttpServletResponse response {
lock.lock(;
try {
condition.signalAll(;
}finally {
lock.unlock(;
}
return "OK";
}
static class Task<T> implements Runnable {
private HttpServletResponse response;
/**
* 等待时长
*/
private final long timeout;
private Callable<T> task;
public Task(HttpServletRequest request, HttpServletResponse response, Callable<T> task {
this.response = response;
String time = request.getHeader("time-out";
if (time == null{
//默认等待10秒
this.timeout = 10;
}else {
this.timeout = Long.parseLong(time;
}
this.task = task;
}
@Override
public void run( {
lock.lock(;
try {
//超市等待
boolean await = condition.await(timeout, TimeUnit.SECONDS;
//超时
if (!await {
throw new TimeoutException(;
}
//获取配置
T call = task.call(;
//写回
ServletOutputStream outputStream = response.getOutputStream(;
outputStream.write(("没超时拿当前配置:" + call.getBytes(StandardCharsets.UTF_8;
} catch (TimeoutException | InterruptedException exception {
//超时或者线程被中断
try {
ServletOutputStream outputStream = response.getOutputStream(;
T call = task.call(;
outputStream.write(("超时or中断拿配置:" + call.getBytes(StandardCharsets.UTF_8;
} catch (Exception ex {
throw new RuntimeException(ex;
}
} catch (Exception e {
throw new RuntimeException(e;
} finally {
lock.unlock(;
}
}
}
@Override
public void afterPropertiesSet( {
int cpuNums = Runtime.getRuntime(.availableProcessors(;
processExecutor
= new ThreadPoolExecutor(cpuNums, cpuNums * 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100, new ThreadPoolExecutor.CallerRunsPolicy(;
}
}
使用get方法反问的请求回被提交到线程池进行await等待,使用post方法的请求回唤醒这些线程。
submit.get方法会发现其实什么结果都不会,这是因为异步提交到线程池后,tomcat已经结束了这次请求,并没有维护这个连接,所以没有办法写回结果。
三丶Tomcat Servlet 3.0长轮询原理
1.AsyncContext实现长轮询
package com.cuzzz.springbootlearn.longpull;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
@RestController
@RequestMapping("long-pull3"
public class MyController2 {
private static final ScheduledExecutorService procesExecutor
= Executors.newSingleThreadScheduledExecutor(;
/**
* 记录配置改变的map
*/
private static final ConcurrentHashMap<String, String> configCache
= new ConcurrentHashMap<>(;
/**
* 记录长轮询的任务
*/
private static final ConcurrentLinkedDeque<AsyncTask> interestQueue
= new ConcurrentLinkedDeque<>(;
static {
//每2秒看一下释放配置变更,或者任务超时
procesExecutor.scheduleWithFixedDelay(( -> {
List<AsyncTask>needRemove = new ArrayList<>(;
for (AsyncTask asyncTask : interestQueue {
if (asyncTask.timeout( {
asyncTask.run(;
needRemove.add(asyncTask;
continue;
}
if (configCache.containsKey(asyncTask.configId {
needRemove.add(asyncTask;
asyncTask.run(;
}
}
interestQueue.removeAll(needRemove;
}, 1, 2, TimeUnit.SECONDS;
}
static class AsyncTask implements Runnable {
private final AsyncContext asyncContext;
private final long timeout;
private static long startTime;
private String configId;
AsyncTask(AsyncContext asyncContext {
this.asyncContext = asyncContext;
HttpServletRequest request = (HttpServletRequest asyncContext.getRequest(;
String timeStr = request.getHeader("time-out";
if (timeStr == null {
timeout = 10;
} else {
timeout = Long.parseLong(timeStr;
}
//关注的配置key,应该getParameter的,无所谓
this.configId = request.getHeader("config-id";
if (this.configId == null {
this.configId = "default";
}
//开始时间
startTime = System.currentTimeMillis(;
}
//是否超时
public boolean timeout( {
return (System.currentTimeMillis( - startTime / 1000 > timeout;
}
@Override
public void run( {
String result = "开始于" + System.currentTimeMillis( + "--";
try {
if (timeout( {
result = "超时: " + result;
} else {
result += configCache.get(this.configId;
}
result += "--结束于:" + System.currentTimeMillis(;
ServletResponse response = asyncContext.getResponse(;
response.getOutputStream(.write(result.getBytes(StandardCharsets.UTF_8;
//后续将交给tomcat线程池处理,将给客户端响应
asyncContext.complete(;
} catch (IOException e {
throw new RuntimeException(e;
}
}
}
@GetMapping
public void get(HttpServletRequest request, HttpServletResponse response {
//打印处理的tomcate线程id
System.out.println("线程id" + Thread.currentThread(.getId(;
//添加一个获取配置的异步任务
interestQueue.add(new AsyncTask(asyncContext;
//开启异步
AsyncContext asyncContext = request.startAsync(;
asyncContext.setTimeout(0;
//监听器打印最后回调的tomcat线程id
asyncContext.addListener(new AsyncListener( {
@Override
public void onComplete(AsyncEvent event throws IOException {
System.out.println("线程id" + Thread.currentThread(.getId(;
}
//...剩余其他方法
};
//立马就会释放tomcat线程池资源
System.out.println("tomcat主线程释放";
}
@PostMapping
public void post(HttpServletRequest request {
String c = String.valueOf(request.getParameter("config-id";
if (c.equals("null"{
c = "default";
}
String v = String.valueOf(request.getParameter("value";
configCache.put(c, v;
}
}
AsyncContext tomcat是如何实现长轮询
获取配置更改的是我们的单线程定时2秒去轮询。
2.实现原理
2.1 tomcat处理一个请求的流程
-
Poller组件持有多路复用器selector,poller组件不停从自身的事件队列中将事件取出注册到自身的多路复用器上,同时多路复用器会不停的轮询检查是否有通道准备就绪,准备就绪的通道就可以扔给tomcat线程池处理了。
-
-
这里会根据协议创建不同的Processor处理,这里创建的是Http11Processor,Http11Processor会使用CoyoteAdapter去解析报文随后交给Container去处理请求
-
至此会调用到Servlete#service方法,SpringMVC中的Dispatcher会反射调用我们controller的方法
-
在Connector组件中创建了Http11NioProtocol组件,Http11NioProtocol默认持有NioEndpoin,NioEndpoint中持有Acceptor和Poller,并且启动的时候会启动一个线程运行Acceptor
2.2 AsyncContext 如何实现异步
2.2.1 request.startAsync( 修改异步状态机状态为Starting
状态机的初始状态是AsyncState.DISPATCHED,通过setStarted将状态机的状态更新成STARTING
2.2.2 AbstractProtocol启动定时任务处理超时异步请求
tomcat线程在执行完我们的Servlet代码后,Http11NioProtocol会判断请求状态,如果为Long那么会塞到waitProcessors集合中。
Http11Processor#doTimeoutAsycn然后由封装的socket通道socketWrapper以TIMEOUT的事件类型重新提交到tomcat线程池中。
2.2.3 AsyncContext#complete触发OPEN_READ事件
四丶长轮询的优点和缺点
本文学习了长轮询和tomcat长轮询的原理,可以看到这种方式的优点
- 浏览器长轮询的过程中,请求并没有理解响应,而是等到超时或者有需要返回的数据(比如配置中心在这个超时事件内发送配置的变更)才返回,解决了短轮询频繁进行请求网络开销的问题,减少了读多写少业务情景下无意义请求。
- 真是通过这种方式,减少了无意义的请求,而且释放了tomcat线程池中的线程,使得我们服务端可以支持更多的客户端(因为业务逻辑是放在其他的线程池执行的,而且对于配置中心来说,可以让多个客户端的长轮询请求由一个线程去处理,原本是一个请求一个tomcat线程处理,从而可以支持更多的请求)
-
多台实例监听配置中心实例,出现不一致的情况
比如配置中心四台实例监听配置变更,前三台可能响应了得到V1的配置,但是轮询到第四台实例的请求的时候又发生了变更可能就得到了v2的配置,这时候这四台配置不一致了。需要保证这种一致性需要我们采取其他的策略,比如配置中心服务端主动udp推,或者加上版本号保证这四台配置一致。
hold住请求也是会消耗资源的,如果1w个请求同时到来,我们都需要hold住(封装成任务塞到队列)这写任务也是会占用内存的,而短轮询则会立马返回,从而时间资源的释放