限流组件实现

Rain2024-02-07 23:01:07backendGolang高并发

一、服务流量限制的重要性

随着业务规模的增长,服务的流量也会激增,大流量可能会压垮服务器,导致服务瘫痪。因此需对服务的流量进行限制,确保在大流量的情况下也能正常运行。

当流量激增时,会占用大量服务器资源和带宽,可能会压垮整个系统。比如流量激增期间数据库连接用尽,会导致服务无法访问数据库而宕机。用限制流量可以有效防止流量暴增压垮系统。

没有限流时,流量激增期间会启动很多无用的任务占用服务器资源,造成不必要的浪费。适当限流可以排队或拒绝无效请求,有效控制资源消耗使用。

二、常见的限流算法

限流的基本思想是通过算法预先设置阈值,当流量达到阈值时自动限制,常见的限流算法有:

  1. 计数器算法

计数器算法根据时间窗口内的请求数进行限制,基本思路是设置一个计数器统计时间窗口内的请求数,当请求数达到限流阈值时,就拒绝服务或者排队。

**优点: **实现简单,资源消耗少。

**缺点: **无法处理突发流量,时间窗口比较难确定合适的值。

  1. 漏桶算法

漏桶算法是限制请求通过的速率,基本思路是设置桶的容量和流出速率,如果请求流入速率过大会被桶阻止,根据固定速率流出,起到平滑调节速率的作用。

**优点: **可以很好地应对突发流量。

**缺点: **需要实时处理每一个请求,对系统资源消耗较大。

  1. 令牌桶算法

令牌桶算法按照固定速率向桶中放入令牌,每次请求需要消耗一个令牌,只有拿到令牌的请求才允许通过。放令牌的速率可以通过调节,实现不同速率的请求限流。

**优点: **拥堵控制能力更强,可自定义速率。

**缺点: **需要实时处理请求,系统资源消耗依然比较大。

三、限流实战

  1. 使用计数器限制总请求数

计数器算法主要是基于时间窗口和计数器来进行限制的。下面来看一个基于计数器进行限流的实现:

package limit

import (
    "sync/atomic"
    "time"
)

// 计数器限流器
type CounterLimiter struct {
    count      uint64 // 当前计数
    lastUpdate int64  // 上次更新的时间

    limitPerSec uint64 // 每秒限制的请求数
}

// 创建计数器限流器
func NewCounterLimiter(limitPerSec uint64) *CounterLimiter {
    return &CounterLimiter{
        count:      0,
        lastUpdate: time.Now().Unix(),
        limitPerSec: limitPerSec,
    }
}

// 实现限流器接口的Allow()
func (c *CounterLimiter) Allow() bool {
    now := time.Now().Unix()
    elapse := now - c.lastUpdate

    c.lastUpdate = now
    addedCount := elapse * c.limitPerSec

    c.count += uint64(addedCount)
    if c.count > c.limitPerSec {
        c.count = c.limitPerSec
    }

    if c.count < c.limitPerSec {
        c.count++
        return true
    }

    return false
}

基于计数器的限流器 CounterLimiter,主要逻辑是

创建限流器时指定限流频率 limitPerSec

Allow() 方法校验是否限流

  • 计算当前时间和上次更新时间差 elapsed
  • 根据时间差计算这段时间新增的限流额度 addedCount
  • 计数器统计值增加 addedCount
  • 判断计数器是否达到阈值
  • 到达阈值则限流,未到则计数 +1 并放行

只需要创建一个限流器,并在请求处理前调用 Allow() 方法判断是否限流即可。

func httpHandler(w http.ResponseWriter, r *http.Request) {

  limiter := NewCounterLimiter(10)

  if !limiter.Allow() {

    http.Error(w, http.StatusText(429),
      http.StatusTooManyRequests)

    return
  }

  // 核心逻辑
}
  1. 使用漏桶算法限制请求通过速率

漏桶算法需要设置一个桶的容量 capacity,和漏出流量的速率 flow per second。如果请求流入速度过快,会被桶的容量限制而丢弃。

使用漏桶算法实现请求通过速率限流的示例:

type LeakyBucket struct {
  capacity     int64 // 桶容量
  used         int64 // 当前已使用
  mu           sync.Mutex
  lastLeakTime time.Time // 上次漏水时间

  flow int64 // 每秒流速
}

// 创建漏桶限流器
func NewLeakyBucket(capacity, flow int64) *LeakyBucket {
  return &LeakyBucket{
    capacity: capacity,
    used:     0,
    flow:     flow,
  }
}

// 实现限流器接口的Allow()
func (l *LeakyBucket) Allow() bool {
  l.mu.Lock()
  defer l.mu.Unlock()

  now := time.Now()
  l.leak(now)

  if l.used >= l.capacity {
    return false
  }

  l.used += 1
  return true
}

