前几天一直有个老哥在刷我们的后台接口,其实我们风控是有反扒的,但是貌似那几天反扒失效了==,原因这里就不讲了。当我们发现异常时,单机qps峰值达到了2000多,导致阻塞了很多我们正常的前端请求。我们侦测到之后,在反扒重新启用之前,对服务进行降级,对接口进行限流,虽然反扒服务很快就恢复了,但确实也给我们造成一定的损失。前后我也有参加,在这件事情告一段落后,我重新整理了一下最近了解的服务限流的知识,希望能给大家一些参考。其实限流的思想跟前面文章讲的放重复提交有一定相似的地方,都是对服务进行“加锁”,实现方式也有很多种,比如redis、mysql、甚至计数器都能实现一个锁,达到限流的效果。本文先介绍一下本地服务限流的一种实现,后续会介绍分布式限流的实现。
本地服务限流的算法有很多种,比如令牌桶算法、漏桶算法等。本文基于Google Guava提供的一种令牌桶工具RateLimiter实现本地服务限流,实现比较简单,使用起来也比较方便。对于漏桶算法,有兴趣的同学可以自己了解一下。RateLimiter使用的是令牌桶的流控算法,RateLimiter会按照一定的频率往桶里扔令牌,线程拿到令牌才能执行,达到限流的效果。同时RateLimiter也提供阻塞式限流和非阻塞式限流两种方式,可以满足本地服务限流的绝大多数场景。
1. 项目结构
| pom.xml
| springboot-19-local-current-limit.iml
|
+---src
| +---main
| | +---java
| | | \---com
| | | \---zhuoli
| | | \---service
| | | \---springboot
| | | \---local
| | | \---current
| | | \---limit
| | | | LocalCurrentLimitApplicationContext.java
| | | |
| | | +---annotation
| | | | Limit.java
| | | |
| | | +---aop
| | | | LimitInterceptor.java
| | | |
| | | +---common
| | | | | User.java
| | | | |
| | | | \---enums
| | | | LimitType.java
| | | |
| | | +---controller
| | | | UserController.java
| | | |
| | | \---service
| | | | UserControllerService.java
| | | |
| | | \---impl
| | | UserControllerServiceImpl.java
| | |
| | \---resources
| \---test
| \---java
- Limit.java:自定义注解接口,主要定义限流的一些参数
- LimitInterceptor.java:切面,拦截@Limit注解请求,对请求进行限流控制
2. pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zhuoli.service</groupId>
<artifactId>springboot-19-local-current-limit</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- Spring Boot 启动父依赖 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
</parent>
<dependencies>
<!-- Exclude Spring Boot's Default Logging -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>21.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
</project>
3. 自定义注解
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Limit {
/**
* 资源的名字
*
* @return String
*/
String name() default "";
/**
* 资源的key
*
* @return String
*/
String key() default "";
/**
* Key的prefix
*
* @return String
*/
String prefix() default "";
/**
* 每秒生成令牌的数目(每秒最大允许服务的次数)
*
* @return int
*/
int count();
/**
* 类型
*
* @return LimitType
*/
LimitType limitType() default LimitType.CUSTOMER;
/**
* 最大等待时间,单位秒,超过该时间请求不继续等待,拒绝服务
*
* @return int
*/
int expireTime();
}
count用来控制RateLimiter生成令牌的速度,expireTime用来控制请求最大等待时间,如果一个请求预计在expireTime无法获取服务,则直接将服务拒绝,而不用继续等待。假如count为10,即每秒生成10个令牌,expireTime为0,说明所有的请求都不会去等待令牌生成,如果请求到来之时,没有可取令牌则拒绝服务,这时候如果1S内有100个请求到达,那么最终可以获取服务的只有10个左右(测试下来,基本在12个左右)。如果expireTime为1,说明允许有1S等待令牌产生的时间,如果1S内有100个请求到达,那么最终可以获取服务的只有20个左右(因为有1S等待令牌产生的时间,那么这100个请求可获取服务的时间为2S,总共20个令牌)。
4. 切面LimitInterceptor
@Aspect
@Configuration
@Slf4j
@RequiredArgsConstructor
public class LimitInterceptor {
private static final Map<String, RateLimiter> rateLimiterMap = Maps.newConcurrentMap();
@Around("execution(public * *(..)) && @annotation(com.zhuoli.service.springboot.local.current.limit.annotation.Limit)")
public Object interceptor(ProceedingJoinPoint pjp) {
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method method = signature.getMethod();
Limit limitAnnotation = method.getAnnotation(Limit.class);
LimitType limitType = limitAnnotation.limitType();
String name = limitAnnotation.name();
String key;
int limitCount = limitAnnotation.count();
int expireTime = limitAnnotation.expireTime();
switch (limitType) {
case IP:
key = getIpAddress();
break;
case CUSTOMER:
key = limitAnnotation.key();
break;
default:
key = method.getName().toUpperCase();
}
String keyRes = Joiner.on("").join(limitAnnotation.prefix(), key);
try {
if (Objects.isNull(rateLimiterMap.get(keyRes))) {
rateLimiterMap.put(keyRes, RateLimiter.create(limitCount));
}
RateLimiter rateLimiter = rateLimiterMap.get(keyRes);
/*非阻塞式,预计1S内获取不到服务,则不继续等待,直接返回null*/
if (!rateLimiter.tryAcquire(expireTime, TimeUnit.SECONDS)) {
log.info("Waiting time will be too long, please try later, now is {}", new Date());
return null;
//为了测试时方便看控制台日志,把下面代码注释,实际使用将return null注释,将下面代码释放
//throw new RuntimeException("can not get service in expireTime,please try later");
}
log.info(keyRes + " : 成功 : " + new Date());
//阻塞式,请求会被阻塞,直到获取令牌,采取执行pjp.proceed()
//log.info(keyRes + " : 成功 : " + "等待时间 : " + rateLimiter.acquire(1));
return pjp.proceed();
} catch (Throwable e) {
if (e instanceof RuntimeException) {
throw new RuntimeException(e.getLocalizedMessage());
}
throw new RuntimeException("server exception");
}
}
private static final String UNKNOWN = "unknown";
public String getIpAddress() {
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
String ip = request.getHeader("x-forwarded-for");
if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {
ip = request.getHeader("Proxy-Client-IP");
}
if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {
ip = request.getHeader("WL-Proxy-Client-IP");
}
if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {
ip = request.getRemoteAddr();
}
return ip;
}
}
给每一个key分配一个RateLimiter,通过RateLimiter控制请求流量。这里简单讲一下acquire和tryAcquire的区别:
- acquire:获取令牌,如果令牌桶中无令牌可取,则阻塞等待,知道有令牌产生,参数为需要取的令牌数目
- tryAcquire:指定一个超时时间,一旦判断出在timeout时间内还无法取得令牌,就返回false,这里并没有真正等待timeout时间,非阻塞的
5. 注解使用
@RestController
@RequestMapping("/user")
@AllArgsConstructor
public class UserController {
private UserControllerService userControllerService;
@Limit(prefix = "get_user", key = "test", count = 10, expireTime = 2)
@RequestMapping(value = "/get_user", method = RequestMethod.POST)
public ResponseEntity getUserById(@RequestParam Long id){
return ResponseEntity.status(HttpStatus.OK).body(userControllerService.getUserById(id));
}
/*用于测试不同的key,限流是否生效*/
@Limit(prefix = "get_user1", key = "test", count = 5, expireTime = 1)
@RequestMapping(value = "/get_user1", method = RequestMethod.POST)
public ResponseEntity getUserById1(@RequestParam Long id){
return ResponseEntity.status(HttpStatus.OK).body(userControllerService.getUserById(id));
}
}
7. 测试工具
为了测试,我写了个测试工具,主要是启动线程向上述服务发post请求,并可以进行线程数目等设置。本来打算使用Jmeter测试的,但是使用起来不是很方便,最后选择自己写了一个测试工具,代码在文章最后我会给出,有兴趣的同学可以了解一下。
@Service
@Slf4j
public class PostControllerServiceImpl implements PostControllerService {
private ExecutorService threadPool = Executors.newCachedThreadPool();
@Override
public String postUrl(DoPostRequest doPostRequest) {
List<NameValuePair> params = new ArrayList<>();
params.add(new BasicNameValuePair("id", doPostRequest.getId().toString()));
try {
for (int i = 0; i < doPostRequest.getThreadNum(); i++) {
Runnable runnable = () -> {
try {
HttpUtils.postUrl(doPostRequest.getUrl(), params);
} catch (Exception e) {
e.printStackTrace();
}
};
for (int j = 0; j < doPostRequest.getPostNum(); j++) {
threadPool.execute(runnable);
}
}
return "success";
} catch (Exception e) {
log.error("post url error {}", e.getMessage());
return "fail";
}
}
}
8. 测试
8.1 acquire阻塞测试
开启一个线程,向后台服务127.0.0.1 :8080/user/get_user发送100个请求
可以看到10个令牌用完之后,基本就是0.1S执行一次,跟RateLimiter设置的每秒生产10个令牌一致,符合预期
8.2 tryAcquire非阻塞测试
acquire虽然限制了单位时间内请求次数,但是对用户是不友好的,因为需要等待,不能迅速的得到响应。当你有1万个并发请求,一秒只能处理10个,那剩余的用户都会陷入漫长的等待。所以我们需要对应用降级,一旦判断出某些请求是得不到令牌的,就迅速返回失败,避免无谓的等待。下述测试案例中,我的expireTime设置为2S,所以一个请求最大等待令牌的时间为2S,所以1S内同时到达N个请求,RateLimiter生产令牌的总数为30个,最终也只有30个请求能获取服务。
一次发100个post请求
示例代码:码云 – 卓立 – 服务限流(本地)
参考链接: