问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

降级、熔断、限流机制详解与实战

创作时间:
作者:
@小白创作中心

降级、熔断、限流机制详解与实战

引用
CSDN
1.
https://blog.csdn.net/Kixuan214/article/details/140960141

在高流量场景下,如何保证系统的稳定性和可靠性?本文将详细介绍微服务架构中的三大利器:降级、熔断和限流机制。通过理论讲解和代码示例,帮助你深入理解这些机制的工作原理和实现方法。

1. 高流量故障原因分析

在高流量场景下,系统可能出现故障的原因主要有两个:

  • 依赖的资源或服务不可用,导致整体服务宕机。例如,在电商系统中,数据库访问缓慢可能导致整体服务不可用。
  • 乐观估计的流量超出系统承载能力,导致系统过载而拒绝服务。

2. 雪崩效应

系统运行需要消耗CPU、内存等资源,也包括线程资源。例如,Tomcat通过线程池处理HTTP请求,当线程资源被耗尽时,服务将无法处理新请求。

雪崩效应示例:

  • A服务调用B服务,B服务又调用C和D服务。其中ABD是核心服务,C是非核心服务。
  • 当A的流量增加时,如果只对核心服务ABD进行扩容而忽略C,C服务可能因为流量过大而响应缓慢。
  • B在调用C时,其线程资源会被阻塞,等待C返回结果。久而久之,B的线程资源会被耗尽,无法处理后续请求。
  • A发往B的请求会被放入B的线程池队列中,导致A调用B的响应时间变长,最终拖垮A服务。
  • 这就是典型的雪崩效应:一个非核心服务的响应延迟导致整个系统崩溃。

解决方案:当检测到某个服务响应时间异常时,切断与该服务的联系,让调用快速返回错误,释放资源。这就是熔断机制。

3. 熔断机制

熔断机制类似于电路中的保险丝,当服务调用失败次数达到阈值时,停止调用并返回错误。熔断器有三种状态:

  • 关闭(正常调用)
  • 半打开(尝试调用)
  • 打开(返回错误)

熔断机制不仅适用于微服务之间的调用,也适用于对Redis、Memcached等资源的调用。

实现要点:

  1. 使用定时器定期检测服务是否恢复。
  2. 在Redis客户端操作数据时加入熔断逻辑。
  3. 当检测到Redis节点故障时,熔断器会实时监测并停止请求该节点,避免单点故障导致系统雪崩。

4. 降级机制

降级机制的目标是将有限的资源效益最大化,通过控制非核心服务的可用性来保证核心服务的稳定性。常见的降级策略包括:

  • 牺牲时效性
  • 返回降级数据(如数据库压力大时只读取缓存)
  • 降频(如增加轮询间隔)
  • 同步写转异步写(牺牲数据一致性保证系统可用性)
  • 牺牲功能完整性(如关闭风控功能)
  • 牺牲用户体验(如禁用列表翻页功能)

5. 限流机制

限流机制通过限制并发请求数量,保证系统能够正常响应部分请求,对于超过限制的流量则拒绝服务。限流策略通常部署在服务的入口层,如API网关。

常见的限流算法包括:

  • 固定窗口算法:统计固定时间窗口内的请求数量,超过限制则触发限流。缺点是无法应对短时间内的突发流量。
  • 滑动窗口算法:将时间窗口划分为多个小窗口,统计滑动时间窗口内的请求数量。解决了固定窗口算法的缺陷,但还是无法限制短时间之内的集中流量。
  • 漏桶算法:通过漏桶机制平滑流量,对突发流量进行缓冲处理。缺点是流量缓存在漏桶中,响应时间增长。
  • 令牌桶算法:在桶中按固定速率加入令牌,请求需要消耗令牌才能被处理。适用于应对突发流量的情况,如Guava库提供了RateLimiter类。缺点是要存储并获取令牌数量,在分布式中用redis存储,每次请求redis都会有延迟,解决办法是使用Lua脚本每次获取一批令牌而不是一个,减少请求redis次数。

限流代码实现 —— 令牌桶限流

实现令牌桶限流算法,需要反复调用Redis查询与计算,一次限流判断需要多次请求较为耗时。因此我们采用编写Lua脚本运行的方式,将运算过程放在Redis端,使得对Redis进行一次请求就能完成限流的判断。

令牌桶算法需要在Redis中存储桶的大小、当前令牌数量,并且实现每隔一段时间添加新的令牌。最简单的办法当然是每隔一段时间请求一次Redis,将存储的令牌数量递增。但实际上我们可以通过对限流两次请求之间的时间和令牌添加速度来计算得出上次请求之后到本次请求时,令牌桶应添加的令牌数量。因此我们在Redis中只需要存储上次请求的时间和令牌桶中的令牌数量,而桶的大小和令牌的添加速度可以通过参数传入实现动态修改。

由于第一次运行脚本时默认令牌桶是满的,因此可以将数据的过期时间设置为令牌桶恢复到满所需的时间,及时释放资源。

编写Lua脚本如下:

