Spring Cloud Gateway在分布式场景下的限流及熔断降级应用

文章标题:

Spring Cloud Gateway在分布式场景中的限流与熔断降级应用

文章内容:

各位朋友们,大家好呀!今天我们来一同探讨Spring Cloud Gateway在分布式环境下的限流以及熔断降级相关知识。

一、限流

思考:为何需要限流?

在流量极大的业务场景中,若不进行限流操作,会致使系统出现宕机状况。当大量请求涌向后端服务时,会消耗完各类资源,像CPU、内存、线程、网络带宽以及数据库连接等都是有限的,最终就会拖垮整个系统。

1.常见限流算法
  • 漏桶算法
  • 令牌桶算法
1.1漏桶算法(不推荐)
1.1.1.原理

将请求缓存到一个队列里,然后按照固定的速度对其进行处理,以此达成限流的目的。

1.1.2.实现

把请求放进一个桶中,桶的容量是固定值,当桶被装满后,多余的请求就会被丢弃,桶的底部有个洞,会以固定速率流出请求。

1.1.3.举例

桶的容量为1万,若有10万并发请求,最多只能把1万请求放入桶中,其余请求全部丢弃,然后按照固定速度处理桶内的请求。

1.1.4.缺点

处理突发流量时效率较低,因为处理请求的速度固定,效率不高。

1.2.令牌桶算法(推荐)
1.2.1.原理

把请求放置在一个缓冲队列中,只有拿到令牌后才能进行处理。

1.2.2.实现

装令牌的桶大小固定,当令牌装满后,就无法再放入新的令牌;每次请求都要到桶中获取一个令牌才能放行,没有令牌时,请求要么被丢弃,要么继续放在缓存队列中等待。

1.2.3.举例

桶的容量是10万个,生产速率是1万个/秒,若有10万并发请求,以每秒10万个/秒的速度处理,随着桶中的令牌很快用完,处理速度又会慢慢降下来,而生产令牌的速度保持在1万个/秒左右。

1.2.4.缺点

处理突发流量能提升系统性能,但会给系统带来一定压力,如果桶的大小设置不合理,甚至可能压垮系统,比如处理1亿的并发请求,却把桶的大小设置为1,那系统马上就会崩溃。

2.网关限流(Spring Cloud Gateway + Redis实战)
2.1.pom.xml配置
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-gateway</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-web</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
        </dependency>
