2024年7月

一、介绍

在企业发展初期,使用的后台管理系统还比较少,一个或者两个。

以电商系统为例,在起步阶段,可能只有一个商城下单系统和一个后端管理产品和库存的系统。

随着业务量越来越大,此时的业务系统会越来越复杂,项目会划分成多个组,每个组负责各自的领域,例如:A组负责商城系统的开发,B组负责支付系统的开发,C组负责库存系统的开发,D组负责物流跟踪系统的开发,E组负责每日业绩报表统计的开发...等等。

规模变大的同时,人员也会逐渐的增多,以研发部来说,大致的人员就有这么几大类:研发人员、测试人员、运维人员、产品经理、技术支持等等。

他们会频繁的登录各自的后端业务系统,然后进行办公。

此时,我们可以设想一下,如果每个组都自己开发一套后端管理系统的登录,假如有10个这样的系统,同时一个新入职的同事需要每个系统都给他开放一个权限,那么我们可能需要给他开通10个账号。

随着业务规模的扩大,大点的公司,可能高达一百多个业务系统,那岂不是要配置一百多个账号,让人去做这种操作,岂不伤天害理。

面对这种繁琐而且又无效的工作,IT大佬们想到一个办法,那就是开发一套登录系统,所有的业务系统都认可这套登录系统,那么就可以实现只需要登录一次,就可以访问其他相互信任的应用系统。

这个登录系统,我们把它称为:单点登录系统。

好了,言归正传,下面我们从两个方面来介绍单点登录系统的实现。

  • 方案设计
  • 项目实践

二、方案设计

2.1、单体后端系统登录

在传统的单体后端系统中,简单点的操作,我们一般都会这么玩,用户使用账号、密码登录之后,服务器会给当前用户创建一个
session
会话,同时也会生成一个
cookie
,最后返回给前端。

当用户访问其他后端的服务时,我们只需要检查一下当前用户的
session
是否有效,如果无效,就再次跳转到登录页面;如果有效,就进入业务处理流程。

但是,如果访问不同的域名系统时,这个cookie是无效的,因此不能跨系统访问,同时也不支持集群环境的共享。

对于单点登录的场景,我们需要重新设计一套新的方案。

2.2、单点登录系统登录

先来一张图!

这个流程图,就是单点登录系统与应用系统之间的交互图。

当用户登录某应用系统时,应用系统会把将客户端传入的token,调用单点登录系统验证token合法性接口,如果不合法就会跳转到单点登录系统的登录页面;如果合法,就直接进入首页。

进入登录页面之后,会让用户输入用户名、密码进行登录验证,如果验证成功之后,会返回一个有效的token,然后客户端会根据服务端返回的参数链接,跳转回之前要访问的应用系统。

接着,应用系统会再次验证token的合法性,如果合法,就进入首页,流程结束。

引入单点登录系统后,接入的应用系统不需要关系用户登录这块,只需要对客户端的token做一下合法性鉴权操作就可以了。

而单点登录系统,只需要做好用户的登录流程和鉴权并返回安全的token给客户端。

有的项目,会将生成的token,存放在客户端的cookie中,这样做的目的,就是避免每次调用接口的时候都在url里面带上token。

但是,浏览器只允许同域名下的cookies可以共享,对于不同的域名系统, cookie 是无法共享的。

对于这种情况,我们可以先将 token 放入到url链接中,类似上面流程图中跳转思路,对于同一个应用系统,我们可以将token放入到 cookie 中,不同的应用系统,我们可以通过 url 链接进行传递,实现token的传输。

三、项目实践

在实践上,token的存储,有两种方案:

  • 存放在服务器,如果是分布式环境,一般都会存储在 redis 中
  • 存储在客户端,服务器做验证,天然支持分布式

3.1、存放在redis

存放在redis中,是一种比较常见的处理办法,最开始的时候也是这种处理办法。

当用户登录成功之后,会将用户的信息作为value,用uuid作为key,存储到redis中,各个服务集群共享用户信息。

代码实践也非常简单。

用户登录之后,将用户信息存在到redis,同时返回一个有效的token给客户端。

@RequestMapping(value = "/login", method = RequestMethod.POST, produces = {"application/json;charset=UTF-8"})
public TokenVO login(@RequestBody LoginDTO loginDTO){
    //...参数合法性验证
    //从数据库获取用户信息
    User dbUser = userService.selectByUserNo(loginDTO.getUserNo);
    //....用户、密码验证

    //创建token
    String token = UUID.randomUUID();
    //将token和用户信息存储到redis,并设置有效期2个小时
    redisUtil.save(token, dbUser, 2*60*60);
    //定义返回结果
    TokenVO result = new TokenVO();
    //封装token
    result.setToken(token);
    //封装应用系统访问地址
    result.setRedirectURL(loginDTO.getRedirectURL());
    return result;
}

客户端收到登录成功之后,根据参数组合进行跳转到对应的应用系统。

跳转示例如下:
http://xxx.com/page.html?token=xxxxxx

各个应用系统,只需要编写一个过滤器
TokenFilter
对token参数进行验证拦截,即可实现对接,代码如下:

@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws ServletException, IOException, SecurityException {
    HttpServletRequest request = (HttpServletRequest) servletRequest;
    HttpServletResponse response = (HttpServletResponse) servletResponse;

    String requestUri = request.getRequestURI();
    String contextPath = request.getContextPath();
    String serviceName = request.getServerName();

    //添加到白名单的URL放行
    String[] excludeUrls = {
            "(?:/images/|/css/|/js/|/template/|/static/|/web/|/constant/).+$",
            "/user/login",
            "/user/createImage"
    };
    for (String url : excludeUrls) {
        if (requestUri.matches(contextPath + url) || (serviceName.matches(url))) {
            filterChain.doFilter(request, response);
            return;
        }
    }
    //运行跨域探测
    if(RequestMethod.OPTIONS.name().equals(request.getMethod())){
        filterChain.doFilter(request, response);
        return;
    }

    //检查token是否有效
    final String token = request.getHeader("token");
    if(StringUtils.isEmpty(token) || !redisUtil.exist(token)){
        ResultMsg<Object> resultMsg = new ResultMsg<>(4000, "token已失效");
        //封装跳转地址
        resultMsg.setRedirectURL("http://sso.xxx.com?redirectURL=" + request.getRequestURL());
        WebUtil.buildPrintWriter(response, resultMsg);
        return;
    }
    //将用户信息,存入request中,方便后续获取
    User user =  redisUtil.get(token);
    request.setAttribute("user", user);
    filterChain.doFilter(request, response);
    return;
}

上面返回的是
json
数据给前端,当然你还可以直接在服务器采用重定向进行跳转,具体根据自己的情况进行选择。

