coding……
但行好事 莫问前程

服务限流(本地)

前几天一直有个老哥在刷我们的后台接口,其实我们风控是有反扒的,但是貌似那几天反扒失效了==,原因这里就不讲了。当我们发现异常时,单机qps峰值达到了2000多,导致阻塞了很多我们正常的前端请求。我们侦测到之后,在反扒重新启用之前,对服务进行降级,对接口进行限流,虽然反扒服务很快就恢复了,但确实也给我们造成一定的损失。前后我也有参加,在这件事情告一段落后,我重新整理了一下最近了解的服务限流的知识,希望能给大家一些参考。其实限流的思想跟前面文章讲的放重复提交有一定相似的地方,都是对服务进行“加锁”,实现方式也有很多种,比如redis、mysql、甚至计数器都能实现一个锁,达到限流的效果。本文先介绍一下本地服务限流的一种实现,后续会介绍分布式限流的实现。

本地服务限流的算法有很多种,比如令牌桶算法、漏桶算法等。本文基于Google Guava提供的一种令牌桶工具RateLimiter实现本地服务限流,实现比较简单,使用起来也比较方便。对于漏桶算法,有兴趣的同学可以自己了解一下。RateLimiter使用的是令牌桶的流控算法,RateLimiter会按照一定的频率往桶里扔令牌,线程拿到令牌才能执行,达到限流的效果。同时RateLimiter也提供阻塞式限流和非阻塞式限流两种方式,可以满足本地服务限流的绝大多数场景。

1. 项目结构

|   pom.xml
|   springboot-19-local-current-limit.iml
|
+---src
|   +---main
|   |   +---java
|   |   |   \---com
|   |   |       \---zhuoli
|   |   |           \---service
|   |   |               \---springboot
|   |   |                   \---local
|   |   |                       \---current
|   |   |                           \---limit
|   |   |                               |   LocalCurrentLimitApplicationContext.java
|   |   |                               |
|   |   |                               +---annotation
|   |   |                               |       Limit.java
|   |   |                               |
|   |   |                               +---aop
|   |   |                               |       LimitInterceptor.java
|   |   |                               |
|   |   |                               +---common
|   |   |                               |   |   User.java
|   |   |                               |   |
|   |   |                               |   \---enums
|   |   |                               |           LimitType.java
|   |   |                               |
|   |   |                               +---controller
|   |   |                               |       UserController.java
|   |   |                               |
|   |   |                               \---service
|   |   |                                   |   UserControllerService.java
|   |   |                                   |
|   |   |                                   \---impl
|   |   |                                           UserControllerServiceImpl.java
|   |   |
|   |   \---resources
|   \---test
|       \---java
  • Limit.java:自定义注解接口,主要定义限流的一些参数
  • LimitInterceptor.java:切面,拦截@Limit注解请求,对请求进行限流控制

2. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhuoli.service</groupId>
    <artifactId>springboot-19-local-current-limit</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Spring Boot 启动父依赖 -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.3.RELEASE</version>
    </parent>

    <dependencies>
        <!-- Exclude Spring Boot's Default Logging -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>21.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

    </dependencies>

</project>

3. 自定义注解

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Limit {
    /**
     * 资源的名字
     *
     * @return String
     */
    String name() default "";

    /**
     * 资源的key
     *
     * @return String
     */
    String key() default "";

    /**
     * Key的prefix
     *
     * @return String
     */
    String prefix() default "";

    /**
     * 每秒生成令牌的数目(每秒最大允许服务的次数)
     *
     * @return int
     */
    int count();

    /**
     * 类型
     *
     * @return LimitType
     */
    LimitType limitType() default LimitType.CUSTOMER;

    /**
     * 最大等待时间,单位秒,超过该时间请求不继续等待,拒绝服务
     *
     * @return int
     */
    int expireTime();
}

count用来控制RateLimiter生成令牌的速度,expireTime用来控制请求最大等待时间,如果一个请求预计在expireTime无法获取服务,则直接将服务拒绝,而不用继续等待。假如count为10,即每秒生成10个令牌,expireTime为0,说明所有的请求都不会去等待令牌生成,如果请求到来之时,没有可取令牌则拒绝服务,这时候如果1S内有100个请求到达,那么最终可以获取服务的只有10个左右(测试下来,基本在12个左右)。如果expireTime为1,说明允许有1S等待令牌产生的时间,如果1S内有100个请求到达,那么最终可以获取服务的只有20个左右(因为有1S等待令牌产生的时间,那么这100个请求可获取服务的时间为2S,总共20个令牌)。

4. 切面LimitInterceptor

@Aspect
@Configuration
@Slf4j
@RequiredArgsConstructor
public class LimitInterceptor {

    private static final Map<String, RateLimiter> rateLimiterMap = Maps.newConcurrentMap();