2.2.yaml配置
spring:
  application:
    name: laokou-gateway
  cloud:
    gateway:
      routes:
        - id: LAOKOU-SSO-DEMO
          uri: lb://laokou-sso-demo
          predicates:
          - Path=/sso/**
          filters:
          - StripPrefix=1
          - name: RequestRateLimiter #请求数限流,名字不能乱打
            args:
              key-resolver: "#{@ipKeyResolver}"
              redis-rate-limiter.replenishRate: 1 #生成令牌速率-设为1方便测试
              redis-rate-limiter.burstCapacity: 1 #令牌桶容量-设置1方便测试
  redis:
    database: 0
    cluster:
      nodes: x.x.x.x:7003,x.x.x.x:7004,x.x.x.x:7005,x.x.x.x:7003,x.x.x.x:7004,x.x.x.x:7005
    password: laokou #密码
    timeout: 6000ms #连接超时时长(毫秒)
    jedis:
      pool:
        max-active: -1 #连接池最大连接数(使用负值表示无极限)
        max-wait: -1ms #连接池最大阻塞等待时间(使用负值表示没有限制)
        max-idle: 10 #连接池最大空闲连接
        min-idle: 5 #连接池最小空间连接
2.3.创建bean
@Configuration
public class RequestRateLimiterConfig {

    @Bean(value = "ipKeyResolver")
    public KeyResolver ipKeyResolver(RemoteAddressResolver remoteAddressResolver) {
        return exchange -> Mono.just(remoteAddressResolver.resolve(exchange).getAddress().getHostAddress());
    }

    @Bean
    public RemoteAddressResolver remoteAddressResolver() {
        // 远程地址解析器
        return XForwardedRemoteAddressResolver.trustAll();
    }

}
3.测试限流(编写java并发测试)
@Slf4j
public class HttpUtil {
public static void apiConcurrent(String url,Map<String,String> params) {
        Integer count = 200;
        //创建线程池
        ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.SECONDS, new SynchronousQueue<>());
        //同步工具
        CountDownLatch latch = new CountDownLatch(count);
        Map<String,String> dataMap = new HashMap<>(1);
        dataMap.put("authorize","XXXXXXX");
        for (int i = 0; i < count; i++) {
            pool.execute(() -> {
                try {
                    //访问网关的API接口
                    HttpUtil.doGet("http://localhost:1234/sso/laokou-demo/user",dataMap);
                } catch (IOException e) {
                    e.printStackTrace();
                }finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

public static String doGet(String url, Map<String, String> params) throws IOException {
        //创建HttpClient对象
        CloseableHttpClient httpClient = HttpClients.createDefault();
        String resultString = "";
        CloseableHttpResponse response = null;
        try {
            //创建uri
            URIBuilder builder = new URIBuilder(url);
            if (!params.isEmpty()) {
                for (Map.Entry<String, String> entry : params.entrySet()) {
                    builder.addParameter(entry.getKey(), entry.getValue());
                }
            }
            URI uri = builder.build();
            //创建http GET请求
            HttpGet httpGet = new HttpGet(uri);
            List<NameValuePair> paramList = new ArrayList<>();
            RequestBuilder requestBuilder = RequestBuilder.get().setUri(new URI(url));
            requestBuilder.setEntity(new UrlEncodedFormEntity(paramList, Consts.UTF_8));
            httpGet.setHeader(new BasicHeader("Content-Type", "application/json;charset=UTF-8"));
            httpGet.setHeader(new BasicHeader("Accept", "*/*;charset=utf-8"));
            //执行请求
            response = httpClient.execute(httpGet);
            //判断返回状态是否是200
            if (response.getStatusLine().getStatusCode() == 200) {
                resultString = EntityUtils.toString(response.getEntity(), "UTF-8");
            }
        } catch (Exception e) {
            log.info("调用失败:{}",e);
        } finally {
            if (response != null) {
                response.close();
            }
            httpClient.close();
        }
        log.info("打印:{}",resultString);
        return resultString;
    }
}

说明这个网关限流配置是没有问题的

4.源码查看

Spring Cloud Gateway RequestRateLimiter GatewayFilter
Factory文档地址

工厂 RequestRateLimiter
GatewayFilter
使用一个RateLimiter实现来判断当前请求是否被允许继续。如果不允许,HTTP 429 - Too Many
Requests
则返回默认状态。

4.1.查看 RequestRateLimiterGatewayFilterFactory
    @Override
    public GatewayFilter apply(Config config) {
        KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
        RateLimiter<Object> limiter = getOrDefault(config.rateLimiter, defaultRateLimiter);
        boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
        HttpStatusHolder emptyKeyStatus = HttpStatusHolder
                .parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));

        return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY).flatMap(key -> {
            if (EMPTY_KEY.equals(key)) {
                if (denyEmpty) {
                    setResponseStatus(exchange, emptyKeyStatus);
                    return exchange.getResponse().setComplete();
                }
                return chain.filter(exchange);
            }
            String routeId = config.getRouteId();
            if (routeId == null) {
                Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
                routeId = route.getId();
            }
                     // 执行限流
            return limiter.isAllowed(routeId, key).flatMap(response -> {

                for (Map.Entry<String, String> header : response.getHeaders().entrySet()) {
                    exchange.getResponse().getHeaders().add(header.getKey(), header.getValue());
                }

                if (response.isAllowed()) {
                    return chain.filter(exchange);
                }

                setResponseStatus(exchange, config.getStatusCode());
                return exchange.getResponse().setComplete();
            });
        });
    }