由于每个应用系统都可能需要进行对接,因此我们可以将上面的方法封装成一个
jar
包,应用系统只需要依赖包即可完成对接!

3.2、token存放客户端

还有一种方案,是将token存放客户端,这种方案就是服务端根据规则对数据进行加密生成一个签名串,这个签名串就是我们所说的token,最后返回给前端。

因为加密的操作都是在服务端完成的,因此密钥的管理非常重要,不能泄露出去,不然很容易被黑客解密出来。

最典型的应用就是JWT!

JWT 是由三段信息构成的,将这三段信息文本用
.
链接一起就构成了
JWT
字符串。就像这样:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWV9.TJVA95OrM7E2cBab30RMHrHDcEfxjoYZgeFONFh7HgQ

如何实现呢?首先我们需要添加一个
jwt
依赖包。

<!-- jwt支持 -->
<dependency>
    <groupId>com.auth0</groupId>
    <artifactId>java-jwt</artifactId>
    <version>3.4.0</version>
</dependency>

然后,创建一个用户信息类,将会通过加密存放在
token

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class UserToken implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 用户ID
     */
    private String userId;

    /**
     * 用户登录账户
     */
    private String userNo;

    /**
     * 用户中文名
     */
    private String userName;
}

接着,创建一个
JwtTokenUtil
工具类,用于创建
token
、验证
token

public class JwtTokenUtil {

    //定义token返回头部
    public static final String AUTH_HEADER_KEY = "Authorization";

    //token前缀
    public static final String TOKEN_PREFIX = "Bearer ";

    //签名密钥
    public static final String KEY = "q3t6w9z$C&F)J@NcQfTjWnZr4u7x";
    
    //有效期默认为 2hour
    public static final Long EXPIRATION_TIME = 1000L*60*60*2;


    /**
     * 创建TOKEN
     * @param content
     * @return
     */
    public static String createToken(String content){
        return TOKEN_PREFIX + JWT.create()
                .withSubject(content)
                .withExpiresAt(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
                .sign(Algorithm.HMAC512(KEY));
    }

    /**
     * 验证token
     * @param token
     */
    public static String verifyToken(String token) throws Exception {
        try {
            return JWT.require(Algorithm.HMAC512(KEY))
                    .build()
                    .verify(token.replace(TOKEN_PREFIX, ""))
                    .getSubject();
        } catch (TokenExpiredException e){
            throw new Exception("token已失效,请重新登录",e);
        } catch (JWTVerificationException e) {
            throw new Exception("token验证失败!",e);
        }
    }
}

同时编写配置类,允许跨域,并且创建一个权限拦截器

@Slf4j
@Configuration
public class GlobalWebMvcConfig implements WebMvcConfigurer {
       /**
     * 重写父类提供的跨域请求处理的接口
     * @param registry
     */
    @Override
    public void addCorsMappings(CorsRegistry registry) {
        // 添加映射路径
        registry.addMapping("/**")
                // 放行哪些原始域
                .allowedOrigins("*")
                // 是否发送Cookie信息
                .allowCredentials(true)
                // 放行哪些原始域(请求方式)
                .allowedMethods("GET", "POST", "DELETE", "PUT", "OPTIONS", "HEAD")
                // 放行哪些原始域(头部信息)
                .allowedHeaders("*")
                // 暴露哪些头部信息(因为跨域访问默认不能获取全部头部信息)
                .exposedHeaders("Server","Content-Length", "Authorization", "Access-Token", "Access-Control-Allow-Origin","Access-Control-Allow-Credentials");
    }

    /**
     * 添加拦截器
     * @param registry
     */
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        //添加权限拦截器
        registry.addInterceptor(new AuthenticationInterceptor()).addPathPatterns("/**").excludePathPatterns("/static/**");
    }
}

使用
AuthenticationInterceptor
拦截器对接口参数进行验证

@Slf4j
public class AuthenticationInterceptor implements HandlerInterceptor {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        // 从http请求头中取出token
        final String token = request.getHeader(JwtTokenUtil.AUTH_HEADER_KEY);
        //如果不是映射到方法,直接通过
        if(!(handler instanceof HandlerMethod)){
            return true;
        }
        //如果是方法探测,直接通过
        if (HttpMethod.OPTIONS.equals(request.getMethod())) {
            response.setStatus(HttpServletResponse.SC_OK);
            return true;
        }
        //如果方法有JwtIgnore注解,直接通过
        HandlerMethod handlerMethod = (HandlerMethod) handler;
        Method method=handlerMethod.getMethod();
        if (method.isAnnotationPresent(JwtIgnore.class)) {
            JwtIgnore jwtIgnore = method.getAnnotation(JwtIgnore.class);
            if(jwtIgnore.value()){
                return true;
            }
        }
        LocalAssert.isStringEmpty(token, "token为空,鉴权失败!");
        //验证,并获取token内部信息
        String userToken = JwtTokenUtil.verifyToken(token);
        
        //将token放入本地缓存
        WebContextUtil.setUserToken(userToken);
        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        //方法结束后,移除缓存的token
        WebContextUtil.removeUserToken();
    }
}

最后,在
controller
层用户登录之后,创建一个
token
,存放在头部即可

/**
 * 登录
 * @param userDto
 * @return
 */
@JwtIgnore
@RequestMapping(value = "/login", method = RequestMethod.POST, produces = {"application/json;charset=UTF-8"})
public UserVo login(@RequestBody UserDto userDto, HttpServletResponse response){
    //...参数合法性验证

    //从数据库获取用户信息
    User dbUser = userService.selectByUserNo(userDto.getUserNo);

    //....用户、密码验证

    //创建token,并将token放在响应头
    UserToken userToken = new UserToken();
    BeanUtils.copyProperties(dbUser,userToken);

    String token = JwtTokenUtil.createToken(JSONObject.toJSONString(userToken));
    response.setHeader(JwtTokenUtil.AUTH_HEADER_KEY, token);


    //定义返回结果
    UserVo result = new UserVo();
    BeanUtils.copyProperties(dbUser,result);
    return result;
}

到这里基本就完成了!

其中
AuthenticationInterceptor
中用到的
JwtIgnore
是一个注解,用于不需要验证
token
的方法上,例如验证码的获取等等。

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface JwtIgnore {

    boolean value() default true;
}


WebContextUtil
是一个线程缓存工具类,其他接口通过这个方法即可从
token
中获取用户信息。

public class WebContextUtil {

    //本地线程缓存token
    private static ThreadLocal<String> local = new ThreadLocal<>();