    @Around("execution(public * *(..)) && @annotation(com.zhuoli.service.springboot.local.current.limit.annotation.Limit)")
    public Object interceptor(ProceedingJoinPoint pjp) {
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        Method method = signature.getMethod();
        Limit limitAnnotation = method.getAnnotation(Limit.class);
        LimitType limitType = limitAnnotation.limitType();
        String name = limitAnnotation.name();
        String key;
        int limitCount = limitAnnotation.count();
        int expireTime = limitAnnotation.expireTime();
        switch (limitType) {
            case IP:
                key = getIpAddress();
                break;
            case CUSTOMER:
                key = limitAnnotation.key();
                break;
            default:
                key = method.getName().toUpperCase();
        }

        String keyRes = Joiner.on("").join(limitAnnotation.prefix(), key);
        try {
            if (Objects.isNull(rateLimiterMap.get(keyRes))) {
                rateLimiterMap.put(keyRes, RateLimiter.create(limitCount));
            }
            RateLimiter rateLimiter = rateLimiterMap.get(keyRes);

            /*非阻塞式,预计1S内获取不到服务,则不继续等待,直接返回null*/
            if (!rateLimiter.tryAcquire(expireTime, TimeUnit.SECONDS)) {
                log.info("Waiting time will be too long, please try later, now is {}", new Date());
                return null;
                //为了测试时方便看控制台日志,把下面代码注释,实际使用将return null注释,将下面代码释放
                //throw new RuntimeException("can not get service in expireTime,please try later");
            }
            log.info(keyRes + " : 成功 : " + new Date());

            //阻塞式,请求会被阻塞,直到获取令牌,采取执行pjp.proceed()
            //log.info(keyRes + " : 成功 : " + "等待时间 : " + rateLimiter.acquire(1));

            return pjp.proceed();
        } catch (Throwable e) {
            if (e instanceof RuntimeException) {
                throw new RuntimeException(e.getLocalizedMessage());
            }
            throw new RuntimeException("server exception");
        }
    }

    private static final String UNKNOWN = "unknown";

    public String getIpAddress() {
        HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
        String ip = request.getHeader("x-forwarded-for");
        if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {
            ip = request.getHeader("Proxy-Client-IP");
        }
        if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {
            ip = request.getHeader("WL-Proxy-Client-IP");
        }
        if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {
            ip = request.getRemoteAddr();
        }
        return ip;
    }
}

给每一个key分配一个RateLimiter,通过RateLimiter控制请求流量。这里简单讲一下acquire和tryAcquire的区别:

  • acquire:获取令牌,如果令牌桶中无令牌可取,则阻塞等待,知道有令牌产生,参数为需要取的令牌数目
  • tryAcquire:指定一个超时时间,一旦判断出在timeout时间内还无法取得令牌,就返回false,这里并没有真正等待timeout时间,非阻塞的

5. 注解使用

@RestController
@RequestMapping("/user")
@AllArgsConstructor
public class UserController {

    private UserControllerService userControllerService;

    @Limit(prefix = "get_user", key = "test", count = 10, expireTime = 2)
    @RequestMapping(value = "/get_user", method = RequestMethod.POST)
    public ResponseEntity getUserById(@RequestParam Long id){
        return ResponseEntity.status(HttpStatus.OK).body(userControllerService.getUserById(id));
    }

    /*用于测试不同的key,限流是否生效*/
    @Limit(prefix = "get_user1", key = "test", count = 5, expireTime = 1)
    @RequestMapping(value = "/get_user1", method = RequestMethod.POST)
    public ResponseEntity getUserById1(@RequestParam Long id){
        return ResponseEntity.status(HttpStatus.OK).body(userControllerService.getUserById(id));
    }
}

7. 测试工具

为了测试,我写了个测试工具,主要是启动线程向上述服务发post请求,并可以进行线程数目等设置。本来打算使用Jmeter测试的,但是使用起来不是很方便,最后选择自己写了一个测试工具,代码在文章最后我会给出,有兴趣的同学可以了解一下。

@Service
@Slf4j
public class PostControllerServiceImpl implements PostControllerService {

    private ExecutorService threadPool = Executors.newCachedThreadPool();

    @Override
    public String postUrl(DoPostRequest doPostRequest) {
        List<NameValuePair> params = new ArrayList<>();
        params.add(new BasicNameValuePair("id", doPostRequest.getId().toString()));

        try {
            for (int i = 0; i < doPostRequest.getThreadNum(); i++) {
                Runnable runnable = () -> {
                    try {
                        HttpUtils.postUrl(doPostRequest.getUrl(), params);

                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                };

                for (int j = 0; j < doPostRequest.getPostNum(); j++) {
                    threadPool.execute(runnable);
                }
            }
            return "success";
        } catch (Exception e) {
            log.error("post url error {}", e.getMessage());
            return "fail";
        }
    }
}

8. 测试

8.1 acquire阻塞测试

开启一个线程,向后台服务127.0.0.1 :8080/user/get_user发送100个请求

可以看到10个令牌用完之后,基本就是0.1S执行一次,跟RateLimiter设置的每秒生产10个令牌一致,符合预期

8.2 tryAcquire非阻塞测试

acquire虽然限制了单位时间内请求次数,但是对用户是不友好的,因为需要等待,不能迅速的得到响应。当你有1万个并发请求,一秒只能处理10个,那剩余的用户都会陷入漫长的等待。所以我们需要对应用降级,一旦判断出某些请求是得不到令牌的,就迅速返回失败,避免无谓的等待。下述测试案例中,我的expireTime设置为2S,所以一个请求最大等待令牌的时间为2S,所以1S内同时到达N个请求,RateLimiter生产令牌的总数为30个,最终也只有30个请求能获取服务。

一次发100个post请求


可以看到最终有31个请求获得服务,符合预期。

示例代码:码云 – 卓立 – 服务限流(本地)

码云 – 卓立 – 测试工具

参考链接:

  1. 使用RateLimiter完成简单的大流量限流,抢购秒杀限流
  2. 限流探索

赞(0) 打赏
Zhuoli's Blog » 服务限流(本地)
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

zhuoli's blog

联系我关于我

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