4.2.查看 RedisRateLimiter
    @Override
    @SuppressWarnings("unchecked")
    public Mono<Response> isAllowed(String routeId, String id) {
        if (!this.initialized.get()) {
            throw new IllegalStateException("RedisRateLimiter is not initialized");
        }
            // 这里如何加载配置?请思考
        Config routeConfig = loadConfiguration(routeId);
            // 令牌桶每秒产生令牌数量
        int replenishRate = routeConfig.getReplenishRate();
            // 令牌桶容量
        int burstCapacity = routeConfig.getBurstCapacity();
            // 请求消耗的令牌数
        int requestedTokens = routeConfig.getRequestedTokens();
        try {
                      // 键
            List<String> keys = getKeys(id);
                      // 参数
            List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", "", requestedTokens + "");
            // 调用lua脚本
            Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
            return flux.onErrorResume(throwable -> {
                if (log.isDebugEnabled()) {
                    log.debug("Error calling rate limiter lua", throwable);
                }
                return Flux.just(Arrays.asList(1L, -1L));
            }).reduce(new ArrayList<Long>(), (longs, l) -> {
                longs.addAll(l);
                return longs;
            }).map(results -> {
                              // 判断是否等于1,1表示允许通过,0表示不允许通过
                boolean allowed = results.get(0) == 1L;
                Long tokensLeft = results.get(1);
                Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));
                if (log.isDebugEnabled()) {
                    log.debug("response: " + response);
                }
                return response;
            });
        }
        catch (Exception e) {
            log.error("Error determining if user allowed from redis", e);
        }
        return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
    }

    static List<String> getKeys(String id) {
        String prefix = "request_rate_limiter.{" + id;
        String tokenKey = prefix + "}.tokens";
        String timestampKey = prefix + "}.timestamp";
        return Arrays.asList(tokenKey, timestampKey);
    }

思考:redis限流配置是如何加载?

其实就是监听动态路由的事件并把配置存起来

4.3.重点来了,令牌桶 /META-INF/scripts/request_rate_limiter.lua 脚本剖析
-- User Request Rate Limiter filter
-- See https://stripe.com/blog/rate-limiters
-- See https://gist.github.com/ptarjan/e38f45f2dfe601419ca3af937fff574d#file-1-check_request_rate_limiter-rb-L11-L34

-- 令牌桶算法工作原理
-- 1.系统以恒定速率往桶里面放入令牌
-- 2.请求需要被处理,则需要从桶里面获取一个令牌
-- 3.如果桶里面没有令牌可获取,则可以选择等待或直接拒绝并返回

-- 令牌桶算法工作流程
-- 1.计算填满令牌桶所需要的时间(填充时间 = 桶容量 / 速率)
-- 2.设置存储数据的TTL(过期时间),为填充时间的两倍(存储时间 = 填充时间 * 2)
-- 3.从Redis获取当前令牌的剩余数量和上一次调用的时间戳
-- 4.计算距离上一次调用的时间间隔(时间间隔 = 当前时间 - 上一次调用时间)
-- 5.计算填充的令牌数量(填充令牌数量 = 时间间隔 * 速率)【前提:桶容量是固定的,不存在无限制的填充】
-- 6.判断是否有足够多的令牌满足请求【 (填充令牌数量 + 剩余令牌数量) >= 请求数量 && (填充令牌数量 + 剩余令牌数量) <= 桶容量 】
-- 7.如果请求被允许,则从桶里面取出相应数据的令牌
-- 8.如果TTL为正,则更新Redis键中的令牌和时间戳
-- 9.返回两个两个参数(allowed_num:请求被允许标志。1允许,0不允许)、(new_tokens:填充令牌后剩余的令牌数据)

-- 随机写入
redis.replicate_commands()

-- 令牌桶Key -> 存储当前可用令牌的数量(剩余令牌数量)
local tokens_key = KEYS[1]

-- 时间戳Key -> 存储上次令牌刷新的时间戳
local timestamp_key = KEYS[2]

-- 令牌填充速率
local rate = tonumber(ARGV[1])

-- 令牌桶容量
local capacity = tonumber(ARGV[2])

-- 当前时间
local now = tonumber(ARGV[3])

-- 请求数量
local requested = tonumber(ARGV[4])

-- 填满令牌桶所需要的时间
local fill_time = capacity / rate

-- 设置key的过期时间(填满令牌桶所需时间的2倍)
local ttl = math.floor(fill_time * 2)

-- 判断当前时间,为空则从redis获取
if now == nil then
    now = redis.call('TIME')[1]
end

-- 获取当前令牌的剩余数量
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
    last_tokens = capacity
end

-- 获取上一次调用的时间戳
local last_refreshed = tonumber(redis.call('get', timestamp_key))
if last_refreshed == nil then
    last_refreshed = 0
end

-- 计算距离上一次调用的时间间隔
local delta = math.max(0, now - last_refreshed)

-- 当前的令牌数量(剩余 + 填充 <= 桶容量)
local now_tokens = math.min(capacity, last_refreshed + (rate * delta))