    /**
     * 设置token信息
     * @param content
     */
    public static void setUserToken(String content){
        removeUserToken();
        local.set(content);
    }

    /**
     * 获取token信息
     * @return
     */
    public static UserToken getUserToken(){
        if(local.get() != null){
            UserToken userToken = JSONObject.parseObject(local.get() , UserToken.class);
            return userToken;
        }
        return null;
    }

    /**
     * 移除token信息
     * @return
     */
    public static void removeUserToken(){
        if(local.get() != null){
            local.remove();
        }
    }
}

对应用系统而言,重点在于
token
的验证,可以将拦截器方法封装成一个公共的
jar
包,然后各个应用系统引用即可!

和上面介绍的
token
存储到
redis
方案类似,不同点在于:一个将用户数据存储到
redis
,另一个是采用加密算法存储到客户端进行传输。

四、小结

在实际的使用过程中,我个人更加倾向于采用
jwt
方案,直接在服务端使用签名加密算法生成一个
token
,然后在客户端进行流转,天然支持分布式,但是要注意加密时用的密钥要安全管理。

而采用
redis
方案存储的时候,你需要搭建高可用的集群环境,同时保证缓存数据不会失效等等,维护成本高!

在实际的实现上,每个公司玩法不一样,有的安全性要求高,后端还会加上密钥环节进行安全验证,基本思路大同小异。

项目源代码地址:
spring-boot-example-jwt

一:背景

1. 讲故事

前段时间有位朋友找到我,说他们有一个崩溃的dump让我帮忙看下怎么回事,确实有太多的人在网上找各种故障分析最后联系到了我,还好我一直都是免费分析,不收取任何费用,造福社区。

话不多说,既然有 dump 来了,那就上 windbg 说话吧。

二:WinDbg 分析

1. 为什么会崩溃

说实话windbg非常强大,双击打开dump就能第一时间帮你显示出简略的异常信息,输出如下:


This dump file has an exception of interest stored in it.
The stored exception information can be accessed via .ecxr.
(bf8.5dc4): Access violation - code c0000005 (first/second chance not available)
For analysis of this file, run !analyze -v
clr!WKS::gc_heap::mark_object_simple1+0x220:
00007ffb`380453c4 833a00          cmp     dword ptr [rdx],0 ds:00007ffa`35451300=????????

从卦中又看到了经典的
mark_object_simple1
方法,这个方法是GC用来做对象标记之用的,所以大概率又是托管堆损坏,真是无语了,接下来用
!verifyheap
检查下托管堆。


0:083> !verifyheap
object 00000218e96963d8: bad member 00000218E9696450 at 00000218E9696420
Last good object: 00000218E96963C0.
Could not request method table data for object 00000218E9696450 (MethodTable: 00007FFA35451300).
Last good object: 00000218E96963D8.

一看这卦就很不吉利,真的是有对象的mt是不对的,至此我们把崩溃的直接原因给找到了。

2. 为什么对象损坏了

要找到这个答案就需要深挖
00000218e96963d8
对象,分别使用
!do
命令以及
dp
来观察内存地址。


0:083> !do 00000218e96963d8
Name:        System.Threading.Tasks.Task+DelayPromise
MethodTable: 00007ffb3542b3e8
EEClass:     00007ffb3567c7c0
Size:        120(0x78) bytes
File:        C:\Windows\Microsoft.Net\assembly\GAC_64\mscorlib\v4.0_4.0.0.0__b77a5c561934e089\mscorlib.dll
Fields:
...
00007ffb35451300  40035d5       48 ...m.Threading.Timer  0 instance 00000218e9696450 Timer

