限速神器RateLimiter源码解析

科技资讯 投稿 6300 0 评论

限速神器RateLimiter源码解析

作者:京东科技 李玉亮

目录指引

限流场景

场景一、高并发的用户端场景。 尤其是C端系统,经常面对海量用户请求,如不做限流,遇到瞬间高并发的场景,则可能压垮系统。

场景二、内部交易处理场景。 如某类交易任务处理时有速率要求,再如上下游调用时下游对上游有速率要求。

算法一、信号量算法。 维护最大的并发请求数(如连接数),当并发请求数达到阈值时报错或等待,如线程池。

算法二、漏桶算法。 模拟一个按固定速率漏出的桶,当流入的请求量大于桶的容量时溢出。

算法三、令牌桶算法。 以固定速率向桶内发放令牌。请求处理时,先从桶里获取令牌,只服务有令牌的请求。

使用介绍

RateLimiter使用时只需引入guava jar便可,最新的版本是31.1-jre, 本文介绍的源码也是此版本。

            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>31.1-jre</version>
            </dependency>

源码中提供了两个直观的使用示例。

 final RateLimiter rateLimiter = RateLimiter.create(2.0; // 创建一个每秒2个许可的RateLimiter对象.
 void submitTasks(List<Runnable> tasks, Executor executor {
   for (Runnable task : tasks {
     rateLimiter.acquire(; // 此处可能有等待
     executor.execute(task;
   }
 }

示例二、以不超过5kb/s的速率产生数据流。

 final RateLimiter rateLimiter = RateLimiter.create(5000.0; // 创建一个每秒5k个许可的RateLimiter对象
 void submitPacket(byte[] packet {
   rateLimiter.acquire(packet.length;
   networkService.send(packet;
 }

可以看出RateLimiter的使用非常简单,只需要构造限速器,调用获取许可方法便可,不需要释放许可。

算法介绍

许可( permit 代表一个令牌,获取到许可的请求才能放行。

资源利用不足( underunilization : 许可的发放一般是匀速的,但请求未必是匀速的,有时会有无请求(资源利用不足)的场景,令牌桶会有贮存机制。

贮存许可( storedPermit : 令牌桶支持对空闲资源进行许可贮存,许可请求时优先使用贮存许可。

新鲜许可( freshPermit : 当贮存许可为空时,采用透支方式,下发新鲜许可,同时设置下次许可生效时间为本次新鲜许可的结束时间。

RateLimiter的核心功能是限速,我们首先想到的限速方案是记住最后一次下发令牌许可(permit时间,下次许可请求时,如果与最后一次下发许可时间的间隔小于1/QPS,则进行sleep至1/QPS,否则直接发放,但该方法不能感知到资源利用不足的场景。一方面,隔了很长一段再来请求许可,则可能系统此时相对空闲,可下发更多的许可以充分利用资源;另一方面,隔了很长一段时间再来请求许可,也可能意味着处理请求的资源变冷(如缓存失效),处理效率会下降。因此在RateLimiter中,增加了资源利用不足(underutilization)的管理,在代码中体现为贮存许可(storedPermits,贮存许可值最开始为0,随着时间的增加,一直增长为最大贮存许可数。许可获取时,首先从贮存许可中获取,然后再根据下次新鲜许可获取时间来进行新鲜许可获取。这里要说的是RateLimiter是记住了下次令牌发放的时间,类似于透支的功能,当前许可获取时立刻返回,同时记录下次获取许可的时间。

代码结构和主体流程

代码结构

RateLimiter类

SmoothRateLimiter类

SmoothBursty类

SmoothWarmingUp类

主体流程

获取许可的主体流程如下:

SmoothBursty算法

限速器创建

采用的是工厂模式创建,源码如下:

  public static RateLimiter create(double permitsPerSecond {
    // permitsPerSecond指每秒允许的许可数. 该方法调用了下面的方法
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer(;
  }
  // 创建SmoothBursty(固定贮存1s的贮存许可, 然后设置速率
  static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */;
    rateLimiter.setRate(permitsPerSecond;
    return rateLimiter;
  }

1、SmoothBursty的构造方法相对简单:

    SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds {
      super(stopwatch;
      this.maxBurstSeconds = maxBurstSeconds;
    }

2、rateLimiter.setRate的定义在父类RateLimiter中

  public final void setRate(double permitsPerSecond {
    checkArgument(
        permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond, "rate must be positive";
    synchronized (mutex( {
      doSetRate(permitsPerSecond, stopwatch.readMicros(;
    }
  }

该方法使用synchronized(mutex(方法对互斥锁进行同步,以保证多线程调用的安全,然后调用子类的doSetRate方法。 第二个参数nowMicros传的值是调用了stopwatch的方法,将限速器创建的时间定义为0,然后计算了当前时间和创建时间的时间差,因此采用的是相对时间。

  // Can't be initialized in the constructor because mocks don't call the constructor.
  // 从上行注释可看出,这是因为mock才用了懒加载, 实际上即时加载代码更简洁
  @CheckForNull private volatile Object mutexDoNotUseDirectly;
  // 双重检查锁的懒加载模式
  private Object mutex( {
    Object mutex = mutexDoNotUseDirectly;
    if (mutex == null {
      synchronized (this {
        mutex = mutexDoNotUseDirectly;
        if (mutex == null {
          mutexDoNotUseDirectly = mutex = new Object(;
        }
      }
    }
    return mutex;
  }

该方法使用了双重检查锁来对锁对象mutexDoNotUseDirectly进行懒加载,另外该方法通过mutex临时变量来解决了双重检查锁失效的问题。

  final void doSetRate(double permitsPerSecond, long nowMicros {
    // 同步贮存许可和时间
    resync(nowMicros;
    double stableIntervalMicros = SECONDS.toMicros(1L / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros;
  }

该方法在限速器创建时会调用,创建后调用限速器的setRate重置速率时也会调用。

  /** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
  void resync(long nowMicros {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros {
      double newPermits = (nowMicros - nextFreeTicketMicros / coolDownIntervalMicros(;
      storedPermits = min(maxPermits, storedPermits + newPermits;
      nextFreeTicketMicros = nowMicros;
    }
  }

该方法从现实场景上讲,代表的是随着时间的流逝,贮存许可不断增加,但从技术实现的角度,并不是真正的持续刷新,而是仅在需要时调用刷新。该方法如果当前时间小于等于下次许可时间,则贮存许可数量和下次许可时间不需要刷新;否则通过 (当前时间-下次许可时间/贮存许可的发放间隔计算出的值域最大贮存数量取小,则为已贮存的许可数量,需要注意的是贮存许可数量是double类型的。

限速器使用

先说一下accquire方法,共有两个共有方法,一个是无参的,每次获取1个许可,再一个是整数参数的,每次调用获取多个许可。

  // 获取1个许可
  public double acquire( {
    return acquire(1;
  }
 
  // 获取多个许可
  public double acquire(int permits {
    // 留出permits个许可,得到需要sleep的微秒数.
    long microsToWait = reserve(permits;
    // 该方法如果小于等于零则直接返回,否则sleep
    stopwatch.sleepMicrosUninterruptibly(microsToWait;
    // 返回休眠的秒数.
    return 1.0 * microsToWait / SECONDS.toMicros(1L;
  }

从以上源码可看出,获取许可的逻辑很简单:留出permits个许可,根据返回值决定是否sleep等待。留出许可的方法实现如下:

 // 预留出permits个许可
  final long reserve(int permits {
    checkPermits(permits;
    synchronized (mutex( {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros(;
    }
  }
   
  // 预留出permits个需求,得到需要等待的时间
  final long reserveAndGetWaitLength(int permits, long nowMicros {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros;
    return max(momentAvailable - nowMicros, 0;
  }
  abstract long reserveEarliestAvailable(int permits, long nowMicros;

reserveEarliestAvailable为抽象方法,实现在SmoothRateLimiter类中,该方法是核心主链路方法,该方法先从贮存许可中获取,如果数量足够则直接返回,否则先将全部贮存许可取出,再计算还需要的等待时间,逻辑如下:

  final long reserveEarliestAvailable(int requiredPermits, long nowMicros {
    // 刷新贮存许可和下个令牌时间
    resync(nowMicros;
    // 返回值为当前的下次空闲时间
    long returnValue = nextFreeTicketMicros;
    // 要消耗的贮存数量为需要的贮存数量
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits;
    // 新鲜许可数=需要的许可数-使用的贮存许可
    double freshPermits = requiredPermits - storedPermitsToSpend;
    // 等待时间=贮存许可等待时间(实现方决定+新鲜许可等待时间(数量*固定速率
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend
            + (long (freshPermits * stableIntervalMicros;
    // 透支后的下次许可可用时间=当前时间(nextFreeTicketMicros+等待时间(waitMicros
    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros;
    // 贮存许可数量减少
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
  }

该方法有两点说明:1、returnValue为之前计算的下次空闲时间(前面有说RateLimiter采用预支的模式,本次直接返回,同时计算下次的最早空闲时间) 2、贮存许可的等待时间不同的实现方逻辑不同,SmoothBursty算法认为贮存许可直接可用,所以返回0, 后面的SmoothWarmingUp算法认为贮存许可需要消耗比正常速率更多的预热时间,有一定算法逻辑。

  public boolean tryAcquire(Duration timeout {
    return tryAcquire(1, toNanosSaturated(timeout, TimeUnit.NANOSECONDS;
  }

  public boolean tryAcquire(long timeout, TimeUnit unit {
    return tryAcquire(1, timeout, unit;
  }

  public boolean tryAcquire(int permits {
    return tryAcquire(permits, 0, MICROSECONDS;
  }

  public boolean tryAcquire( {
    return tryAcquire(1, 0, MICROSECONDS;
  }

  public boolean tryAcquire(int permits, Duration timeout {
    return tryAcquire(permits, toNanosSaturated(timeout, TimeUnit.NANOSECONDS;
  }

  public boolean tryAcquire(int permits, long timeout, TimeUnit unit {
    long timeoutMicros = max(unit.toMicros(timeout, 0;
    checkPermits(permits;
    long microsToWait;
    synchronized (mutex( {
      long nowMicros = stopwatch.readMicros(;
      // 判断超时微秒数是否可等到下个许可时间
      if (!canAcquire(nowMicros, timeoutMicros {
        return false;
      } else {
        microsToWait = reserveAndGetWaitLength(permits, nowMicros;
      }
    }
    // 休眠等待
    stopwatch.sleepMicrosUninterruptibly(microsToWait;
    return true;
  }
  
  // 下次许可时间-超时时间<=当前时间
  private boolean canAcquire(long nowMicros, long timeoutMicros {
    return queryEarliestAvailable(nowMicros - timeoutMicros <= nowMicros;
  }

SmoothWarmingUp算法

SmoothWarmingUp算法的主体处理流程同SmoothBurstry算法,主要在贮存许可时间计算上的两个方法进行了新实现,该算法不像SmoothBurstry算法那么直观好理解,需要先了解算法逻辑,再看源码。

算法说明

说到该算法前,我们再回头看一下SmoothRateLimiter的贮存许可,贮存许可有当前数量和最大数量,另外还有两个算法逻辑,一个是贮存许可生产的速率控制,再一个是贮存许可消费速率的控制,在Bursty算法中,生产的速率同设定的固定速率,而消费的速率为无穷大(立刻消费,不占用时间);在WarmingUp算法中,需对照下图进行分析:

在限速器初始化时,输入的变量有固定速率和预热时间,另外冷却因子是固定值3;在作者算法中,首先计算的是阈值许可数 = 0.5 * 预热周期 / 固定速率. 然后计算的是最大许可数,我们知道了梯形的面积、上边(大速率、下边(小速率,便能推到出高,最大许可=阀值许可数 + 高。

void doSetRate(double permitsPerSecond, double stableIntervalMicros {
      double oldMaxPermits = maxPermits;
      double coldIntervalMicros = stableIntervalMicros * coldFactor;
      thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
      maxPermits =
          thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros;
      slope = (coldIntervalMicros - stableIntervalMicros / (maxPermits - thresholdPermits;
      if (oldMaxPermits == Double.POSITIVE_INFINITY {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = 0.0;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0
                ? maxPermits // initial state is cold
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

在具体使用中,一个是生产的速率,固定为预热时间/最大许可数,源码如下:

  double coolDownIntervalMicros( {
      return warmupPeriodMicros / maxPermits;
    }

再一个是消费的速率,按如上曲线从右至左的面积=梯形面积+长方形面积,梯形面积=(上边+下边 /2 * 高,源码如下:

    long storedPermitsToWaitTime(double storedPermits, double permitsToTake {
      double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
      long micros = 0;
      // measuring the integral on the right part of the function (the climbing line
      if (availablePermitsAboveThreshold > 0.0 {
        double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake;
        // TODO(cpovirk: Figure out a good name for this variable.
        double length =
            permitsToTime(availablePermitsAboveThreshold
                + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake;
        micros = (long (permitsAboveThresholdToTake * length / 2.0;
        permitsToTake -= permitsAboveThresholdToTake;
      }
      // measuring the integral on the left part of the function (the horizontal line
      micros += (long (stableIntervalMicros * permitsToTake;
      return micros;
    }

源码分析

了解了以上算法后,再看下面的源码就相对简单了。

  static final class SmoothWarmingUp extends SmoothRateLimiter {
    // 预热时间
    private final long warmupPeriodMicros;
    //斜率
    private double slope;
    //阈值许可
    private double thresholdPermits;
    //冷却因子
    private double coldFactor;

    SmoothWarmingUp(
        SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor {
      super(stopwatch;
      this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod;
      this.coldFactor = coldFactor;
    }
    
    // 参数初始化
    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros {
      double oldMaxPermits = maxPermits;
      double coldIntervalMicros = stableIntervalMicros * coldFactor;
      thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
      maxPermits =
          thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros;
      slope = (coldIntervalMicros - stableIntervalMicros / (maxPermits - thresholdPermits;
      if (oldMaxPermits == Double.POSITIVE_INFINITY {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = 0.0;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0
                ? maxPermits // initial state is cold
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

    // 有storedPermits个贮存许可,要使用permitsToTake个时的等待时间计算
    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake {
      double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
      long micros = 0;
      // measuring the integral on the right part of the function (the climbing line
      if (availablePermitsAboveThreshold > 0.0 {
        double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake;
        // TODO(cpovirk: Figure out a good name for this variable.
        double length =
            permitsToTime(availablePermitsAboveThreshold
                + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake;
        micros = (long (permitsAboveThresholdToTake * length / 2.0;
        permitsToTake -= permitsAboveThresholdToTake;
      }
      // measuring the integral on the left part of the function (the horizontal line
      micros += (long (stableIntervalMicros * permitsToTake;
      return micros;
    }
    // 许可耗时=固定速率+许可值*斜率
    private double permitsToTime(double permits {
      return stableIntervalMicros + permits * slope;
    }
    // 冷却间隔固定为预热时间/最大许可数.
    @Override
    double coolDownIntervalMicros( {
      return warmupPeriodMicros / maxPermits;
    }
  }

思考总结

sleep说明和相对时间

RateLimiter内部使用类StopWatch进行了一个相对时间的度量,RateLimiter创建时,时间为0,然后向后累计,sleep时不受interrupt异常影响。

double浮点数

只支持单机

RateLimiter的这几种算法只支持单机限流,如要支持集群限流,一种方式是先根据负载均衡的权重计算出单机的限速值,再进行单节点限速;另一种方式是参考该组件使用redis等中心化数量管理的中间件,但性能和稳定性会降低一些。

扩展性

RateLimiter提供了有限的扩展能力,自带的SmoothBursty和SmoothWarmingUp类不是公开类,不能直接创建或调整参数,如关闭贮存功能或调整预热系数等。这种场景需要继承SmoothRateLimiter进行重写,贮存许可的生产和消费算法是容易变化和重写的点,将整个源码拷贝出来进行二次修改也是一种方案。

编程笔记 » 限速神器RateLimiter源码解析

赞同 (39) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