-- 判断是否有足够多的令牌满足请求
local allowed = now_tokens >= requested

-- 定义当前令牌的剩余数量
local new_tokens = now_tokens

-- 定义被允许标志
local allowed_num = 0
if allowed then
    new_tokens = now_tokens - requested
    -- 允许访问
    allowed_num = 1
end

-- ttl > 0,将当前令牌的剩余数量和当前时间戳存入redis
if ttl > 0 then
    redis.call('setex', tokens_key, ttl, new_tokens)
    redis.call('setex', timestamp_key, ttl, now)
end

-- 返回参数
return { allowed_num, new_tokens }
4.4.查看 GatewayRedisAutoConfiguration 脚本初始化

```java
@Bean
@SuppressWarnings("unchecked")
public RedisScript redisRequestRateLimiterScript() {
DefaultRedisScript redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(
// 根据指定路径获取lua脚本来初始化配置
new ResourceScriptSource(new ClassPathResource("META-INF/scripts/request_rate_limiter.lua")));
redisScript.setResultType(List.class);
return redisScript;
}

@Bean
@ConditionalOnMissingBean
public RedisRateLimiter redisRateLimiter(ReactiveStringRedisTemplate redisTemplate,
        @Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long

文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/12616.html

(0)
LomuLomu
上一篇 7小时前
下一篇 2025 年 5 月 15 日

相关推荐

  • 最新idea 2024激活码,idea激活教程(附有效idea激活码+激活补丁)

    最新idea 2024激活码,idea激活教程(附有效idea激活码+激活补丁) 本文适用于 IDEA、PyCharm、DataGrip、Goland 等 JetBrains 产品,涵盖了 JetBrains 全家桶的激活方法! 激活成功的效果截图 为了让大家更直观地了解激活效果,首先分享一下成功激活 IDEA 到 2099 年的截图,大家可以看到这款软件已…

    2025 年 4 月 21 日
    2.3K00
  • 用 Cursor 写出第一个程序

    大家好,我是汤师爷 最近几个月,Cursor迅速走红,成为一款强大的编程助手。Cursor不仅使用简单,而且通过集成各种大模型技术,编程能力一流。 Cursor是什么? Cursor是一个类似VSCode的编辑器,集成了GPT-4、Claude 3.5等LLM模型。它本质上是在VSCode的基础上添加了AI辅助编程功能。 从界面布局到操作方式都与VSCode…

    2024 年 12 月 30 日
    26100
  • 永久破解IDEA,IDEA2024最新激活码(免费分享)

    IntelliJ IDEA 是广受欢迎的 Java 集成开发环境,被认为是最好的 Java 开发工具之一。本文将分享如何通过脚本免费激活 IDEA 及 Jetbrains 全家桶工具,支持 2021 及以上版本,包括最新版本。 一、下载并安装 IDEA 首先,前往 JetBrains 官网下载最新版本的 IDEA。安装过程非常简单,按照提示一步步操作即可。 …

    未分类 2024 年 7 月 16 日
    1.6K00
  • DataGrip注册码分享 – 2025年最新破解方法(永久激活至2099年)

    DataGrip介绍 DataGrip是JetBrains公司出品的一款跨平台数据库工具,专为数据库开发人员和管理员设计,支持多种主流数据库系统,如MySQL、PostgreSQL、MongoDB、Oracle和SQL Server等。它拥有智能的SQL编辑器、高效的导航工具和强大的数据库对象管理功能,能够显著提高数据库操作效率。本文将详细介绍如何获取Dat…

    DataGrip破解教程 2025 年 4 月 27 日
    19000
  • A5433 Java+Jsp+Servlet+MySQL+微信小程序+LW+在线点餐小程序的设计与实现 源码 配置 文档

    在线点餐小程序的设计与实现 1.摘要 2.开发目的和意义 2.1 系统开发目的 2.2 系统开发意义 3.系统功能设计 4.系统界面截图 5.源码获取 1.摘要 摘要随着社会节奏的加快,人们对于便捷生活方式的需求日益增长,尤其是忙碌的上班族群体。传统的餐厅就餐方式耗时且不便,而现有的APP点餐服务又无法满足个性化需求。因此,本项目利用Web开发技术和后台数据…

    2024 年 12 月 28 日
    26300

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信