local ratelimit_info = redis.pcall('HMGET',KEYS[1],'last_time','current_token')
local last_time = ratelimit_info[1]
local current_token = tonumber(ratelimit_info[2])
local max_token = tonumber(ARGV[1])
local token_rate = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
local reverse_time = 1000/token_rate
if current_token == nil then
  current_token = max_token
  last_time = current_time
else
  local past_time = current_time-last_time
  local reverse_token = math.floor(past_time/reverse_time)
  current_token = current_token+reverse_token
  last_time = reverse_time*reverse_token+last_time
  if current_token>max_token then
    current_token = max_token
  end
end
local result = 0
if(current_token>0) then
  result = 1
  current_token = current_token-1
end
redis.call('HMSET',KEYS[1],'last_time',last_time,'current_token',current_token)
redis.call('pexpire',KEYS[1],math.ceil(reverse_time*(max_token-current_token)+(current_time-last_time)))
return result

使用SpringDataRedis来执行限流:

public class RedisReteLimitScript implements RedisScript<String> {
   private static final String SCRIPT =".Lua";
  @Override   public String getSha1() {
    return DigestUtils.sha1Hex(SCRIPT);
  }
  @Override   public Class<String> getResultType() {     
      return String.class;
  }
  @Override   public String getScriptAsString() {     
      return SCRIPT;
  }
}
// 执行脚本
public boolean rateLimit(String key, int max, int rate) {
    List<String> keyList = new ArrayList<>(1);
    keyList.add(key);
    return "1".equals(stringRedisTemplate.execute(new RedisReteLimitScript(), 
        keyList, Integer.toString(max), Integer.toString(rate),
        Long.toString(System.currentTimeMillis())));
  }

编写测试类:

@Autowired
private RedisManager redisManager;
@Test
public void rateLimitTest() throws InterruptedException {
    String key = "test_rateLimit_key";
    int max = 10;  //令牌桶大小
    int rate = 10; //令牌每秒恢复速度
    AtomicInteger successCount = new AtomicInteger(0);
    Executor executor = Executors.newFixedThreadPool(10);
    CountDownLatch countDownLatch = new CountDownLatch(30);
    for (int i = 0; i < 30; i++) {
      executor.execute(() -> {
        boolean isAllow = redisManager.rateLimit(key, max, rate);
        if (isAllow) {
          successCount.addAndGet(1);
        }
        log.info(Boolean.toString(isAllow));
        countDownLatch.countDown();
      });
    }
    countDownLatch.await();
    log.info("请求成功{}次", successCount.get());
}

日志输出:

[19:12:50,283]true
[19:12:50,284]true
[19:12:50,284]true
[19:12:50,291]true
[19:12:50,291]true
[19:12:50,291]true
[19:12:50,297]true
[19:12:50,297]true
[19:12:50,298]true
[19:12:50,305]true
[19:12:50,305]false
[19:12:50,305]true
[19:12:50,312]false
[19:12:50,312]false
[19:12:50,312]false
[19:12:50,319]false
[19:12:50,319]false
[19:12:50,319]false
[19:12:50,325]false
[19:12:50,325]false
[19:12:50,326]false
[19:12:50,380]false
[19:12:50,380]false
[19:12:50,380]false
[19:12:50,387]false
[19:12:50,387]false
[19:12:50,387]false
[19:12:50,392]false
[19:12:50,392]false
[19:12:50,392]false
[19:12:50,393]请求成功11次

熔断代码实现 —— Redis开关

当熔断器处于Open状态时,定期检测Redis组件是否可用:

new Timer("RedisPort-Recover", true).scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        if (breaker.isOpen()) {
            Jedis jedis = null;
            try {
                jedis = connPool.getResource();
                jedis.ping(); // 验证 redis 是否可用
                successCount.set(0); // 重置连续成功的计数
                breaker.setHalfOpen(); // 设置为半打开态
            } catch (Exception ignored) {
            } finally {
                if (jedis != null) {
                    jedis.close();
                }
            }
        }
    }
}, 0, recoverInterval); // 初始化定时器定期检测 redis 是否可用

在通过Redis客户端操作Redis数据时,加入熔断器逻辑:

if (breaker.isOpen()) { 
    return null;  // 断路器打开则直接返回空值
}
K value = null;
Jedis jedis = null;
try {
     jedis = connPool.getResource();
     value = callback.call(jedis);
     if(breaker.isHalfOpen()) { // 如果是半打开状态
          if(successCount.incrementAndGet() >= SUCCESS_THRESHOLD) {// 成功次数超过阈值
                failCount.set(0);  // 清空失败数
                breaker.setClose(); // 设置为关闭态
          }
     }
     return value;
} catch (JedisException je) {
     if(breaker.isClose()){  // 如果是关闭态
         if(failCount.incrementAndGet() >= FAILS_THRESHOLD){ // 失败次数超过阈值
            breaker.setOpen();  // 设置为打开态
         }
     } else if(breaker.isHalfOpen()) {  // 如果是半打开态
         breaker.setOpen();    // 直接设置为打开态
     }
     throw  je;
} finally {
     if (jedis != null) {
           jedis.close();
     }
}
© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号