UseRateLimiter尝鲜
全局限流并发1个
app.UseRateLimiter(new RateLimiterOptions
{
Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource =>
{
return RateLimitPartition.CreateConcurrencyLimiter("MyLimiter",
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1;
}
};
根据不同资源不同限制并发数,前缀的资源租约数2,等待队列长度为2,其他默认租约数1,队列长度1。
app.UseRateLimiter(new RateLimiterOptions(
{
// 触发限流的响应码
DefaultRejectionStatusCode = 500,
OnRejected = async (ctx, rateLimitLease =>
{
// 触发限流回调处理
},
Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource =>
{
if (resource.Request.Path.StartsWithSegments("/api"
{
return RateLimitPartition.CreateConcurrencyLimiter("WebApiLimiter",
_ => new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2;
}
else
{
return RateLimitPartition.CreateConcurrencyLimiter("DefaultLimiter",
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1;
}
}
};
本地测试
新建一个webapi项目,并注册限流中间件如下
using Microsoft.AspNetCore.RateLimiting;
using System.Threading.RateLimiting;
var builder = WebApplication.CreateBuilder(args;
// Add services to the container.
builder.Services.AddControllers(;
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer(;
builder.Services.AddSwaggerGen(;
var app = builder.Build(;
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment(
{
app.UseSwagger(;
app.UseSwaggerUI(;
}
app.UseRateLimiter(new RateLimiterOptions
{
DefaultRejectionStatusCode = 500,
OnRejected = async (ctx, lease =>
{
await Task.FromResult(ctx.Response.WriteAsync("ConcurrencyLimiter";
},
Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource =>
{
return RateLimitPartition.CreateConcurrencyLimiter("MyLimiter",
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1;
}
};
app.UseHttpsRedirection(;
app.UseAuthorization(;
app.MapControllers(;
app.Run(;
启动项目,使用jmeter测试100并发,请求接口
这个结果是不是有点失望,其实创建的限流器是
,后续可以实现个各种策略的限流器进行替换之。
看了的实现,其实就是的限流思想,上面配置的,第一个1代表令牌的个数,第二个1代表可以当桶里的令牌为空时,进入等待队列,而不是直接失败,当前面的请求结束后,会归还令牌,此时等待的请求就可以拿到令牌了,代表最新的请求优先获取令牌,也就是获取令牌时非公平的,还有另一个枚举值老的优先,获取令牌是公平的。只要我们获取到令牌的人干活速度快,虽然我们令牌只有1,并发就很高。
3. 测试触发失败场景
只需要让我们拿到令牌的人持有时间长点,就能轻易的触发。
调整jmater并发数为10
ConcurrencyLimiter源码
获取令牌
protected override RateLimitLease AcquireCore(int permitCount
{
// These amounts of resources can never be acquired
if (permitCount > _options.PermitLimit
{
throw new ArgumentOutOfRangeException(nameof(permitCount, permitCount, SR.Format(SR.PermitLimitExceeded, permitCount, _options.PermitLimit;
}
ThrowIfDisposed(;
// Return SuccessfulLease or FailedLease to indicate limiter state
if (permitCount == 0
{
return _permitCount > 0 ? SuccessfulLease : FailedLease;
}
// Perf: Check SemaphoreSlim implementation instead of locking
if (_permitCount >= permitCount
{
lock (Lock
{
if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease
{
return lease;
}
}
}
return FailedLease;
}
尝试获取令牌核心逻辑
private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true] out RateLimitLease? lease
{
ThrowIfDisposed(;
// if permitCount is 0 we want to queue it if there are no available permits
if (_permitCount >= permitCount && _permitCount != 0
{
if (permitCount == 0
{
// Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available
lease = SuccessfulLease;
return true;
}
// a. if there are no items queued we can lease
// b. if there are items queued but the processing order is newest first, then we can lease the incoming request since it is the newest
if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst
{
_idleSince = null;
_permitCount -= permitCount;
Debug.Assert(_permitCount >= 0;
lease = new ConcurrencyLease(true, this, permitCount;
return true;
}
}
lease = null;
return false;
}
令牌获取失败后进入等待队列
protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, CancellationToken cancellationToken = default
{
// These amounts of resources can never be acquired
if (permitCount > _options.PermitLimit
{
throw new ArgumentOutOfRangeException(nameof(permitCount, permitCount, SR.Format(SR.PermitLimitExceeded, permitCount, _options.PermitLimit;
}
// Return SuccessfulLease if requestedCount is 0 and resources are available
if (permitCount == 0 && _permitCount > 0 && !_disposed
{
return new ValueTask<RateLimitLease>(SuccessfulLease;
}
// Perf: Check SemaphoreSlim implementation instead of locking
lock (Lock
{
if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease
{
return new ValueTask<RateLimitLease>(lease;
}
// Avoid integer overflow by using subtraction instead of addition
Debug.Assert(_options.QueueLimit >= _queueCount;
if (_options.QueueLimit - _queueCount < permitCount
{
if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && permitCount <= _options.QueueLimit
{
// remove oldest items from queue until there is space for the newest request
do
{
RequestRegistration oldestRequest = _queue.DequeueHead(;
_queueCount -= oldestRequest.Count;
Debug.Assert(_queueCount >= 0;
if (!oldestRequest.Tcs.TrySetResult(FailedLease
{
// Updating queue count is handled by the cancellation code
_queueCount += oldestRequest.Count;
}
}
while (_options.QueueLimit - _queueCount < permitCount;
}
else
{
// Don't queue if queue limit reached and QueueProcessingOrder is OldestFirst
return new ValueTask<RateLimitLease>(QueueLimitLease;
}
}
CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken;
CancellationTokenRegistration ctr = default;
if (cancellationToken.CanBeCanceled
{
ctr = cancellationToken.Register(static obj =>
{
((CancelQueueStateobj!.TrySetCanceled(;
}, tcs;
}
RequestRegistration request = new RequestRegistration(permitCount, tcs, ctr;
_queue.EnqueueTail(request;
_queueCount += permitCount;
Debug.Assert(_queueCount <= _options.QueueLimit;
return new ValueTask<RateLimitLease>(request.Tcs.Task;
}
}
归还令牌
private void Release(int releaseCount
{
lock (Lock
{
if (_disposed
{
return;
}
_permitCount += releaseCount;
Debug.Assert(_permitCount <= _options.PermitLimit;
while (_queue.Count > 0
{
RequestRegistration nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.PeekHead(
: _queue.PeekTail(;
if (_permitCount >= nextPendingRequest.Count
{
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead(
: _queue.DequeueTail(;
_permitCount -= nextPendingRequest.Count;
_queueCount -= nextPendingRequest.Count;
Debug.Assert(_permitCount >= 0;
ConcurrencyLease lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count;
// Check if request was canceled
if (!nextPendingRequest.Tcs.TrySetResult(lease
{
// Queued item was canceled so add count back
_permitCount += nextPendingRequest.Count;
// Updating queue count is handled by the cancellation code
_queueCount += nextPendingRequest.Count;
}
nextPendingRequest.CancellationTokenRegistration.Dispose(;
Debug.Assert(_queueCount >= 0;
}
else
{
break;
}
}
if (_permitCount == _options.PermitLimit
{
Debug.Assert(_idleSince is null;
Debug.Assert(_queueCount == 0;
_idleSince = Stopwatch.GetTimestamp(;
}
}
}
总结
虽然这次官方对限流进行了支持,但貌似还不能支持对ip或client级别的限制支持,对于更高级的限流策略仍需要借助第三方库或自己实现,期待后续越来越完善。