0:083> dp 00000218e9696450 L6
00000218`e9696450  00007ffa`35451301 00000000`00000000
00000218`e9696460  00000218`e96964c8 00000000`00000000
00000218`e9696470  00007ffb`353e4b51 00000218`e9696368

仔细观察卦中对象
00000218e9696450
所显示的mt,你会发现一个是
00007ffb35451300
,一个是
00007ffa35451301
,很显然前者是对的,后者是错的,可以分别用
!dumpmt
做个验证。


0:083> !dumpmt 00007ffb35451300
EEClass:         00007ffb356942f0
Module:          00007ffb353b1000
Name:            System.Threading.Timer
mdToken:         0000000002000504
File:            C:\Windows\Microsoft.Net\assembly\GAC_64\mscorlib\v4.0_4.0.0.0__b77a5c561934e089\mscorlib.dll
BaseSize:        0x20
ComponentSize:   0x0
Slots in VTable: 23
Number of IFaces in IFaceMap: 1

0:083> !dumpmt 00007ffa35451301
00007ffa35451301 is not a MethodTable

细心的朋友会发现虽然两个mt地址不一样,但已经非常相近,看样子又是一例经典的bit位翻转,我去,用
.formats
转成二进制观察一下,截图如下:

从卦中可以清晰的看到当前地址有两个 bit 的翻转,分别是
第0位
和第
32位
,接下来就要洞察为什么会有两个bit位的翻转?

3. 真的存在两个bit位翻转吗

接下来我们逐一来聊一下。

  1. bit 0 为什么会翻转

熟悉 coreclr 底层的朋友应该知道,gc 在标记的过程中会给 mt 的第0位设置为1,表示当前对象在深度优先中已经标记过,防止重复标记,当然这个也是有源码作证的,简化后的代码如下:


inline BOOL gc_heap::gc_mark(uint8_t* o, uint8_t* low, uint8_t* high, int condemned_gen)
{
	if ((o >= low) && (o < high))
	{
		BOOL already_marked = marked(o);
		if (already_marked)
		{
			return FALSE;
		}
		set_marked(o);
		
		return TRUE;
	}
}

#define marked(i) header(i)->IsMarked()

BOOL IsMarked() const
{
	return !!(((size_t)RawGetMethodTable()) & GC_MARKED);
}

有了这段源码,这个 bit 为什么为 1 就能轻松的解释了,所以这个翻转是一个正常情况。

  1. bit 32 为什么会翻转

这个是我无法解释的,也正是因为这个 bit32 的翻转导致 gc 认为这个 obj 是一个损坏的对象,到底是什么原因呢?民间众说纷纭,在我的过往分析旅程中我已见过两例,但我不敢确定自己又遇到了辐射类的奇葩情况,所以也第一时间找朋友确认程序周边是否存在辐射环境。

朋友反馈过来附近有
伺服电机
类,说实话工控的东西我是真的不太懂,只能上网搜搜这玩意是否有辐射,截图如下:

到底是不是这玩意导致的,其实我心里也没底,跟朋友的沟通后说是只出现过一次,这就更加玄乎了。

不管怎么说,我只能给出如下两个方案:

  • 上 ECC 纠错内存
  • 远离辐射环境

三:总结

在大工控领域里,这是我见过第三例bit位翻转导致的程序崩溃,太无语了,恶魔到底是不是旁边的
伺服电机
? 希望领域内的同行们留言讨论下,让我长长见识,感谢!

图片名称

Java Redis多限流

在Java中实现Redis多限流通常涉及使用Redis的某些特性,如
INCR

EXPIRE

Lua
脚本或者更高级的Redis数据结构如
Redis Bitmaps

Redis Streams
结合
Redis Pub/Sub
,或者使用Redis的第三方库如
Redis Rate Limiter
(基于Lua脚本或Redis自身功能实现)。然而,为了直接和易于实现,这里我们将使用
Jedis
库(Java的Redis客户端)结合Redis的
INCR

EXPIRE
命令来模拟一个基本的分布式多限流系统。

1. 使用
Jedis
库结合Redis的
INCR

EXPIRE
命令模拟一个基本的分布式多限流系统

1.1 准备工作

(1)
Redis安装
:确保Redis服务在我们的开发环境中已经安装并运行。

(2)
Jedis依赖
:在我们的Java项目中添加Jedis依赖。如果我们使用Maven,可以在
pom.xml
中添加以下依赖:

<dependency>  
    <groupId>redis.clients</groupId>  
    <artifactId>jedis</artifactId>  
    <version>最新版本</version>  
</dependency>

请替换
最新版本
为当前Jedis的最新版本。

1.2 实现代码

下面是一个简单的Java程序,使用Jedis和Redis的
INCR

EXPIRE
命令来实现基本的限流功能。这里我们假设每个用户(或API端点)都有自己的限流键。

import redis.clients.jedis.Jedis;  
  
public class RedisRateLimiter {  
  
    private static final String REDIS_HOST = "localhost";  
    private static final int REDIS_PORT = 6379;  
    private static final long LIMIT = 10; // 每分钟最多请求次数  
    private static final long TIME_INTERVAL = 60; // 时间间隔,单位为秒  
  
    public static void main(String[] args) {  
        try (Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT)) {  
            String userId = "user123"; // 假设这是用户ID或API端点标识符  
            String key = "rate_limit:" + userId;  
  
            // 尝试获取访问权限  
            if (tryAcquire(jedis, key, LIMIT, TIME_INTERVAL)) {  
                System.out.println("请求成功,未超过限流限制");  
                // 在这里处理你的请求  
  
            } else {  
                System.out.println("请求失败,超过限流限制");  
                // 处理限流情况,如返回错误码或等待一段时间后重试  
            }  
  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
  
    /**  
     * 尝试获取访问权限  
     *  
     * @param jedis Redis客户端  
     * @param key   限流键  
     * @param limit 限制次数  
     * @param timeInterval 时间间隔(秒)  
     * @return 是否获取成功  
     */  
    public static boolean tryAcquire(Jedis jedis, String key, long limit, long timeInterval) {  
        String result = jedis.watch(key);  
        if (result != null && result.equalsIgnoreCase("OK")) {  
            String counter = jedis.get(key);  
            if (counter == null || Long.parseLong(counter) < limit) {  
                // 使用事务,先incr后expire,确保原子性  
                Transaction transaction = jedis.multi();  
                transaction.incr(key);  
                transaction.expire(key, timeInterval);  
                List<Object> results = transaction.exec();  
                if (results != null && results.size() == 2 && "OK".equals(results.get(0).toString()) && "1".equals(results.get(1).toString())) {  
                    return true;  
                }  
            }  
            // 取消watch  
            jedis.unwatch();  
        }  
        // 如果key不存在或超过限制,则直接返回false  
        return false;  
    }  
}

注意
:上述代码中的
tryAcquire
方法使用了Redis的
WATCH

MULTI
/
EXEC
命令来尝试实现操作的原子性,但这种方法在Redis集群环境中可能不是最佳实践,因为
WATCH
/
UNWATCH
是基于单个Redis实例的。在分布式环境中,我们可能需要考虑使用Redis的Lua脚本来确保操作的原子性,或者使用专门的限流库。

此外,上述代码在并发极高的情况下可能不是最优的,因为它依赖于Redis的
WATCH
机制来避免竞态条件,这在性能上可能不是最高效的。对于高并发的限流需求,我们可能需要考虑使用更专业的限流算法或库,如令牌桶(Token Bucket)或漏桶(Leaky Bucket)。

2. 基于Jedis和Lua脚本的限流示例

在Java中使用Redis进行多限流时,我们通常会选择更健壮和高效的方案,比如使用Redis的Lua脚本来保证操作的原子性,或者使用现成的Redis限流库。不过,为了保持示例的简洁性和易于理解,我将提供一个基于Jedis和Lua脚本的限流示例。

在这个示例中,我们将使用Redis的Lua脚本来实现一个简单的令牌桶限流算法。Lua脚本可以在Redis服务器上以原子方式执行多个命令,这对于限流等需要原子操作的场景非常有用。

2.1 Java Redis多限流(Lua脚本示例)

首先,我们需要有一个Redis服务器运行在我们的环境中,并且我们的Java项目中已经添加了Jedis依赖。

2.1.1 Lua脚本

以下是一个简单的Lua脚本,用于实现令牌桶的限流逻辑。这个脚本会检查当前桶中的令牌数,如果足够则减少令牌数并返回成功,否则返回失败。

-- Lua脚本:token_bucket_limit.lua  
-- KEYS[1] 是令牌桶的key  
-- ARGV[1] 是请求的令牌数  
-- ARGV[2] 是桶的容量  
-- ARGV[3] 是每秒添加的令牌数  
-- ARGV[4] 是时间间隔(秒),用于计算当前时间应该有多少令牌  
  
local key = KEYS[1]  
local request = tonumber(ARGV[1])  
local capacity = tonumber(ARGV[2])  
local rate = tonumber(ARGV[3])  
local interval = tonumber(ARGV[4])  
  
-- 获取当前时间戳  
local current_time = tonumber(redis.call("TIME")[1])  
  
-- 尝试获取桶的上次更新时间和当前令牌数  
local last_updated_time = redis.call("GET", key .. "_last_updated_time")  
local current_tokens = redis.call("GET", key .. "_tokens")  
  
if last_updated_time == false then  
    -- 如果桶不存在,则初始化桶  
    redis.call("SET", key .. "_last_updated_time", current_time)  
    redis.call("SET", key .. "_tokens", capacity)  
    current_tokens = capacity  
    last_updated_time = current_time  
end  
  
-- 计算自上次更新以来经过的时间  
local delta = current_time - last_updated_time  
  
-- 计算这段时间内应该添加的令牌数  
local tokens_to_add = math.floor(delta * rate)  
  
-- 确保令牌数不会超过容量  
if current_tokens + tokens_to_add > capacity then  
    tokens_to_add = capacity - current_tokens  
end  
  
-- 更新令牌数和更新时间  
current_tokens = current_tokens + tokens_to_add  
redis.call("SET", key .. "_tokens", current_tokens)  
redis.call("SET", key .. "_last_updated_time", current_time)  
  
-- 检查是否有足够的令牌  
if current_tokens >= request then  
    -- 如果有足够的令牌,则减少令牌数  
    redis.call("DECRBY", key .. "_tokens", request)  
    return 1  -- 返回成功  
else  
    return 0  -- 返回失败  
end

2.1.2 Java代码

接下来是Java中使用Jedis调用上述Lua脚本的代码。

import redis.clients.jedis.Jedis;  
  
public class RedisRateLimiter {  
  
    private static final String REDIS_HOST = "localhost";  
    private static final int REDIS_PORT = 6379;  
    private static final String LUA_SCRIPT = "path/to/your/token_bucket_limit.lua"; // Lua脚本的路径(或者你可以直接加载脚本内容)  
  
    public static void main(String[] args) {  
        try (Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT)) {  
            String key = "rate_limit_bucket:user123";  
            int requestTokens = 1;  
            int capacity = 10;  
            double rate = 1.0; // 每秒添加1个令牌  
            int interval = 60; // 时间间隔为60秒  
  
            // 加载Lua脚本(这里假设你已经有了Lua脚本的内容或路径)  
            // 实际应用中,你可能需要从文件加载Lua脚本内容  
            String scriptContent = // ... 从文件或其他地方加载Lua脚本内容  
  
            // 注册Lua脚本到Redis  
            String sha1 = jedis.scriptLoad(scriptContent);  
  
            // 执行Lua脚本  
            Object result = jedis.evalsha(sha1, 1, key, String.valueOf(requestTokens), String.valueOf(capacity), String.

在之前的代码中,我们留下了加载Lua脚本和执行它的部分未完成。以下是完整的Java代码示例,包括如何加载Lua脚本并执行它以进行限流检查。

2.1.3 完整的Java代码示例

import redis.clients.jedis.Jedis;  
  
import java.io.BufferedReader;  
import java.io.FileReader;  
import java.io.IOException;  
  
public class RedisRateLimiter {  
  
    private static final String REDIS_HOST = "localhost";  
    private static final int REDIS_PORT = 6379;  
    private static final String LUA_SCRIPT_PATH = "path/to/your/token_bucket_limit.lua"; // Lua脚本的文件路径  
  
    public static void main(String[] args) {  
        try (Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT)) {  
            String key = "rate_limit_bucket:user123";  
            int requestTokens = 1;  
            int capacity = 10;  
            double rate = 1.0; // 每秒添加1个令牌  
            int interval = 1; // 时间间隔为1秒(这里仅为示例,实际中可能更长)  
  
            // 加载Lua脚本  
            String luaScript = loadLuaScript(LUA_SCRIPT_PATH);  
  
            // 注册Lua脚本到Redis(获取SHA1哈希值)  
            String sha1 = jedis.scriptLoad(luaScript);  
  
            // 执行Lua脚本进行限流检查  
            // KEYS[1] 是 key, ARGV 是其他参数  
            Long result = (Long) jedis.evalsha(sha1, 1, key, String.valueOf(requestTokens), String.valueOf(capacity), String.valueOf(rate), String.valueOf(interval));  
  
            if (result == 1L) {  
                System.out.println("请求成功,有足够的令牌。");  
                // 处理请求...  
            } else {  
                System.out.println("请求失败,令牌不足。");  
                // 拒绝请求或进行其他处理...  
            }  
  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
  
    // 从文件加载Lua脚本内容  
    private static String loadLuaScript(String filePath) throws IOException {  
        StringBuilder sb = new StringBuilder();  
        try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {  
            String line;  
            while ((line = reader.readLine()) != null) {  
                sb.append(line).append("\n");  
            }  
        }  
        return sb.toString();  
    }  
}

2.1.4 注意事项

(1)
Lua脚本路径
:确保
LUA_SCRIPT_PATH
变量指向正确的Lua脚本文件路径。

(2)
错误处理
:在实际应用中,我们可能需要添加更详细的错误处理逻辑,比如处理Redis连接失败、Lua脚本加载失败等情况。

(3)
性能考虑
:虽然Lua脚本在Redis中执行是高效的,但在高并发场景下,频繁的脚本执行仍然可能对Redis服务器造成压力。我们可能需要考虑使用Redis的内置限流功能(如Redis 6.0及以上版本的
Redis Streams

Redis Bloom Filters
),或者通过增加Redis实例、使用集群等方式来扩展我们的系统。

(4)
Lua脚本的复杂性
:随着业务逻辑的复杂化,Lua脚本可能会变得难以维护。在这种情况下,我们可能需要考虑将部分逻辑移到Java代码中,或者通过其他方式(如使用Redis的模块)来扩展Redis的功能。

(5)
时间同步
:Lua脚本中的时间计算依赖于Redis服务器的时间。确保Redis服务器的时间与我们的应用服务器时间保持同步,以避免因时间差异导致的问题。

3. Redis多限流

Redis作为一种高性能的键值对存储系统,支持多种数据结构和操作,非常适合用于实现限流算法。以下是关于Redis多限流的一些详细信息:

3.1 Redis限流算法概述

Redis实现限流主要依赖于其原子操作、高速缓存和丰富的数据结构(如字符串、列表、集合、有序集合等)。常见的限流算法包括令牌桶算法(Token Bucket)、漏桶算法(Leaky Bucket)以及基于计数器的简单限流算法。

(1)令牌桶算法:

  • 初始化一个固定容量的令牌桶,以固定速率向桶中添加令牌。
  • 每个请求尝试从桶中获取一个令牌,如果成功则处理请求,否则拒绝或等待。
  • 令牌桶的容量和添加速率决定了系统的最大处理能力和平均处理速率。

(2)漏桶算法:

  • 请求被放入一个桶中,桶以恒定速率漏出请求。
  • 如果桶满,则新到的请求被拒绝或等待。
  • 漏桶算法对突发流量有很好的抑制作用,但可能无法高效利用资源。

(3)计数器算法:

  • 在每个时间窗口内记录请求次数,达到阈值时拒绝新请求。
  • 时间窗口结束后计数器重置。
  • 实现简单但可能存在临界问题,限流不准确。

3.2 Redis多限流实现方式

在分布式系统中,Redis可以实现全局的限流,支持多种限流策略的组合使用。

(1)使用Redis数据结构:

  • 字符串
    :记录当前时间窗口内的请求次数或令牌数。
  • 列表
    :记录请求的时间戳,用于滑动窗口算法。
  • 有序集合
    (ZSet):记录请求的时间戳和唯一标识,用于精确控制时间窗口内的请求数。
  • 哈希表
    :存储令牌桶的状态,包括当前令牌数和上次更新时间。

(2)Lua脚本:

  • 利用Redis的Lua脚本功能,可以编写复杂的限流逻辑,并通过原子操作执行,确保并发安全性。
  • Lua脚本可以在Redis服务器端执行,减少网络传输和延迟。

(3)分布式锁:

  • 在高并发场景下,为了防止多个实例同时修改同一个限流键,可以使用Redis的分布式锁机制。
  • 但需要注意分布式锁的性能和可用性问题。

3.3 Redis多限流实际应用

在实际应用中,Redis多限流可以用于多种场景,如API接口限流、用户行为限流、系统资源访问限流等。通过组合不同的限流算法和数据结构,可以实现复杂的限流策略,满足不同业务需求。

例如,一个电商平台可能需要对用户登录、商品浏览、下单等行为进行限流。对于登录行为,可以使用令牌桶算法限制用户登录频率;对于商品浏览行为,可以使用漏桶算法控制突发流量;对于下单行为,则可能需要结合用户身份、订单金额等多个因素进行综合限流。

3.4 注意事项

(1)
性能问题
:在高并发场景下,Redis的性能可能会成为瓶颈。需要合理设计限流策略和Redis的部署架构,确保系统稳定运行。

(2)
持久化问题
:Redis是内存数据库,数据丢失风险较高。在需要持久化限流数据的场景下,需要考虑Redis的持久化机制。

(3)
分布式问题
:在分布式系统中,需要确保Redis集群的稳定性和可用性,以及限流数据的一致性和准确性。

综上所述,Redis多限流是一种强大而灵活的技术手段,通过合理的策略设计和实现方式,可以有效地保护系统资源和服务质量。

boltdb

网上关于boltdb的文章有很多,特别是微信公众号上,例如:
boltdb源码分析系列-事务-腾讯云开发者社区-腾讯云 (tencent.com)
这些文章都写的挺好,但不一定覆盖了我所关注的几个点,下面我把我关注的几个点就来下来。

node page bucket tx db的关系

  • 磁盘数据mmap到page内存区域,也可以理解为就是磁盘数据
    • page需要一段连续的内存
  • node封装的B+树节点数据结构
  • bucket一个B+树数据结构。可以理解成一个表
  • tx 读事务或读写事务
    • bucket是内存结构每个tx中都会生成一个
    • 会将tx中涉及到(读取过、修改过)的nodes都记录在bucket中
    • 读写事务最终写入磁盘时是需要重新申请新的page的,即不会修改原有的page
  • db整个数据库文件
    • db中的freelist记录了db文件中空闲的页(即已经可以释放掉的页)

tx.commit

在boltdb的 commit中才会执行b+树的rebalance操作,执行完后再进行写入磁盘的操作。也就是说在一个事务中涉及到的多次写操作,会最终在commit的时候同意执行写入磁盘spill操作。

func (tx *Tx) Commit() error {
    _assert(!tx.managed, "managed tx commit not allowed")
    if tx.db == nil {
        return ErrTxClosed
    } else if !tx.writable {
        return ErrTxNotWritable
    }
       
    // TODO(benbjohnson): Use vectorized I/O to write out dirty pages.

    // Rebalance nodes which have had deletions.
    var startTime = time.Now()
    tx.root.rebalance()
    if tx.stats.Rebalance > 0 {
        tx.stats.RebalanceTime += time.Since(startTime)
    }
    
    // spill data onto dirty pages.
    startTime = time.Now()
    if err := tx.root.spill(); err != nil {
        tx.rollback()
        return err
    }

也正因为txn中可能有多个key插入,所以split就可能会进行多次

func (n *node) split(pageSize int) []*node {
    var nodes []*node

    node := n
    for {
        // Split node into two.
        a, b := node.splitTwo(pageSize)
        nodes = append(nodes, a)

        // If we can't split then exit the loop.
        if b == nil {
            break
        }   

        // Set node to b so it gets split on the next iteration.
        node = b 
    }   

    return nodes
}

node.go

数据写入到磁盘的时候,是从下层节点往上层节点写的

// spill writes the nodes to dirty pages and splits nodes as it goes.
// Returns an error if dirty pages cannot be allocated.
func (n *node) spill() error {
    var tx = n.bucket.tx
    if n.spilled {
        return nil
    }
    
    // Spill child nodes first. Child nodes can materialize sibling nodes in
    // the case of split-merge so we cannot use a range loop. We have to check
    // the children size on every loop iteration.
    sort.Sort(n.children)
    for i := 0; i < len(n.children); i++ {
        if err := n.children[i].spill(); err != nil {
            return err
        } 
    }
    
    // We no longer need the child list because it's only used for spill tracking.
    n.children = nil

    // Split nodes into appropriate sizes. The first node will always be n.
    var nodes = n.split(tx.db.pageSize)

node.go

数据较大如何处理?直接将构造一个大的page将数据存储进去。与此同时,原先node关联的page可以释放掉了。因为整个是一个append only模式,原先的page在新事务生成,且没有其他读事务访问后就可以释放掉了。

    for _, node := range nodes {
        // Add node's page to the freelist if it's not new.
        if node.pgid > 0 {
            tx.db.freelist.free(tx.meta.txid, tx.page(node.pgid))
            node.pgid = 0
        }
    
        // Allocate contiguous space for the node.
        p, err := tx.allocate((node.size() / tx.db.pageSize) + 1)
        if err != nil {
            return err
        }


node.go

哪些node需要rebalance呢,size < 25% page_size或者中间节点小于2个key,叶子节点小于1个key。

func (n *node) rebalance() {
    if !n.unbalanced {
        return
    }
    n.unbalanced = false
        
    // Update statistics.
    n.bucket.tx.stats.Rebalance++

    // Ignore if node is above threshold (25%) and has enough keys.
    var threshold = n.bucket.tx.db.pageSize / 4
    if n.size() > threshold && len(n.inodes) > n.minKeys() {
        return
    } 

node.go

bucket中读到了node,就将node加入到bucket中,读到了就意味着这些node可能就会发生改变。它是在cursor移动的时候加入到bucket中的。

func (c *Cursor) node() *node {
    _assert(len(c.stack) > 0, "accessing a node with a zero-length cursor stack")

    // If the top of the stack is a leaf node then just return it.
    if ref := &c.stack[len(c.stack)-1]; ref.node != nil && ref.isLeaf() {
        return ref.node
    }
    
    // Start from root and traverse down the hierarchy.
    var n = c.stack[0].node
    if n == nil {
        n = c.bucket.node(c.stack[0].page.id, nil)
    }
    for _, ref := range c.stack[:len(c.stack)-1] {
        _assert(!n.isLeaf, "expected branch node")
        n = n.childAt(int(ref.index))
    }   
    _assert(n.isLeaf, "expected leaf node")
    return n
}  
// node creates a node from a page and associates it with a given parent.
func (b *Bucket) node(pgid pgid, parent *node) *node {
    _assert(b.nodes != nil, "nodes map expected")

    // Retrieve node if it's already been created.
    if n := b.nodes[pgid]; n != nil {
        return n
    }   

    // Otherwise create a node and cache it.
    n := &node{bucket: b, parent: parent}
    if parent == nil {
        b.rootNode = n 
    } else {
        parent.children = append(parent.children, n)
    }   

    // Use the inline page if this is an inline bucket.
    var p = b.page
    if p == nil {
        p = b.tx.page(pgid)
    }   

    // Read the page into the node and cache it.
    n.read(p)
    b.nodes[pgid] = n 

    // Update statistics.
    b.tx.stats.NodeCount++

freelist

它表示的是磁盘中已经释放的页

结构

  • ids 所有空闲页
  • pending {txid, pageids[]}即将释放的txid以及其关联的pageid
  • cache map索引

->pending 释放实际

  • tx.commit时会将事务中涉及到的老的node对应的page都放到pending中
    • node.spill中将关联的旧node(node与page对应)放到freelist的pending中

pending->release释放时机

tx的commit阶段会将事务涉及的原先老page放到freelist的pending中。

func (f *freelist) free(txid txid, p *page) {
    if p.id <= 1 {
        panic(fmt.Sprintf("cannot free page 0 or 1: %d", p.id))
    }       
        
    // Free page and all its overflow pages.
    var ids = f.pending[txid]
    for id := p.id; id <= p.id+pgid(p.overflow); id++ {
        // Verify that page is not already free.
        if f.cache[id] {
            panic(fmt.Sprintf("page %d already freed", id))
        }
        
        // Add to the freelist and cache.
        ids = append(ids, id)
        f.cache[id] = true
    }
    f.pending[txid] = ids
}   

db.beginRWTx 开启读写事务的时候会尝试将过期的page释放掉

func (f *freelist) release(txid txid) {
    m := make(pgids, 0)
    for tid, ids := range f.pending {
        if tid <= txid {
            // Move transaction's pending pages to the available freelist.
            // Don't remove from the cache since the page is still free.
            m = append(m, ids...)
            delete(f.pending, tid)
        }
    }
    sort.Sort(m)
    f.ids = pgids(f.ids).merge(m)
}

来自Sergey Tepliakov的
https://sergeyteplyakov.github.io/Blog/csharp/2024/06/14/Custom_Task_Scheduler.html

如果你不知道什么是
TaskScheduler
或你的项目中没有它的自定义实现,你可能可以跳过这篇文章。但如果你不知道它是什么,但你的项目中确实有一两个,那么这篇文章绝对适合你。

让我们从基础开始。任务并行库(也称为TPL)引入于2010年的NET 4.0。当时它主要用于并行编程,而不是异步编程,因为异步编程在C#4和NET 4.0中不是一等公民。

例如,体现在TPL API中,
Task.Factory.StartNew
的入参为委托,返回
void

T
,而不是
Task

Task<T>
:

var task = Task.Factory.StartNew(() =>
	 {
		Console.WriteLine("Starting work...");
		Thread.Sleep(1000);
		Console.WriteLine("Done doing work.");
	});

Task.Factory.StartNew
有相当多的重载,其中一个需要
TaskScheduler
.这是一种定义如何在运行时执行任务的策略。

默认情况下(如果未传递自定义
TaskScheduler
项,同时
TaskCreationOptions.LongRunning
未传递自定义项),则使用默认
TaskScheduler
。这是一个称为
ThreadPoolTaskScheduler
的内部类型,它使用 .NET 线程池来管理任务。(如果 传递
TaskCreationOptions.LongRunning
参数 给
Task.Factory.Startnew
,则使用专用线程来避免长时间使用线程池中的线程)。
与任何新技术一样,当 TPL 发布时,书呆子们很兴奋,并试图尽可能多地使用(和滥用)新技术。如果Microsoft给你一个可扩展的库,有些人认为这是一个好主意......你知道的。。。扩展它。
最常见的模式之一是并发限制,它使用固定数量的专用线程来确保您不会超额订阅 CPU:

public sealed class DedicatedThreadsTaskScheduler : TaskScheduler
{
    private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
    private readonly List<Thread> _threads;

    public DedicatedThreadsTaskScheduler(int threadCount)
    {
        _threads = Enumerable.Range(0, threadCount).Select(i =>
        {
            var t = new Thread(() =>
            {
                foreach (var task in _tasks.GetConsumingEnumerable())
                {
                    TryExecuteTask(task);
                }
            })
            {
                IsBackground = true,
            };

            t.Start();
            return t;

        }).ToList();
    }

    protected override void QueueTask(Task task) => _tasks.Add(task);

    public override int MaximumConcurrencyLevel => _threads.Count;

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;

    protected override IEnumerable<Task> GetScheduledTasks() => _tasks;
}

此外还有很多其他实现执行相同的操作:
DedicatedThreadTaskScheduler
、、
DedicatedThreadsTaskScheduler

LimitedConcurrencyLevelTaskScheduler
甚至
IOCompletionPortTaskScheduler
使用 IO 完成端口来限制并发性。

无论实现和幻想如何,它们都做同样的事情:它们最多允许同时执行给定数量的任务。下面是一个示例,说明我们如何使用它来强制最多同时运行 2 个任务:

var sw = Stopwatch.StartNew();
// Passing 2 as the threadCount to make sure we have at most 2 pending tasks.
var scheduler = new DedicatedThreadsTaskScheduler(threadCount: 2);
var tasks = new List<Task>();
for (int i = 0; i < 5; i++)
{
    int num = i;
    var task = Task.Factory.StartNew(() =>
    {
        Console.WriteLine($"{sw.Elapsed.TotalSeconds}: Starting {num}...");
        Thread.Sleep((num + 1) * 1000);
        Console.WriteLine($"{sw.Elapsed.TotalSeconds}: Finishing {num}");
    }, CancellationToken.None, TaskCreationOptions.None, scheduler);
    
    tasks.Add(task);
}

await Task.WhenAll(tasks);

在本例中,我们在循环中创建任务,实际上它可能在某种请求中。下面是输出:

0.0154143: Starting 0...
0.0162219: Starting 1...
1.0262272: Finishing 0
1.0265169: Starting 2...
2.0224863: Finishing 1
2.0227441: Starting 3...
4.0417418: Finishing 2
4.041956: Starting 4...
6.0332304: Finishing 3
9.0453789: Finishing 4

正如你所看到的,一旦任务 0 完成,我们会立即安排任务 1 等,所以实际上我们在这里限制了并发性。

但是让我们做一点点小小的改动:

static async Task FooBarAsync()
{
    await Task.Run(() => 42);
}

...
var task = Task.Factory.StartNew(() =>
{
    Console.WriteLine($"{sw.Elapsed.TotalSeconds}: Starting {num}...");
    Thread.Sleep((num + 1) * 1000);
    FooBarAsync().GetAwaiter().GetResult();
    Console.WriteLine($"{sw.Elapsed.TotalSeconds}: Finishing {num}");
}, CancellationToken.None, TaskCreationOptions.None, scheduler);

输出为:

0.0176502: Starting 1...
0.0180366: Starting 0...

是的。死锁了!为什么?让我们更新一个示例以更好地查看问题:让我们跟踪当前
TaskScheduler
并将循环中创建的任务数减少到 1:

static void Trace(string message) => 
    Console.WriteLine($"{message}, TS: {TaskScheduler.Current.GetType().Name}");

static async Task FooBarAsync()
{
    Trace("Starting FooBarAsync");
    await Task.Run(() => 42);
    Trace("Finishing FooBarAsync");
}

static async Task Main(string[] args)
{
    var sw = Stopwatch.StartNew();
    var scheduler = new DedicatedThreadsTaskScheduler(threadCount: 2);
    var tasks = new List<Task>();
    for (int i = 0; i < 1; i++)
    {
        int num = i;
        var task = Task.Factory.StartNew(() =>
        {
            Trace($"{sw.Elapsed.TotalSeconds}: Starting {num}...");
            Thread.Sleep((num + 1) * 1000);
            FooBarAsync().GetAwaiter().GetResult();
            Trace($"{sw.Elapsed.TotalSeconds}: Finishing {num}...");
        }, CancellationToken.None, TaskCreationOptions.None, scheduler);
        
        tasks.Add(task);
    }

	Trace("Done scheduling tasks...");
    await Task.WhenAll(tasks);
}

输出为:

0.018728: Starting 0..., TS: DedicatedThreadsTaskScheduler
Starting FooBarAsync, TS: DedicatedThreadsTaskScheduler
Finishing FooBarAsync, TS: DedicatedThreadsTaskScheduler
1.028004: Finishing 0..., TS: DedicatedThreadsTaskScheduler
Done scheduling tasks..., TS: ThreadPoolTaskScheduler

现在应该相对容易理解发生了什么以及为什么当我们尝试运行超过 2 个任务时会陷入死锁。
请记住,异步方法中的每个步骤(关键字
await
后的代码)本身就是一个任务,由任务调度程序逐个执行

。默认情况下,任务调度程序是粘性的:如果
TaskScheduler
是在创建任务时提供的,那么所有后续的Task都将使用相同的
TaskScheduler

这意味着TaskScheduler贯穿所有异步方法中的 awaits。

在我们的例子中,这意味着当完成
FooAsync
时 ,我们
DedicatedThreadsTaskScheduler
被调用来运行它的后续的Task(译者注:即
await Task.Run(() => 42);
)。但是它已经忙于运行所有任务,因此它无法在
FooAsync
末尾运行一段微不足道的代码。而且由于
FooAsync
无法完成,我们无法立即完成
Task
。导致死锁。

我们能做些什么来解决这个问题?

解决方案

有几种方法可以避免此问题:

1. Use
ConfigureAwait(false)

static async Task FooBarAsync()
{
    Trace("Starting FooBarAsync");
    await Task.Run(() => 42);
    Trace("Finishing FooBarAsync");
}

我们在这里看到的问题与UI案例中的死锁非常相似,当任务被阻塞并且单个UI线程无法运行继续时。

我们可以通过确保每个异步方法都有
ConfigureAwait(false)
来避免这个问题。下面是具有以下
FooBarAsync
的实现时的输出。

static async Task FooBarAsync()
{
    Trace("Starting FooBarAsync");
    await Task.Run(() => 42).ConfigureAwait(false);
    Trace("Finishing FooBarAsync");
}
0.0397394: Starting 0..., TS: DedicatedThreadsTaskScheduler
Starting FooBarAsync, TS: DedicatedThreadsTaskScheduler
**Finishing FooBarAsync, TS: ThreadPoolTaskScheduler**
1.0876967: Finishing 0..., TS: DedicatedThreadsTaskScheduler

有人可能会说这是解决这个问题的正确方法,但我不同意。在我们的一个项目中,有一个实际案例,一个很难修复的库代码中存在阻塞异步方法。你可以通过使用分析器来确保你的代码遵循最佳实践,但期望每个人都遵循这些最佳实践是不切实际的。

(译者注:同样可以使用Fody来自动实现追加
.ConfigureAwait(false);
)

这里最大的问题是,这是一个不常见的情况。有许多后端系统在没有
ConfigureAwait(false)
的情况下工作得很好,因为团队没有任何带有同步上下文的 UI,而且任务调度程序的行为方式相同这一事实并不广为人知。

我只是觉得有更好的选择。

2. 以更明确的方式控制并发

我认为并发控制(又称速率限制)是应用程序非常重要的方面,重要的方面应该是明确的。

TaskScheduler
相当低级别的工具,我宁愿拥有更高级别的工具。如果工作是 CPU 密集型的,那么 PLINQ 或类似
ActionBlock
TPL DataFlow 的东西可能是更好的选择。

如果工作主要是 IO 绑定和异步的,那么可以使用
Parallel.ForEachAsync

Polly.RateLimiting
基于 的
SemaphoreSlim
自定义帮助程序类。

结论

自定义
TaskScheduler
只是一个工具,与任何工具一样,它可能被正确或错误地使用。如果您需要一个了解 UI 的调度程序,那
TaskScheduler
适合您。但是,是否应该在应用中使用一个进行并发和并行控制?我会投反对票。如果团队可能在多年前有正当理由来使用,但请仔细检查这些理由今天是否存在。

是的,请记住,阻塞异步调用可能会以多种方式反噬,
TaskScheduler
只是其中之一。因此,我建议对每个阻塞异步调用的地方进行备注,解释为什么您认为这样做既安全又有用。