// 漏水处理
func (l *LeakyBucket) leak(now time.Time) {

  delta := now.Sub(l.lastLeakTime)

  leaked := delta.Seconds() * float64(l.flow)

  // 计算这段间隔内的漏水量
  leakedInt := int64(leaked)
  if leakedInt > (l.used) {
    // 漏出了全部水
    l.used = 0
  } else {
    l.used -= leakedInt
  }

  l.lastLeakTime = now
}

漏桶算法的实现主要分为两个部分:

  • A llow() 方法检查当前使用量是否达到桶容量,未到则请求数 +1 并返回 true,已到则返回 false 限流
  • leak() 方法按照固定流速进行漏水,对应速率进行的限流

可通过设置桶容量和流速,来限制请求通过系统的速率了。

  1. 使用令牌桶算法释放固定数额的令牌

令牌桶算法的主要逻辑是按照一定速率往桶中放入令牌。

请求在处理前需要先获取令牌,如果没有可用令牌则丢弃该请求或进入队列等待。

下面是使用令牌桶算法实现的限流器:

package limit

import (
  "sync"
  "time"
)

// 令牌桶算法限流器
type TokenBucket struct {
  capacity   int64 // 桶容量
  rate       int64 // 令牌放入速率
  tokens     int64 // 当前令牌数
  lastUpdate int64 // 上次添加令牌的时间
  mu         sync.Mutex
}

// 创建令牌桶限流器
func NewTokenBucket(capacity, rate int64) *TokenBucket {
  return &TokenBucket{capacity: capacity, rate: rate, tokens: capacity, lastUpdate: time.Now().Unix()}
}

// Allow 方法实现限流器接口
func (t *TokenBucket) Allow() bool {
  t.addTokens()
  t.mu.Lock()
  defer t.mu.Unlock()
  if t.tokens <= 0 {
    return false
  }
  t.tokens--
  return true
}

// 按速率添加令牌
func (t *TokenBucket) addTokens() {
  now := time.Now().Unix()
  elapse := now - t.lastUpdate
  add := elapse * t.rate
  t.lastUpdate = now
  t.tokens += add
  if t.tokens > t.capacity {
    t.tokens = t.capacity
  }
}

主要逻辑分为两部分:

  • Allow() 方法处理请求前获取令牌
  • addTokens() 方法按照速率往桶中添加令牌

调整桶容量和添加令牌的速率,来达到平滑限流的效果。

  1. 封装限流中间件,便于业务复用

在上例中,实现了通用的限流器接口,包含创建限流器和 Allow() 校验方法。可以基于这个接口,进一步封装成限流中间件。

package limit

import "net/http"

// 中间件实现
func LimitMiddleware(handler http.Handler, limiter Limiter) http.Handler {
  return http.HandlerFunc(func(w http.ResponseWriter,
    r *http.Request) {

    if !limiter.Allow() {

      http.Error(w, http.StatusText(429),
        http.StatusTooManyRequests)

      return
    }

    handler.ServeHTTP(w, r)
  })
}

func main() {
  // 使用方式
  limiter := NewTokenBucket(capacity, rate)
  http.Handle("/", LimitMiddleware(myHandler, limiter))
}

四、优化限流策略

很多时候业务访问量会有周期性波动或突发变化, 需要能够检测实时流量,动态调整限流参数。比如可以根据每分钟的请求量实时调整下一分钟的限流阈值。

根据业务需要可以设置自定义策略,比如对重要接口限流预留更多资源,对次要接口限流更严格等。允许更加细粒度地控制限流。

可以根据服务器负载、平均响应时间等指标,动态决定是否需要限流以保护系统。在流量大幅增长时自动跟进限制。

五、使用 Redis 实现分布式限流

之前的限流方式都是在单机上通过计数或时间实现,存在一定的不足。可使用 Redis 实现分布式限流。

Redis 的性能和扩展性优势:

Redis 单机可以达到 10 万 + QPS 的性能, pipeline 批量操作可以进一步提升这一指标。此外 Redis 很容易通过主从复制和分片来进行扩展。正是得益于这些优势,才使得它非常适合实现分布式限流。

利用 Redis 的计数器和定时任务实现分布式限制 ,主要的思想是:

  • 对每个唯一请求路径维护一个计数器
  • 每次请求计数器 +1
  • 当计数器达到阈值则返回限流
  • 通过定时任务定期重置计数器计数

客户端请求时通过 Lua 脚本 canRequest.lua 来进行判断:

-- canRequest.lua 限流判断脚本
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local current = tonumber(redis.call('get', key) or "0")

if current + 1 > limit then 
    return 0
else
    redis.call("INCRBY", key, 1)
    redis.call("expire", key, 1) 
    return 1
end

使用一个唯一键 key 存储计数器,并设置 key 的过期时间,比如 1 秒。根据 key 的当前计数是否达到阈值来拒绝请求。可轻松通过 Redis 实现分布式限流,并可以横向扩展提高效率。

Last Updated 2/7/2024, 3:14:40 PM