Exploring Best Practices for System Rate Limiting, Degradation, and Load Protection with an In-House Component Built on Sentinel



Author: Yang Jianmin, JD Logistics

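1. Introduction to Sentinel

Sentinel is a flow-control component for distributed service architectures. Taking traffic as its entry point, it protects service stability along multiple dimensions, including flow control, circuit breaking and degradation, and system load protection.

  • Rich application scenarios: flash sales (keeping burst traffic within what the system can handle), message peak shaving and valley filling, cluster flow control, real-time circuit breaking of unavailable downstream applications, and more.
  • Comprehensive real-time monitoring: Sentinel also provides real-time monitoring. In the console you can see per-second data for each machine of an integrated application, and even the aggregated runtime status of clusters of fewer than 500 machines.
  • Broad open-source ecosystem: Sentinel provides out-of-the-box integration modules for other open-source frameworks and libraries, such as Spring Cloud, Apache Dubbo, gRPC, and Quarkus. You only need to add the corresponding dependency and a little configuration to integrate Sentinel. Sentinel also provides native implementations in Java, Go, C++, and other languages.
  • Complete SPI extension mechanism: Sentinel provides simple, easy-to-use SPI extension interfaces, so you can quickly customize logic by implementing them, for example custom rule management or adapters for dynamic data sources.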

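This project mainly uses Sentinel's degradation, rate-limiting, and system load protection features.

2. Source Code Analysis of Sentinel's Key Techniques

•StatisticSlot records and aggregates runtime monitoring metrics along different dimensions.

•If the subsequent slots pass without throwing a BlockException, the resource was invoked successfully, so the number of executing threads and the number of passed requests are incremented.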

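Project structure

2.1 Annotation Entry Point

2.1.1 Entry, Context, Node

An Entry holds the information about the current invocation of a resource:

•curNode: the node the Entry is currently on.

•resourceWrapper: the resource information associated with the Entry.

The source comment of Context reads: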

This class holds metadata of current invocation


The source comment of Node reads:

Holds real-time statistics for resources


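Node holds real-time statistics for a resource, and Sentinel's flow control and degradation decisions are made from the data in Node. Node is an interface that defines methods for operating on request, exception, RT, QPS, and thread statistics.

It is an excellent data structure, well worth considering whenever you need counters in a real application.

2.2 Initialization

2.2.1 Context Initialization

Node was briefly mentioned earlier as the structure used for statistics. The different Node types serve the following purposes:

•StatisticNode: the statistics node, an implementation of the Node interface that performs the data statistics.

•DefaultNode: the default node, which counts the traffic of one resource within the current Context.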

protected static Context trueEnter(String name, String origin) {
        // Reuse the Context already bound to this thread, if any
        Context context = contextHolder.get();
        if (context == null) {
            Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
            DefaultNode node = localCacheNameMap.get(name);
            if (node == null) {
                // Too many distinct context names: fall back to NULL_CONTEXT
                if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                    setNullContext();
                    return NULL_CONTEXT;
                } else {
                    LOCK.lock();
                    try {
                        // Double-checked locking: re-read under the lock
                        node = contextNameNodeMap.get(name);
                        if (node == null) {
                            if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                                setNullContext();
                                return NULL_CONTEXT;
                            } else {
                                node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                                // Add entrance node.
                                Constants.ROOT.addChild(node);

                                // Copy-on-write: build a new map and publish it by swapping the reference
                                Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                                newMap.putAll(contextNameNodeMap);
                                newMap.put(name, node);
                                contextNameNodeMap = newMap;
                            }
                        }
                    } finally {
                        LOCK.unlock();
                    }
                }
            }
            context = new Context(node, name);
            context.setOrigin(origin);
            contextHolder.set(context);
        }

        return context;
    }
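The lookup in trueEnter combines double-checked locking with a copy-on-write map: readers go straight to the current map without locking, and only a thread that misses takes the lock, re-checks, and publishes a brand-new HashMap. The sketch below isolates that pattern under stated assumptions: CowRegistry and getOrCreate are hypothetical names, it returns null where Sentinel returns NULL_CONTEXT, it compares the size with >= so the cap is exact (the code above uses >), and it declares the map field volatile so a published copy is visible to lock-free readers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical registry showing the copy-on-write map + double-checked locking used by trueEnter.
class CowRegistry<V> {
    private final int maxSize;
    private final ReentrantLock lock = new ReentrantLock();
    // volatile: a map published by a writer is fully visible to lock-free readers
    private volatile Map<String, V> map = new HashMap<>();

    CowRegistry(int maxSize) {
        this.maxSize = maxSize;
    }

    /** Returns the value for name, creating it on first use; returns null once maxSize entries exist. */
    V getOrCreate(String name, Function<String, V> factory) {
        V v = map.get(name);                  // fast path: no lock
        if (v != null) {
            return v;
        }
        if (map.size() >= maxSize) {          // cheap pre-check before locking
            return null;
        }
        lock.lock();
        try {
            v = map.get(name);                // re-check: another thread may have created it
            if (v == null) {
                if (map.size() >= maxSize) {
                    return null;
                }
                v = factory.apply(name);
                Map<String, V> copy = new HashMap<>(map.size() + 1);
                copy.putAll(map);
                copy.put(name, v);
                map = copy;                   // publish the new snapshot with one volatile write
            }
            return v;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        return map.size();
    }
}
```

Reads never block, and writes are rare (one per new context name), which is why copying the whole map on each write is an acceptable cost here.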


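2.2.2 Eight Slots Initialized by Default via SpiLoader

•NodeSelectorSlot: collects the invocation paths of resources and stores them as a tree, which is used for flow control and degradation by invocation path.

•ClusterBuilderSlot: stores the statistics of a resource and its callers, such as the resource's RT, QPS, and thread count; this information serves as the basis for multi-dimensional flow control and degradation.

•StatisticSlot: records and aggregates runtime monitoring metrics along different dimensions.

•FlowSlot: flow control.

•AuthoritySlot: blacklist/whitelist control.

•DegradeSlot: circuit breaking and degradation.

•SystemSlot: controls the total inbound traffic.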

2.3 StatisticSlot

2.3.1 Node

/**
 * Holds real-time statistics for resources.
 *
 * @author qinan.qn
 * @author leyou
 * @author Eric Zhao
 */
public interface Node extends OccupySupport, DebugSupport {
    long totalRequest();
    long totalPass();
    long totalSuccess();
    long blockRequest();
    long totalException();
    double passQps();
    double blockQps();
    double totalQps();
    double successQps();
    // ……
}


2.3.2 StatisticNode

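Let's start with the most basic one, StatisticNode. Its source comment describes it as follows:

The statistic node keep three kinds of real-time statistics metrics:
metrics in second level ({@code rollingCounterInSecond})
metrics in minute level ({@code rollingCounterInMinute})
thread count


StatisticNode has only four fields. Besides curThreadNum of type LongAdder, mentioned earlier, and the timestamp lastFetchTime, two fields are Metric objects; as their constructor arguments and names show, one is for per-second and the other for per-minute statistics. Next, let's look at Metric.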

// StatisticNode holds two Metrics, one per-second and one per-minute. From the constructor arguments, the per-second
// statistic is split into SAMPLE_COUNT windows over INTERVAL ms (by default 2 windows of 500 ms each)
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
    IntervalProperty.INTERVAL);

// The per-minute statistic is split into 60 windows of 1000 ms each
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

/**
 * The counter for thread count.
 */
private LongAdder curThreadNum = new LongAdder();

/**
 * The last timestamp when metrics were fetched.
 */
private long lastFetchTime = -1;


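ArrayMetric has only one field, a LeapArray; everything else is statistics methods. LeapArray is the most fundamental statistics data structure in Sentinel and deserves a closer look. In short, it fetches a bucket for a given timeMillis, and there are three cases: the bucket has not been created yet, the bucket is current and is returned directly, or the bucket is stale and must be reset.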

// Taking the per-minute statistic as an example, look at how the time windows are initialized
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);


public LeapArray(int sampleCount, int intervalInMs) {
        AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
        AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
        AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
        // windowLengthInMs = 60*1000 / 60 = 1000: the length of each sliding window,
        // i.e. the per-minute statistic divides its interval into 60 sliding windows
        this.windowLengthInMs = intervalInMs / sampleCount;
        // 60*1000
        this.intervalInMs = intervalInMs;
        // 60.0
        this.intervalInSecond = intervalInMs / 1000.0;
        // 60
        this.sampleCount = sampleCount;
        // array length is 60
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

/**
     * Get bucket item at provided timestamp.
     *
     * @param timeMillis a valid timestamp in milliseconds
     * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
     */
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        // Map the current timestamp to an array index
        int idx = calculateTimeIdx(timeMillis);
        // Calculate current bucket start time.
        // windowStart = timeMillis - timeMillis % windowLengthInMs
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket.
         */
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                /*
                 *     B0       B1      B2    NULL      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            bucket is empty, so create new and update
                 *
                 * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                 * then try to update circular array via a CAS operation. Only one thread can
                 * succeed to update, while other threads yield its time slice.
                 */
                // newEmptyBucket is overridden: the per-second and per-minute statistics create different bucket types
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                /*
                 *     B0       B1      B2     B3      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            startTime of Bucket 3: 800, so it's up-to-date
                 *
                 * If current {@code windowStart} is equal to the start timestamp of old bucket,
                 * that means the time is within the bucket, so directly return the bucket.
                 */
                return old;
            } else if (windowStart > old.windowStart()) {
                /*
                 *   (old)
                 *             B0       B1      B2    NULL      B4
                 * |_______||_______|_______|_______|_______|_______||___
                 * ...    1200     1400    1600    1800    2000    2200  timestamp
                 *                              ^
                 *                           time=1676
                 *          startTime of Bucket 2: 400, deprecated, should be reset
                 *
                 * If the start timestamp of old bucket is behind provided time, that means
                 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                 * Note that the reset and clean-up operations are hard to be atomic,
                 * so we need a update lock to guarantee the correctness of bucket update.
                 *
                 * The update lock is conditional (tiny scope) and will take effect only when
                 * bucket is deprecated, so in most cases it won't lead to performance loss.
                 */
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
// Maps a timestamp to an array slot: divide it by the window length, then take the result modulo the array length
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        // Calculate current index so we can map the timestamp to the leap array.
        return (int)(timeId % array.length());
    }
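The three cases handled by currentWindow (bucket absent, up to date, stale) can be reproduced in a stripped-down sliding window. The sketch below is a hypothetical, simplified LeapArray-style counter, assuming one LongAdder per bucket and an explicit timestamp argument; it installs new buckets with CAS like LeapArray, but resets stale buckets inside a synchronized block instead of tryLock plus Thread.yield.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical, simplified LeapArray-style sliding window with one LongAdder per bucket.
class SlidingWindowCounter {
    static final class Bucket {
        volatile long windowStart;
        final LongAdder count = new LongAdder();

        Bucket(long windowStart) {
            this.windowStart = windowStart;
        }
    }

    private final long windowLengthMs;
    private final long intervalMs;
    private final AtomicReferenceArray<Bucket> array;

    SlidingWindowCounter(int sampleCount, int intervalMs) {
        if (sampleCount <= 0 || intervalMs <= 0 || intervalMs % sampleCount != 0) {
            throw new IllegalArgumentException("intervalMs must be evenly divided by sampleCount");
        }
        this.windowLengthMs = intervalMs / sampleCount;
        this.intervalMs = intervalMs;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    Bucket currentBucket(long nowMs) {
        int idx = (int) ((nowMs / windowLengthMs) % array.length());
        long windowStart = nowMs - nowMs % windowLengthMs;
        while (true) {
            Bucket old = array.get(idx);
            if (old == null) {
                // Case 1: slot empty, install a new bucket with CAS; losers retry
                Bucket fresh = new Bucket(windowStart);
                if (array.compareAndSet(idx, null, fresh)) {
                    return fresh;
                }
                Thread.yield();
            } else if (old.windowStart == windowStart) {
                // Case 2: the bucket belongs to the current window
                return old;
            } else if (old.windowStart < windowStart) {
                // Case 3: stale bucket from an earlier cycle, reset it in place and loop again
                synchronized (old) {
                    if (old.windowStart < windowStart) {
                        old.count.reset();
                        old.windowStart = windowStart;
                    }
                }
            } else {
                // Clock moved backwards: hand out a detached bucket, as LeapArray does
                return new Bucket(windowStart);
            }
        }
    }

    void add(long nowMs, long n) {
        currentBucket(nowMs).count.add(n);
    }

    /** Sums the buckets whose window started less than intervalMs before nowMs. */
    long sum(long nowMs) {
        long total = 0;
        for (int i = 0; i < array.length(); i++) {
            Bucket b = array.get(i);
            if (b != null && nowMs - b.windowStart < intervalMs) {
                total += b.count.sum();
            }
        }
        return total;
    }
}
```

With two 500 ms buckets over a 1000 ms interval, a write at 1200 ms lands in slot 0 again; the bucket there still starts at 0, so it is reset to start at 1000 before counting.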


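WindowWrap holds windowLengthInMs, windowStart, and the bucket value. The LeapArray implementation is BucketLeapArray for per-minute statistics and OccupiableBucketLeapArray for per-second statistics. For per-minute statistics each bucket is a MetricBucket, which maintains an array of LongAdder counters and a minRt value initialized from configuration.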

/**
 * The fundamental data structure for metric statistics in a time span.
 *
 * @author jialiang.linjl
 * @author Eric Zhao
 * @see LeapArray
 */
public class BucketLeapArray extends LeapArray<MetricBucket> {

    public BucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
    }

    @Override
    public MetricBucket newEmptyBucket(long time) {
        return new MetricBucket();
    }

    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
        // Update the start time and reset value.
        w.resetTo(startTime);
        w.value().reset();
        return w;
    }
}
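The LongAdder array inside MetricBucket keeps one striped counter per event type: LongAdder spreads concurrent increments across internal cells instead of retrying a CAS on a single value, which suits these hot, write-heavy counters. A minimal sketch of the idea, under assumptions: SimpleMetricBucket and its MetricEvent enum are hypothetical (Sentinel's own MetricEvent enum has more members), and maxRt stands in for the configured ceiling that minRt starts from.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical bucket: one LongAdder per event type plus a running minimum RT.
class SimpleMetricBucket {
    enum MetricEvent { PASS, BLOCK, EXCEPTION, SUCCESS, RT }

    private final LongAdder[] counters = new LongAdder[MetricEvent.values().length];
    private final long maxRt;
    private volatile long minRt;

    SimpleMetricBucket(long maxRt) {
        for (int i = 0; i < counters.length; i++) {
            counters[i] = new LongAdder();
        }
        this.maxRt = maxRt;
        this.minRt = maxRt;  // start from the configured ceiling
    }

    void add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);  // the enum ordinal selects the counter slot
    }

    long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }

    void addRt(long rt) {
        add(MetricEvent.RT, rt);
        if (rt < minRt) {  // benign race: minRt is only approximate under contention
            minRt = rt;
        }
    }

    long minRt() {
        return minRt;
    }

    SimpleMetricBucket reset() {
        for (LongAdder c : counters) {
            c.reset();
        }
        minRt = maxRt;
        return this;
    }
}
```

reset() returns the bucket itself so it can be chained the way resetWindowTo calls w.value().reset().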


2.4 FlowSlot

2.4.1 Common Rate-Limiting Algorithms

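Counter algorithm

As the name suggests, the counter algorithm counts the requests in a time period, adding 1 for each incoming request, and compares the count with the configured limit (maximum QPS); if the count exceeds the limit, rate limiting is triggered. However, this algorithm cannot limit "smoothly". Taking 1 s as the unit of time and 100 QPS as the limit, as shown in the figure below, the limit can be exceeded in some periods: if 100 requests arrive in the last 100 ms of one window and another 100 in the first 100 ms of the next, all 200 pass within 200 ms.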
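A fixed-window counter matching this description can be sketched as follows. FixedWindowLimiter is a hypothetical class; time is passed in explicitly so the boundary effect can be reproduced deterministically, and the synchronized method trades throughput for simplicity.

```java
// Hypothetical fixed-window counter: at most `limit` requests per window of windowMs.
class FixedWindowLimiter {
    private final int limit;
    private final long windowMs;
    private long windowStart = -1;
    private int count;

    FixedWindowLimiter(int limit, long windowMs) {
        this.limit = limit;
        this.windowMs = windowMs;
    }

    /** Returns true if a request arriving at nowMs is allowed. */
    synchronized boolean tryAcquire(long nowMs) {
        long start = nowMs - nowMs % windowMs;
        if (start != windowStart) {  // entered a new window: reset the counter
            windowStart = start;
            count = 0;
        }
        if (count < limit) {
            count++;
            return true;
        }
        return false;
    }
}
```

With a limit of 100 per 1000 ms, 100 requests at 950 ms and 100 more at 1050 ms all pass: 200 requests within 100 ms, which is exactly the burst at the window boundary.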

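Leaky bucket algorithm

The leaky bucket algorithm has its own drawback: it cannot absorb burst traffic. For example, if 100 requests arrive at the same moment and the bucket drains at 100 requests per second, they are let out one at a time, and by the time the last request leaves, about a second has passed. The leaky bucket is therefore better suited to scenarios where requests arrive fairly evenly and the request rate must be strictly controlled.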
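The queueing form of the leaky bucket can be sketched by scheduling each request's departure one fixed interval after the previous one and rejecting any request that would wait longer than a maximum queueing time; this is the same latestPassedTime bookkeeping that ThrottlingController uses later in this section. LeakyBucketQueue is hypothetical and, for simplicity, assumes ratePerSec divides 1000 so the interval is a whole number of milliseconds.

```java
// Hypothetical leaky bucket (queue form): requests leave one every 1000 / ratePerSec ms,
// and a request is rejected if it would have to queue longer than maxWaitMs.
class LeakyBucketQueue {
    private final long intervalMs;                  // time between two outgoing requests
    private final long maxWaitMs;                   // longest a request may queue
    private long lastLeaveMs = Long.MIN_VALUE / 2;  // scheduled leave time of the previous request

    LeakyBucketQueue(int ratePerSec, long maxWaitMs) {
        this.intervalMs = 1000L / ratePerSec;
        this.maxWaitMs = maxWaitMs;
    }

    /** Returns how long (ms) the request must wait before leaving, or -1 if it is rejected. */
    synchronized long schedule(long nowMs) {
        long leaveMs = Math.max(nowMs, lastLeaveMs + intervalMs);
        long waitMs = leaveMs - nowMs;
        if (waitMs > maxWaitMs) {
            return -1;  // queue too long: reject without reserving a slot
        }
        lastLeaveMs = leaveMs;
        return waitMs;
    }
}
```

At 100 requests per second, 100 requests arriving together at t = 0 are released at 0, 10, ..., 990 ms, so the last one leaves almost a full second later; with a 50 ms queueing limit only the first 6 are accepted.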

Token bucket algorithm
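The article does not describe the token bucket further, so here is a minimal sketch of the standard algorithm: tokens are added at a fixed rate up to a capacity, each request takes one token, and an empty bucket rejects. Unlike the leaky bucket, a full bucket lets a burst of up to capacity requests through at once. TokenBucket is a hypothetical class with time passed in explicitly.

```java
// Hypothetical token bucket: refills at ratePerSec up to capacity; each request consumes one token.
class TokenBucket {
    private final double capacity;
    private final double ratePerMs;
    private double tokens;
    private long lastMs;

    TokenBucket(double capacity, double ratePerSec, long startMs) {
        this.capacity = capacity;
        this.ratePerMs = ratePerSec / 1000.0;
        this.tokens = capacity;  // start full, so an initial burst is allowed
        this.lastMs = startMs;
    }

    synchronized boolean tryAcquire(long nowMs) {
        // Refill for the time elapsed since the last call, capped at capacity
        tokens = Math.min(capacity, tokens + (nowMs - lastMs) * ratePerMs);
        lastMs = nowMs;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```

Starting full with a capacity of 100 and 100 tokens per second, a burst of 150 requests at t = 0 lets exactly 100 through; 10 ms later one token has been refilled.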

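2.4.2 Standalone Rate-Limiting Mode

Next, let's look at Sentinel's rate-limiting implementation. Compared with the basic algorithms above, the first distinguishing feature of Sentinel's rate limiting is the concept of a "resource", which enables fine-grained, flexible limits on a specific resource, on an associated resource, or on a specified invocation chain.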

    /**
     * Flow control grade (two modes):
     *   0: thread count (limit when the number of threads calling the API reaches the threshold)
     *   1: QPS (limit when the QPS of calls to the API reaches the threshold)
     */
    private int grade = RuleConstant.FLOW_GRADE_QPS;

    /**
     * Flow control threshold; its meaning depends on grade.
     */
    private double count;

    /**
     * Strategy based on the invocation relationship (supports limiting by a related resource or by a specified chain):
     *  direct (limit the API itself when it reaches the threshold)
     *  relate (limit this resource when the related resource reaches the threshold)
     *  chain (only count the traffic coming through the specified entry chain)
     * {@link RuleConstant#STRATEGY_DIRECT} for direct flow control (by origin);
     * {@link RuleConstant#STRATEGY_RELATE} for relevant flow control (with relevant resource);
     * {@link RuleConstant#STRATEGY_CHAIN} for chain flow control (by entrance resource).
     */
    private int strategy = RuleConstant.STRATEGY_DIRECT;

    /**
     * Reference resource in flow control with relevant resource or context.
     */
    private String refResource;

    /**
     * Control behavior:
     * 0. default (reject directly, throwing FlowException)
     * 1. warm up (slow start: the limit starts at threshold / coldFactor, where coldFactor defaults to 3,
     *    and rises to the configured QPS threshold over the warm-up period)
     * 2. rate limiter (queueing at a uniform rate)
     * 3. warm up + rate limiter
     */
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;

    private int warmUpPeriodSec = 10;

    /**
     * Max queueing time in rate limiter behavior.
     */
    private int maxQueueingTimeMs = 500;

    /**
    *  Whether cluster mode is enabled; false by default.
    */
    private boolean clusterMode;
    /**
     * Flow rule config for cluster mode.
     */
    private ClusterFlowConfig clusterConfig;

    /**
     * The traffic shaping (throttling) controller.
     */
    private TrafficShapingController controller;


Next, let's analyze FlowRuleChecker.

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        // Select the node to check according to the strategy (the resource's own node,
        // a related resource's node, or the node of the specified chain)
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }

        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }


static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
        // limitApp restricts which caller origins the rule applies to; it defaults to "default", i.e. any origin
        String limitApp = rule.getLimitApp();
        // Get the flow control strategy
        int strategy = rule.getStrategy();
        String origin = context.getOrigin();
        // The rule targets this specific caller origin
        if (limitApp.equals(origin) && filterOrigin(origin)) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // Matches limit origin, return origin statistic node.
                return context.getOriginNode();
            }
            // RELATE or CHAIN strategy: pick the reference node
            return selectReferenceNode(rule, context, node);
        } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // Return the cluster node.
                return node.getClusterNode();
            }

            return selectReferenceNode(rule, context, node);
        } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
            && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                return context.getOriginNode();
            }

            return selectReferenceNode(rule, context, node);
        }

        return null;
    }

static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
        String refResource = rule.getRefResource();
        int strategy = rule.getStrategy();

        if (StringUtil.isEmpty(refResource)) {
            return null;
        }

        if (strategy == RuleConstant.STRATEGY_RELATE) {
            return ClusterBuilderSlot.getClusterNode(refResource);
        }

        if (strategy == RuleConstant.STRATEGY_CHAIN) {
            if (!refResource.equals(context.getName())) {
                return null;
            }
            return node;
        }
        // No node.
        return null;
    }

// Invoked when flow rules are loaded: builds the traffic shaping controller for a rule; rule.getRater() returns it
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
        if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
            switch (rule.getControlBehavior()) {
                // Warm-up mode returns a WarmUpController
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                    return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                            ColdFactorProperty.coldFactor);
                // Queueing (rate limiter) mode returns a ThrottlingController
                case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                    return new ThrottlingController(rule.getMaxQueueingTimeMs(), rule.getCount());
                // Warm-up + queueing mode returns a WarmUpRateLimiterController
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                    return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                            rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
                case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
                default:
                    // Default mode or unknown mode: default traffic shaping controller (fast-reject).
            }
        }
        // Otherwise (including the thread-count grade) use DefaultController
        return new DefaultController(rule.getCount(), rule.getGrade());
    }


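Sentinel's standalone rate-limiting algorithms

•DefaultController: a classic sliding-window counter. It adds the requested count to the currently counted QPS (or thread count); if the sum does not exceed the threshold, the request passes. Otherwise the request is rejected, except that a prioritized request in QPS mode may occupy quota from an upcoming window and wait for it (tryOccupyNext in the code below).

•WarmUpController: modeled on Guava's RateLimiter with warm-up. The difference is that Guava focuses on the interval between requests, whereas Sentinel focuses on the number of requests per second; both are close relatives of the token bucket mentioned earlier.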

DefaultController

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // Tokens already used in the current window (QPS or thread count, depending on grade)
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {
            // Only prioritized requests in QPS mode may borrow quota from an upcoming window
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);

                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }


ThrottlingController

    public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat) {
        this(queueingTimeoutMs, maxCountPerStat, 1000);
    }

    public ThrottlingController(int queueingTimeoutMs, double maxCountPerStat, int statDurationMs) {
        AssertUtil.assertTrue(statDurationMs > 0, "statDurationMs should be positive");
        AssertUtil.assertTrue(maxCountPerStat >= 0, "maxCountPerStat should be >= 0");
        AssertUtil.assertTrue(queueingTimeoutMs >= 0, "queueingTimeoutMs should be >= 0");
        this.maxQueueingTimeMs = queueingTimeoutMs;
        this.count = maxCountPerStat;
        this.statDurationMs = statDurationMs;
        // Use nanoSeconds when durationMs%count != 0 or count/durationMs> 1 (to be accurate)
        // With statDurationMs = 1000, useNanoSeconds is true when count does not evenly divide 1000
        // or count exceeds 1000; otherwise it is false
        if (maxCountPerStat > 0) {
            this.useNanoSeconds = statDurationMs % Math.round(maxCountPerStat) != 0 || maxCountPerStat / statDurationMs > 1;
        } else {
            this.useNanoSeconds = false;
        }
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    private boolean checkPassUsingNanoSeconds(int acquireCount, double maxCountPerStat) {
        final long maxQueueingTimeNs = maxQueueingTimeMs * MS_TO_NS_OFFSET;
        long currentTime = System.nanoTime();
        // Calculate the interval between every two requests.
        final long costTimeNs = Math.round(1.0d * MS_TO_NS_OFFSET * statDurationMs * acquireCount / maxCountPerStat);

        // Expected pass time of this request.
        long expectedTime = costTimeNs + latestPassedTime.get();

        if (expectedTime <= currentTime) {
            // Contention may exist here, but it's okay.
            latestPassedTime.set(currentTime);
            return true;
        } else {
            final long curNanos = System.nanoTime();
            // Calculate the time to wait.
            long waitTime = costTimeNs + latestPassedTime.get() - curNanos;
            if (waitTime > maxQueueingTimeNs) {
                return false;
            }

            long oldTime = latestPassedTime.addAndGet(costTimeNs);
            waitTime = oldTime - curNanos;
            if (waitTime > maxQueueingTimeNs) {
                latestPassedTime.addAndGet(-costTimeNs);
                return false;
            }
            // in race condition waitTime may <= 0
            if (waitTime > 0) {
                sleepNanos(waitTime);
            }
            return true;
        }
    }
    
    // Leaky-bucket style implementation (millisecond precision)
    private boolean checkPassUsingCachedMs(int acquireCount, double maxCountPerStat) {
        long currentTime = TimeUtil.currentTimeMillis();
        // Interval between two passed requests, in milliseconds (checkPassUsingNanoSeconds is the nanosecond variant)
        long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat);

        // Expected pass time of this request
        long expectedTime = costTime + latestPassedTime.get();

        if (expectedTime <= currentTime) {
            // latestPassedTime is an AtomicLong, so reads and writes have volatile semantics
            latestPassedTime.set(currentTime);
            return true;
        } else {
            // Compute the waiting time
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
            // Reject if the wait would exceed the maximum queueing time
            if (waitTime > maxQueueingTimeMs) {
                return false;
            }

            long oldTime = latestPassedTime.addAndGet(costTime);
            waitTime = oldTime - TimeUtil.currentTimeMillis();
            if (waitTime > maxQueueingTimeMs) {
                latestPassedTime.addAndGet(-costTime);
                return false;
            }
            // in race condition waitTime may <= 0
            if (waitTime > 0) {
                sleepMs(waitTime);
            }
            return true;
        }
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // Pass when acquire count is less or equal than 0.
        if (acquireCount <= 0) {
            return true;
        }
        // Reject when count is less or equal than 0.
        // Otherwise, the costTime will be max of long and waitTime will overflow in some cases.
        if (count <= 0) {
            return false;
        }
        if (useNanoSeconds) {
            return checkPassUsingNanoSeconds(acquireCount, this.count);
        } else {
            return checkPassUsingCachedMs(acquireCount, this.count);
        }
    }

    private void sleepMs(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
        }
    }

    private void sleepNanos(long ns) {
        LockSupport.parkNanos(ns);
    }


long costTime = Math.round(1.0d * statDurationMs * acquireCount / maxCountPerStat);


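This formula for the interval between two requests shows why precision matters. Once maxCountPerStat (the configured QPS threshold) exceeds 1000, the interval drops below 1 ms and is rounded to 1 or 0 ms, so the millisecond clock can no longer pace requests accurately in rate-limiter (queueing) mode; a threshold that does not evenly divide 1000 (for example 300) is distorted by rounding in the same way. That is why, as the constructor shows, checkPassUsingNanoSeconds is used in those cases and checkPassUsingCachedMs otherwise. Comparing the two methods, the approach is the same and only the unit changes from milliseconds to nanoseconds, so it is enough to read checkPassUsingCachedMs.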
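The effect of the costTime formula can be checked numerically. The sketch below reuses the two interval formulas and the useNanoSeconds condition shown above, assuming statDurationMs = 1000 (the two-argument constructor's default) and MS_TO_NS_OFFSET = 1,000,000 ns per ms; PacingIntervalDemo is a hypothetical class, not part of Sentinel.

```java
// Hypothetical demo reproducing ThrottlingController's interval formulas for statDurationMs = 1000.
class PacingIntervalDemo {
    static final int STAT_DURATION_MS = 1000;
    static final long MS_TO_NS = 1_000_000L;

    /** Millisecond interval between two passed requests, as in checkPassUsingCachedMs. */
    static long costTimeMs(double count, int acquireCount) {
        return Math.round(1.0d * STAT_DURATION_MS * acquireCount / count);
    }

    /** Nanosecond interval, as in checkPassUsingNanoSeconds. */
    static long costTimeNs(double count, int acquireCount) {
        return Math.round(1.0d * MS_TO_NS * STAT_DURATION_MS * acquireCount / count);
    }

    /** Same condition as the constructor; assumes count > 0. */
    static boolean useNanoSeconds(double count) {
        return STAT_DURATION_MS % Math.round(count) != 0 || count / STAT_DURATION_MS > 1;
    }

    public static void main(String[] args) {
        for (double count : new double[] {100, 300, 1000, 2000, 3000}) {
            System.out.printf("count=%.0f costTimeMs=%d costTimeNs=%d useNanoSeconds=%b%n",
                    count, costTimeMs(count, 1), costTimeNs(count, 1), useNanoSeconds(count));
        }
    }
}
```

With count = 300 the millisecond path would pace requests every 3 ms, about 333 QPS instead of 300; with 2000 the interval rounds to 1 ms (1000 QPS) and with 3000 to 0 ms (no pacing at all). In all three cases useNanoSeconds is true, while 100 and 1000 stay on the millisecond path.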

WarmUpController

 
@Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long passQps = (long) node.passQps();

        long previousQps = (long) node.previousPassQps();
        syncToken(previousQps);

        // Derive the allowed QPS from the slope:
        // once the stored tokens are above the warning line, the allowed QPS is lowered
        long restToken = storedTokens.get();
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;
            // The more tokens above the warning line, the lower the allowed QPS; it never exceeds count
            // current interval = aboveToken * slope + 1 / count
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        } else {
            if (passQps + acquireCount <= count) {
                return true;
            }
        }

        return false;
    }

protected void syncToken(long passQps) {
        long currentTime = TimeUtil.currentTimeMillis();
        // Align to the start of the current second: tokens are synchronized at most once per second
        currentTime = currentTime - currentTime % 1000;
        long oldLastFillTime = lastFilledTime.get();
        if (currentTime <= oldLastFillTime) {
            return;
        }

        long oldValue = storedTokens.get();
        long newValue = coolDownTokens(currentTime, passQps);

        if (storedTokens.compareAndSet(oldValue, newValue)) {
            // Deduct the tokens consumed by the requests that passed in the previous second
            long currentValue = storedTokens.addAndGet(0 - passQps);
            if (currentValue < 0) {
                storedTokens.set(0L);
            }
            lastFilledTime.set(currentTime);
        }

    }

private long coolDownTokens(long currentTime, long passQps) {
        long oldValue = storedTokens.get();
        long newValue = oldValue;

        // Tokens are added only when the stored tokens are below the warning line,
        // or when they are above it but the traffic is below count / coldFactor
        if (oldValue < warningToken) {
            newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        } else if (oldValue > warningToken) {
            if (passQps < (int)count / coldFactor) {
                newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
            }
        }
        return Math.min(newValue, maxToken);
    }
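The warning-line logic in canPass depends on warningToken, maxToken, and slope, which are computed in WarmUpController's constructor (not shown above). The sketch below reproduces those formulas as we understand them from Sentinel's source; treat them as an assumption and check them against your Sentinel version. WarmUpCurve is hypothetical, and allowedQps mirrors the warningQps calculation in canPass.

```java
// Hypothetical sketch of WarmUpController's thresholds and its allowed-QPS curve.
// The constructor formulas are assumed from Sentinel's source; verify them for your version.
class WarmUpCurve {
    final double count;
    final int warningToken;
    final int maxToken;
    final double slope;

    WarmUpCurve(double count, int warmUpPeriodSec, int coldFactor) {
        if (coldFactor <= 1) {
            throw new IllegalArgumentException("coldFactor must be > 1");
        }
        this.count = count;
        this.warningToken = (int) (warmUpPeriodSec * count) / (coldFactor - 1);
        this.maxToken = warningToken + (int) (2 * warmUpPeriodSec * count / (1.0 + coldFactor));
        this.slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
    }

    /** QPS allowed when restToken tokens are stored, mirroring the branch in canPass. */
    double allowedQps(long restToken) {
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;
            return Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
        }
        return count;
    }

    public static void main(String[] args) {
        WarmUpCurve c = new WarmUpCurve(100, 10, 3);
        System.out.println("warningToken=" + c.warningToken + ", maxToken=" + c.maxToken);
        for (long rest : new long[] {c.maxToken, 750, c.warningToken, 0}) {
            System.out.printf("restToken=%d -> %.2f QPS%n", rest, c.allowedQps(rest));
        }
    }
}
```

With count = 100, warmUpPeriodSec = 10 and coldFactor = 3, warningToken is 500 and maxToken is 1000. A full bucket (after an idle period) allows about 33 QPS, i.e. count / coldFactor; 750 stored tokens allow 50 QPS; at or below the warning line the full 100 QPS is allowed. This is the slow-start curve described in the controlBehavior comment of FlowRule.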


2.4.3 Cluster Rate Limiting

The passClusterCheck method (if no clusterService can be found, it falls back to non-cluster rate limiting):

private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                            boolean prioritized) {
        try {
            // Determine whether the current node is a Token Client or a Token Server
            TokenService clusterService = pickClusterService();
            if (clusterService == null) {
                return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
            }
            long flowId = rule.getClusterConfig().getFlowId();
            // Request tokens from the TokenService with the rule's flowId. As pickClusterService shows, the service
            // may be a Token Client or a Token Server, implemented by DefaultClusterTokenClient and DefaultTokenService
            TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
            return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
            // If client is absent, then fallback to local mode.
        } catch (Throwable ex) {
            RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
        }
        // Fallback to local flow control when token client or server for this rule is not available.
        // If fallback is not enabled, then directly pass.
        return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
    }

// Determine whether the current node is a Token Client or a Token Server:
// 1) If the node's role is Client, the returned TokenService is DefaultClusterTokenClient;
// 2) If the node's role is Server, the returned TokenService is DefaultTokenService by default.
private static TokenService pickClusterService() {
        if (ClusterStateManager.isClient()) {
            return TokenClientProvider.getClient();
        }
        if (ClusterStateManager.isServer()) {
            return EmbeddedClusterTokenServerProvider.getServer();
        }
        return null;
    }


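Cluster rate-limiting modes

The Sentinel cluster rate-limiting token server can be started in two ways:

•Alone mode: suitable for global rate limiting; the token server must be deployed separately.

•Embedded mode: the token server runs inside an application instance, so no separate deployment is needed; it suits flow control within a single application cluster, but its isolation from the application is weaker.

Fallback in cluster rate-limiting mode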

private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                            boolean prioritized) {
        try {
            TokenService clusterService = pickClusterService();
            if (clusterService == null) {
                return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
            }
            long flowId = rule.getClusterConfig().getFlowId();
            TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
            return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
            // If client is absent, then fallback to local mode.
        } catch (Throwable ex) {
            RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
        }
        // Fallback to local flow control when token client or server for this rule is not available.
        // If fallback is not enabled, then directly pass.
        // So if cluster rate limiting fails, it falls back to standalone mode; if the configuration
        // does not allow fallback, this check is skipped and the request passes
        return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
    }
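The decision flow of passClusterCheck (ask the token service, honor an explicit block, and otherwise fall back to the local check or simply pass) can be sketched without Sentinel's classes. Every type below (ClusterFallbackCheck, TokenSource, Status, the fallbackToLocal flag) is hypothetical; the real applyTokenResult handles more result statuses than this sketch does.

```java
import java.util.function.IntPredicate;
import java.util.function.Supplier;

// Hypothetical sketch of the "ask the cluster, otherwise fall back" flow in passClusterCheck.
// None of these types are Sentinel APIs.
class ClusterFallbackCheck {
    enum Status { OK, BLOCKED, FAIL }

    interface TokenSource {
        Status requestToken(long flowId, int acquireCount);
    }

    private final Supplier<TokenSource> pickService;  // returns null when this node has no cluster role
    private final IntPredicate localCheck;             // standalone limiter used as the fallback
    private final boolean fallbackToLocal;             // when false, failures simply let the request pass

    ClusterFallbackCheck(Supplier<TokenSource> pickService, IntPredicate localCheck, boolean fallbackToLocal) {
        this.pickService = pickService;
        this.localCheck = localCheck;
        this.fallbackToLocal = fallbackToLocal;
    }

    boolean pass(long flowId, int acquireCount) {
        try {
            TokenSource service = pickService.get();
            if (service != null) {
                Status status = service.requestToken(flowId, acquireCount);
                if (status == Status.OK) {
                    return true;
                }
                if (status == Status.BLOCKED) {
                    return false;
                }
                // Status.FAIL: the token service could not decide, so fall back below
            }
        } catch (RuntimeException ex) {
            // Unexpected error talking to the token service: fall back below
        }
        return fallbackToLocal ? localCheck.test(acquireCount) : true;
    }
}
```

Keeping the fallback decision in one place means an unreachable token server degrades the system to standalone limits (or to no limit, if configured) rather than failing requests.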


2.5 DegradeSlot

Martin Fowler's explanation of the circuit breaker: https://martinfowler.com/bliki/CircuitBreaker.html

public interface CircuitBreaker {

    /**
     * Get the associated circuit breaking rule.
     *
     * @return associated circuit breaking rule
     */
    DegradeRule getRule();

    /**
     * Acquires permission of an invocation only if it is available at the time of invoking.
     *
     * @param context context of current invocation
     * @return {@code true} if permission was acquired and {@code false} otherwise
     */
    boolean tryPass(Context context);

    /**
     * Get current state of the circuit breaker.
     *
     * @return current state of the circuit breaker
     */
    State currentState();

    /**
     * <p>Record a completed request with the context and handle state transformation of the circuit breaker.</p>
     * <p>Called when a <strong>passed</strong> invocation finished.</p>
     *
     * @param context context of current invocation
     */
    void onRequestComplete(Context context);

    /**
     * Circuit breaker state.
     */
    enum State {
        /**
         * In {@code OPEN} state, all requests will be rejected until the next recovery time point.
         */
        OPEN,
        /**
         * In {@code HALF_OPEN} state, the circuit breaker will allow a "probe" invocation.
         * If the invocation is abnormal according to the strategy (e.g. it's slow), the circuit breaker
         * will re-transform to the {@code OPEN} state and wait for the next recovery time point;
         * otherwise the resource will be regarded as "recovered" and the circuit breaker
         * will cease cutting off requests and transform to {@code CLOSED} state.
         */
        HALF_OPEN,
        /**
         * In {@code CLOSED} state, all requests are permitted. When current metric value exceeds the threshold,
         * the circuit breaker will transform to {@code OPEN} state.
         */
        CLOSED
    }
}
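The state transitions documented in the CircuitBreaker interface can be sketched as a small exception-ratio breaker. RatioCircuitBreaker is hypothetical: it keeps a single cumulative counter instead of the statIntervalMs sliding window that ExceptionCircuitBreaker uses, takes time as a parameter, and lets exactly one probe through in HALF_OPEN by winning a CAS from OPEN.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical exception-ratio circuit breaker: CLOSED -> OPEN -> HALF_OPEN -> CLOSED or OPEN.
class RatioCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final double threshold;        // error ratio above which the breaker opens
    private final int minRequestAmount;    // no decision until this many requests have completed
    private final long recoveryTimeoutMs;  // how long to stay OPEN before allowing a probe
    private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
    private volatile long nextRetryMs;
    private long total;                    // guarded by "this"
    private long errors;                   // guarded by "this"

    RatioCircuitBreaker(double threshold, int minRequestAmount, long recoveryTimeoutMs) {
        this.threshold = threshold;
        this.minRequestAmount = minRequestAmount;
        this.recoveryTimeoutMs = recoveryTimeoutMs;
    }

    /** Called before an invocation; false means the invocation is rejected. */
    boolean tryPass(long nowMs) {
        State s = state.get();
        if (s == State.CLOSED) {
            return true;
        }
        if (s == State.OPEN && nowMs >= nextRetryMs) {
            // Only the caller that wins this CAS becomes the probe request
            return state.compareAndSet(State.OPEN, State.HALF_OPEN);
        }
        return false;  // still OPEN, or a probe is already in flight
    }

    /** Called after a passed invocation completes. */
    synchronized void onRequestComplete(long nowMs, boolean failed) {
        State s = state.get();
        if (s == State.OPEN) {
            return;
        }
        if (s == State.HALF_OPEN) {
            if (failed) {
                open(nowMs);        // probe failed: stay open for another timeout
            } else {
                total = 0;          // probe succeeded: start counting afresh
                errors = 0;
                state.set(State.CLOSED);
            }
            return;
        }
        total++;
        if (failed) {
            errors++;
        }
        if (total >= minRequestAmount && (double) errors / total > threshold) {
            open(nowMs);
        }
    }

    State state() {
        return state.get();
    }

    private void open(long nowMs) {
        nextRetryMs = nowMs + recoveryTimeoutMs;  // written before the state so readers see it
        state.set(State.OPEN);
    }
}
```

With a 0.5 threshold and a minimum of 4 requests, 3 failures out of 4 open the breaker; after the 1000 ms recovery timeout a single probe is allowed, and its success closes the breaker again.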


Let's take ExceptionCircuitBreaker as an example of a concrete implementation.

public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {

    // Two exception strategies: exception ratio and exception count
    private final int strategy;
    // Minimum number of requests before the breaker may trip
    private final int minRequestAmount;
    // Threshold
    private final double threshold;

    // LeapArray is a key statistics structure in Sentinel; it encapsulates the time-window operations
    private final LeapArray<SimpleErrorCounter> stat;

    public ExceptionCircuitBreaker(DegradeRule rule) {
        this(rule, new SimpleErrorCounterLeapArray(1, rule.getStatIntervalMs()));
    }

    ExceptionCircuitBreaker(DegradeRule rule, LeapArray<SimpleErrorCounter> stat) {
        super(rule);
        this.strategy = rule.getGrade();
        boolean modeOk = strategy == DEGRADE_GRADE_EXCEPTION_RATIO || strategy == DEGRADE_GRADE_EXCEPTION_COUNT;
        AssertUtil.isTrue(modeOk, "rule strategy should be error-ratio or error-count");
        AssertUtil.notNull(stat, "stat cannot be null");
        this.minRequestAmount = rule.getMinRequestAmount();
        this.threshold = rule.getCount();
        this.stat = stat;
    }

    @Override
    protected void resetStat() {
        // Reset current bucket (bucket count = 1).
        stat.currentWindow().value().reset();
    }

    
    @Override
    public void onRequestComplete(Context context) {
        Entry entry = context.getCurEntry();
        if (entry == null) {
            return;
        }
        Throwable error = entry.getError();
        SimpleErrorCounter counter = stat.currentWindow().value();
        if (error != null) {
            counter.getErrorCount().add(1);
        }
        counter.getTotalCount().add(1);

        handleStateChangeWhenThresholdExceeded(error);
    }

    private void handleStateChangeWhenThresholdExceeded(Throwable error) {
        if (currentState.get() == State.OPEN) {
            return;
        }
        
        if (currentState.get() == State.HALF_OPEN) {
            // In detecting request
            if (error == null) {
                fromHalfOpenToClose();
            } else {
                fromHalfOpenToOpen(1.0d);
            }
            return;
        }
        
        List<SimpleErrorCounter> counters = stat.values();
        long errCount = 0;
        long totalCount = 0;
        for (SimpleErrorCounter counter : counters) {
            errCount += counter.errorCount.sum();
            totalCount += counter.totalCount.sum();
        }
        if (totalCount < minRequestAmount) {
            return;
        }
        double curCount = errCount;
        if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
            // Use errorRatio
            curCount = errCount * 1.0d / totalCount;
        }
        if (curCount > threshold) {
            transformToOpen(curCount);
        }
    }

    static class SimpleErrorCounter {
        private LongAdder errorCount;
        private LongAdder totalCount;

        public SimpleErrorCounter() {
            this.errorCount = new LongAdder();
            this.totalCount = new LongAdder();
        }

        public LongAdder getErrorCount() {
            return errorCount;
        }

        public LongAdder getTotalCount() {
            return totalCount;
        }

        public SimpleErrorCounter reset() {
            errorCount.reset();
            totalCount.reset();
            return this;
        }

        @Override
        public String toString() {
            return "SimpleErrorCounter{" +
                "errorCount=" + errorCount +
                ", totalCount=" + totalCount +
                '}';
        }
    }

    static class SimpleErrorCounterLeapArray extends LeapArray<SimpleErrorCounter> {

        public SimpleErrorCounterLeapArray(int sampleCount, int intervalInMs) {
            super(sampleCount, intervalInMs);
        }

        @Override
        public SimpleErrorCounter newEmptyBucket(long timeMillis) {
            return new SimpleErrorCounter();
        }

        @Override
        protected WindowWrap<SimpleErrorCounter> resetWindowTo(WindowWrap<SimpleErrorCounter> w, long startTime) {
            // Update the start time and reset value.
            w.resetTo(startTime);
            w.value().reset();
            return w;
        }
    }
}
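To make the state machine above concrete without the Sentinel runtime, here is a minimal, self-contained sketch of an exception-ratio breaker. It is not Sentinel's code. The class RatioBreakerSketch and its methods are hypothetical, and time is passed in as a millisecond argument so the transitions are easy to follow. The LeapArray is reduced to a single bucket that restarts after statIntervalMs. This matches the sampleCount of 1 passed to SimpleErrorCounterLeapArray above, but without Sentinel's window alignment or lock-free counters.

```java
/**
 * Hypothetical, simplified exception-ratio circuit breaker (not Sentinel's API).
 * One statistics bucket per statIntervalMs, a minimum request count, a ratio
 * threshold and a recovery window, driven by an explicit clock value (ms).
 */
final class RatioBreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int minRequestAmount;
    private final double threshold;       // error ratio in [0, 1], e.g. 0.5
    private final long statIntervalMs;    // length of the single statistics bucket
    private final long recoveryTimeoutMs; // timeWindow (seconds) converted to ms

    private State state = State.CLOSED;
    private long bucketStart;
    private long errors;
    private long total;
    private long nextRetryAt;

    RatioBreakerSketch(int minRequestAmount, double threshold, long statIntervalMs, int timeWindowSec) {
        this.minRequestAmount = minRequestAmount;
        this.threshold = threshold;
        this.statIntervalMs = statIntervalMs;
        this.recoveryTimeoutMs = timeWindowSec * 1000L;
    }

    /** Before the call: false means the request is rejected. */
    synchronized boolean tryPass(long nowMs) {
        if (state == State.CLOSED) {
            return true;
        }
        if (state == State.OPEN && nowMs >= nextRetryAt) {
            state = State.HALF_OPEN; // let one probe request through
            return true;
        }
        return false; // still inside the recovery window, or a probe is already in flight
    }

    /** After the call: record the outcome and change state if needed. */
    synchronized void onComplete(long nowMs, boolean failed) {
        if (state == State.OPEN) {
            return;
        }
        if (state == State.HALF_OPEN) {
            // The probe decides: success closes the breaker, failure re-opens it.
            if (failed) {
                open(nowMs);
            } else {
                state = State.CLOSED;
                resetBucket(nowMs);
            }
            return;
        }
        if (nowMs - bucketStart >= statIntervalMs) {
            resetBucket(nowMs); // the bucket has expired: start counting afresh
        }
        total++;
        if (failed) {
            errors++;
        }
        if (total >= minRequestAmount && (double) errors / total > threshold) {
            open(nowMs);
        }
    }

    private void open(long nowMs) {
        state = State.OPEN;
        nextRetryAt = nowMs + recoveryTimeoutMs;
    }

    private void resetBucket(long nowMs) {
        bucketStart = nowMs;
        errors = 0;
        total = 0;
    }

    State state() {
        return state;
    }
}
```

Suppose the sketch is built with the values from the load-test section (timeWindowSec = 180, statIntervalMs = 60000). Errors are then counted per minute. Once the ratio is exceeded, requests are rejected for 180 s, after which a single probe decides between CLOSED and OPEN.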


2.6 SystemSlot

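The checks are concentrated in com.alibaba.csp.sentinel.slots.system.SystemRuleManager#checkSystem, excerpted below. As a load-protection check, it limits several metrics of the application instance: total inbound QPS, thread count, RT (response time), system load and CPU usage. QPS, thread count and RT come from the statistics collected by StatisticSlot. System load and CPU usage come from SystemStatusListener. SystemRuleManager schedules this listener periodically, and the listener reads both values through OperatingSystemMXBean.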

    /**
     * Apply {@link SystemRule} to the resource. Only inbound traffic will be checked.
     *
     * @param resourceWrapper the resource.
     * @throws BlockException when any system rule's threshold is exceeded.
     */
    public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {
        if (resourceWrapper == null) {
            return;
        }
        // Ensure the checking switch is on.
        if (!checkSystemStatus.get()) {
            return;
        }

        // for inbound traffic only
        if (resourceWrapper.getEntryType() != EntryType.IN) {
            return;
        }

        // total qps: pass QPS of all inbound traffic on this instance, held by the global Constants.ENTRY_NODE
        // (see the introduction to Node in the initialization section)
        double currentQps = Constants.ENTRY_NODE.passQps();
        if (currentQps + count > qps) {
            throw new SystemBlockException(resourceWrapper.getName(), "qps");
        }

        // total thread
        int currentThread = Constants.ENTRY_NODE.curThreadNum();
        if (currentThread > maxThread) {
            throw new SystemBlockException(resourceWrapper.getName(), "thread");
        }

        double rt = Constants.ENTRY_NODE.avgRt();
        if (rt > maxRt) {
            throw new SystemBlockException(resourceWrapper.getName(), "rt");
        }

        // load. BBR algorithm.
        if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
            if (!checkBbr(currentThread)) {
                throw new SystemBlockException(resourceWrapper.getName(), "load");
            }
        }

        // cpu usage
        if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
            throw new SystemBlockException(resourceWrapper.getName(), "cpu");
        }
    }

    private static boolean checkBbr(int currentThread) {
        if (currentThread > 1 &&
            currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {
            return false;
        }
        return true;
    }

    public static double getCurrentSystemAvgLoad() {
        return statusListener.getSystemAverageLoad();
    }

    public static double getCurrentCpuUsage() {
        return statusListener.getCpuUsage();
    }
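The checks above can be restated as a pure function, which makes the BBR condition easier to see. By Little's law, the number of requests a system holds in flight is roughly throughput times latency. So maxSuccessQps * minRt / 1000 (minRt in milliseconds) estimates the concurrency the instance can sustain. Under high load, a request is rejected only if the current thread count also exceeds that estimate. This is a sketch, not Sentinel's code. SystemGuardSketch and its field names are hypothetical, and unset thresholds default to MAX_VALUE here so they never trip.

```java
/**
 * Hypothetical restatement of checkSystem()/checkBbr() as a pure function.
 * Inputs are plain numbers instead of Constants.ENTRY_NODE and SystemStatusListener.
 */
final class SystemGuardSketch {
    // Thresholds; MAX_VALUE means "not configured" in this sketch, so the check never trips.
    double qps = Double.MAX_VALUE;
    long maxThread = Long.MAX_VALUE;
    double maxRt = Double.MAX_VALUE;
    double highestLoad = Double.MAX_VALUE;
    double highestCpu = Double.MAX_VALUE;

    /** Returns the block reason ("qps", "thread", "rt", "load", "cpu"), or null if the request may pass. */
    String check(double passQps, int acquireCount, int curThreads, double avgRt,
                 double maxSuccessQps, double minRt, double load, double cpuUsage) {
        if (passQps + acquireCount > qps) return "qps";
        if (curThreads > maxThread) return "thread";
        if (avgRt > maxRt) return "rt";
        // High load alone is not enough: block only if concurrency also exceeds the estimated capacity.
        if (load > highestLoad && !bbrAllows(curThreads, maxSuccessQps, minRt)) return "load";
        if (cpuUsage > highestCpu) return "cpu";
        return null;
    }

    /** Little's law: in-flight capacity ~ throughput (req/s) * latency (s) = maxSuccessQps * minRt(ms) / 1000. */
    static boolean bbrAllows(int curThreads, double maxSuccessQps, double minRt) {
        return !(curThreads > 1 && curThreads > maxSuccessQps * minRt / 1000);
    }
}
```

For example, with maxSuccessQps = 1000 and minRt = 20 ms the estimated capacity is 1000 * 20 / 1000 = 20 concurrent requests. When the load threshold is exceeded, 25 in-flight threads are therefore blocked with reason "load", while 10 are let through.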


public class SystemStatusListener implements Runnable {

    volatile double currentLoad = -1;
    volatile double currentCpuUsage = -1;

    volatile String reason = StringUtil.EMPTY;

    volatile long processCpuTime = 0;
    volatile long processUpTime = 0;

    public double getSystemAverageLoad() {
        return currentLoad;
    }

    public double getCpuUsage() {
        return currentCpuUsage;
    }

    @Override
    public void run() {
        try {
            // OperatingSystemMXBean here is com.sun.management.OperatingSystemMXBean,
            // which adds getSystemCpuLoad() and getProcessCpuTime() to the java.lang.management interface
            OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
            currentLoad = osBean.getSystemLoadAverage();

            /*
             * Java Doc copied from {@link OperatingSystemMXBean#getSystemCpuLoad()}:</br>
             * Returns the "recent cpu usage" for the whole system. This value is a double in the [0.0,1.0] interval.
             * A value of 0.0 means that all CPUs were idle during the recent period of time observed, while a value
             * of 1.0 means that all CPUs were actively running 100% of the time during the recent period being
             * observed. All values between 0.0 and 1.0 are possible depending of the activities going on in the
             * system. If the system recent cpu usage is not available, the method returns a negative value.
             */
            double systemCpuUsage = osBean.getSystemCpuLoad();

            // calculate process cpu usage to support application running in container environment
            RuntimeMXBean runtimeBean = ManagementFactory.getPlatformMXBean(RuntimeMXBean.class);
            long newProcessCpuTime = osBean.getProcessCpuTime();
            long newProcessUpTime = runtimeBean.getUptime();
            int cpuCores = osBean.getAvailableProcessors();
            long processCpuTimeDiffInMs = TimeUnit.NANOSECONDS
                    .toMillis(newProcessCpuTime - processCpuTime);
            long processUpTimeDiffInMs = newProcessUpTime - processUpTime;
            double processCpuUsage = (double) processCpuTimeDiffInMs / processUpTimeDiffInMs / cpuCores;
            processCpuTime = newProcessCpuTime;
            processUpTime = newProcessUpTime;

            currentCpuUsage = Math.max(processCpuUsage, systemCpuUsage);

            if (currentLoad > SystemRuleManager.getSystemLoadThreshold()) {
                writeSystemStatusLog();
            }
        } catch (Throwable e) {
            RecordLog.warn("[SystemStatusListener] Failed to get system metrics from JMX", e);
        }
    }

    private void writeSystemStatusLog() {
        StringBuilder sb = new StringBuilder();
        sb.append("Load exceeds the threshold: ");
        sb.append("load:").append(String.format("%.4f", currentLoad)).append("; ");
        sb.append("cpuUsage:").append(String.format("%.4f", currentCpuUsage)).append("; ");
        sb.append("qps:").append(String.format("%.4f", Constants.ENTRY_NODE.passQps())).append("; ");
        sb.append("rt:").append(String.format("%.4f", Constants.ENTRY_NODE.avgRt())).append("; ");
        sb.append("thread:").append(Constants.ENTRY_NODE.curThreadNum()).append("; ");
        sb.append("success:").append(String.format("%.4f", Constants.ENTRY_NODE.successQps())).append("; ");
        sb.append("minRt:").append(String.format("%.2f", Constants.ENTRY_NODE.minRt())).append("; ");
        sb.append("maxSuccess:").append(String.format("%.2f", Constants.ENTRY_NODE.maxSuccessQps())).append("; ");
        RecordLog.info(sb.toString());
    }
}
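The process CPU figure in run() is the CPU time consumed by the JVM divided by the elapsed wall-clock time, normalized by core count. For example, 4,000 ms of process CPU time over 1,000 ms of wall time on 8 cores gives 4000 / 1000 / 8 = 0.5, i.e. half of the machine. Below is a minimal sketch of that arithmetic as pure functions. CpuUsageSketch is a hypothetical name, and the -1 return for an unusable sample is this sketch's own convention, not Sentinel's.

```java
/** Hypothetical helper restating the CPU-usage arithmetic of SystemStatusListener.run(). */
final class CpuUsageSketch {
    /**
     * Share of the whole machine's CPU capacity used by this process between two samples.
     *
     * @param cpuTimeDiffMs CPU time the process consumed between the samples (summed over all cores)
     * @param upTimeDiffMs  wall-clock time that elapsed between the same samples
     * @param cores         number of available processors
     * @return usage in [0, 1] under normal conditions, or -1 if the sample is unusable
     */
    static double processCpuUsage(long cpuTimeDiffMs, long upTimeDiffMs, int cores) {
        if (upTimeDiffMs <= 0 || cores <= 0) {
            return -1; // e.g. two samples taken within the same millisecond
        }
        return (double) cpuTimeDiffMs / upTimeDiffMs / cores;
    }

    /** SystemStatusListener reports the larger of the process and system readings. */
    static double effectiveCpuUsage(double processUsage, double systemUsage) {
        return Math.max(processUsage, systemUsage);
    }
}
```

According to the source comment, the process-level figure exists to support applications running in containers. Taking the maximum of the two readings makes the CPU rule trip if either the process or the whole system is busy.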


III. Best Practices (JD Version)

3.1 Usage

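Using Sentinel is itself very simple: it takes a single annotation. What needs more thought is how rules are loaded and persisted. Existing approaches include: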

•Middleware (e.g. ZooKeeper, Nacos, Eureka, Redis): the extension package of the Sentinel source code provides implementations of this kind, as shown in the figure below
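Whatever the backing store (ZooKeeper, Nacos, a config center or a local file), a rule source comes down to detecting changes and pushing the new content to the rule manager. Below is a minimal pull-based sketch using only the JDK, assuming the rules live in a local file. PollingFileRuleSource is hypothetical, and parsing and loading the rules is left to the callback. For the local-file case, Sentinel's own sentinel-datasource-extension module provides FileRefreshableDataSource.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

/**
 * Hypothetical pull-based rule source: re-reads a local rule file whenever its
 * modification time changes and hands the raw text to a callback, which would
 * parse it and load the rules.
 */
final class PollingFileRuleSource {
    private final Path file;
    private final Consumer<String> onChange;
    private long lastModified = Long.MIN_VALUE; // nothing loaded yet

    PollingFileRuleSource(Path file, Consumer<String> onChange) {
        this.file = file;
        this.onChange = onChange;
    }

    /** Call periodically; returns true if new content was pushed to the callback. */
    boolean poll() {
        try {
            long modified = Files.getLastModifiedTime(file).toMillis();
            if (modified == lastModified) {
                return false; // unchanged since the last successful load
            }
            String content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            onChange.accept(content);
            lastModified = modified; // remember this version only after it was handed over
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In practice poll() would run on a schedule, for example from a ScheduledExecutorService every few seconds. Push-style stores such as ZooKeeper or Nacos notify the client through watchers or listeners rather than relying on this kind of polling loop.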

How the plugin is used: annotation + configuration

Step 1: Add the component dependency

<dependency>
    <groupId>com.jd.ldop.tools</groupId>
    <artifactId>sentinel-tools</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>


Step 2: Initialize sentinelProcess

Rules can be supplied in three ways: through DUCC, by reading a local file, or by writing them directly.

<!-- Sentinel-based degradation, rate-limiting and circuit-breaking component -->
    <bean id="sentinelProcess" class="com.jd.ldop.sentinel.SentinelProcess">
        <property name="ruleResourceWrappers">
            <list>
                <ref bean="degradeRule"/>
            </list>
        </property>
    </bean>

    <!-- Degradation or rate-limiting rule configuration -->
    <bean id="degradeRule" class="com.jd.ldop.sentinel.dto.RuleResourceWrapper">
        <constructor-arg index="0" value="ducc.degradeRule"/>
        <constructor-arg index="1" value="0"/>
        <constructor-arg index="2" value="0"/>
    </bean>


The configuration on DUCC is as follows:

Step 3: Define the resource and its associated fallback

    @Override
    @SentinelResource(value = "modifyGetWaybillState", fallback = "executeDegrade")
    public ExecutionResult<List<Integer>> execute(@NotNull Model imodel) {
        // Business logic
    }

    // The fallback lives in the same class and has the same return type and parameter list as execute
    public ExecutionResult<List<Integer>> executeDegrade(@NotNull Model imodel) {
        // Degradation logic
    }
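Behind @SentinelResource, an aspect intercepts the call and routes failures to the method named by fallback. Below is a simplified, JDK-only sketch of that dispatch. It assumes a hypothetical @Guarded annotation in place of @SentinelResource. The sketch calls the method and, if it throws, looks up the fallback by name in the same class with the same parameter types, checks the return type, and calls it with the same arguments.

In Sentinel, the fallback must likewise be in the same class unless fallbackClass is set. It must return the same type and take the same parameters, optionally plus a trailing Throwable. When no blockHandler is configured, it also receives BlockException. This sketch covers only the business-exception path and, for simplicity, only finds public methods.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/** Hypothetical stand-in for @SentinelResource(value = ..., fallback = ...). */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Guarded {
    String value();     // resource name
    String fallback();  // name of the fallback method in the same class
}

/** Simplified fallback dispatch: call the method; if it throws, call the named fallback. */
final class FallbackInvoker {
    static Object invoke(Object target, String methodName, Class<?>[] paramTypes, Object... args) {
        try {
            Method method = target.getClass().getMethod(methodName, paramTypes);
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                Guarded guarded = method.getAnnotation(Guarded.class);
                if (guarded == null) {
                    throw new IllegalStateException(e.getCause()); // no fallback configured
                }
                // Same class, same parameter types, same return type.
                Method fallback = target.getClass().getMethod(guarded.fallback(), paramTypes);
                if (!fallback.getReturnType().equals(method.getReturnType())) {
                    throw new IllegalStateException("fallback must return " + method.getReturnType());
                }
                return fallback.invoke(target, args);
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}

/** Hypothetical service mirroring the execute/executeDegrade pair above. */
class WaybillServiceDemo {
    @Guarded(value = "modifyGetWaybillState", fallback = "executeDegrade")
    public String execute(String waybillCode) {
        throw new IllegalStateException("downstream unavailable"); // simulated failure
    }

    public String executeDegrade(String waybillCode) {
        return "degraded:" + waybillCode;
    }
}
```

Calling FallbackInvoker.invoke(new WaybillServiceDemo(), "execute", new Class<?>[] {String.class}, "JD0001") returns "degraded:JD0001", because execute always throws in this demo.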


3.2 Application Scenarios

The component supports degradation, rate limiting and load protection for any business scenario.

IV. Sentinel Load-Test Data

4.1 Load-Test Objectives

Memory usage on the application machines stays stable below 50%

Sentinel degradation rule:

timeWindow=180: circuit-breaking time window of 180 s

statIntervalMs=60000: statistics interval of 1 min

4.2 Load-Test Results

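The load test ran in two phases, once with the component enabled and once with it disabled. The first half of the results corresponds to the component enabled, the second half to the component disabled.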

com.alibaba.csp.sentinel.node.metric.MetricNode

4.3 Conclusion

Overall, using the Sentinel component has little impact on the application's CPU and memory usage.
