引入

我们在使用mybatis的时候,会在xml中编写sql语句。比如这段动态sql代码:

<update id="update" parameterType="org.format.dynamicproxy.mybatis.bean.User">
    UPDATE users
    <trim prefix="SET" prefixOverrides=",">
        <if test="name != null and name != ''">
            name = #{name}
        </if>
        <if test="age != null and age != ''">
            , age = #{age}
        </if>
        <if test="birthday != null and birthday != ''">
            , birthday = #{birthday}
        </if>
    </trim>
    where id = ${id}
</update>

mybatis底层是如何构造这段sql的?

关于动态SQL的接口和类

SqlNode接口,简单理解就是xml中的每个标签,比如上述sql的update,trim,if标签:

public interface SqlNode {
    boolean apply(DynamicContext context);
}

SqlSource Sql源接口,代表从xml文件或注解映射的sql内容,主要就是用于创建BoundSql,有实现类DynamicSqlSource(动态Sql源),StaticSqlSource(静态Sql源)等:

public interface SqlSource {
    BoundSql getBoundSql(Object parameterObject);
}

BoundSql类,封装mybatis最终产生sql的类,包括sql语句,参数,参数源数据等参数:

XNode,一个Dom API中的Node接口的扩展类:

BaseBuilder接口及其实现类(属性,方法省略了,大家有兴趣的自己看),这些Builder的作用就是用于构造sql:

下面我们简单分析下其中4个Builder:

  • XMLConfigBuilder
    :解析mybatis中configLocation属性中的全局xml文件,内部会使用XMLMapperBuilder解析各个xml文件。
  • XMLMapperBuilder
    :遍历mybatis中mapperLocations属性中的xml文件中每个节点的Builder,比如user.xml,内部会使用XMLStatementBuilder处理xml中的每个节点。
  • XMLStatementBuilder
    :解析xml文件中各个节点,比如select,insert,update,delete节点,内部会使用XMLScriptBuilder处理节点的sql部分,遍历产生的数据会丢到Configuration的mappedStatements中。
  • XMLScriptBuilder
    :解析xml中各个节点sql部分的Builder。

LanguageDriver接口及其实现类(属性,方法省略了,大家有兴趣的自己看),该接口主要的作用就是构造sql:

简单分析下XMLLanguageDriver(处理xml中的sql,RawLanguageDriver处理静态sql):XMLLanguageDriver内部会使用XMLScriptBuilder解析xml中的sql部分。

源码分析

Spring与Mybatis整合的时候需要配置SqlSessionFactoryBean,该配置会加入数据源和mybatis xml配置文件路径等信息:

<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
    <property name="dataSource" ref="dataSource"/>
    <property name="configLocation" value="classpath:mybatisConfig.xml"/>
    <property name="mapperLocations" value="classpath*:org/format/dao/*.xml"/>
</bean>

我们就分析这一段配置背后的细节:

SqlSessionFactoryBean实现了Spring的InitializingBean接口,InitializingBean接口的afterPropertiesSet方法中会调用buildSqlSessionFactory方法 该方法内部会使用XMLConfigBuilder解析属性configLocation中配置的路径,还会使用XMLMapperBuilder属性解析mapperLocations属性中的各个xml文件。部分源码如下:

由于XMLConfigBuilder内部也是使用XMLMapperBuilder,我们就看看XMLMapperBuilder的解析细节:

我们关注一下,增删改查节点的解析:

XMLStatementBuilder的解析:

默认会使用XMLLanguageDriver创建SqlSource(Configuration构造函数中设置)。

XMLLanguageDriver创建SqlSource:

XMLScriptBuilder解析sql:

得到SqlSource之后,会放到Configuration中,有了SqlSource,就能拿BoundSql了,BoundSql可以得到最终的sql。

实例分析

以下面的xml解析大概说下parseDynamicTags的解析过程:

<update id="update" parameterType="org.format.dynamicproxy.mybatis.bean.User">
    UPDATE users
    <trim prefix="SET" prefixOverrides=",">
        <if test="name != null and name != ''">
            name = #{name}
        </if>
        <if test="age != null and age != ''">
            , age = #{age}
        </if>
        <if test="birthday != null and birthday != ''">
            , birthday = #{birthday}
        </if>
    </trim>
    where id = ${id}
</update>

parseDynamicTags方法的返回值是一个List,也就是一个Sql节点集合。SqlNode本文一开始已经介绍,分析完解析过程之后会说一下各个SqlNode类型的作用。

首先根据update节点(Node)得到所有的子节点,分别是3个子节点:

  • 文本节点 \n UPDATE users
  • trim子节点 ...
  • 文本节点 \n where id = #

遍历各个子节点:

  • 如果节点类型是文本或者CDATA,构造一个TextSqlNode或StaticTextSqlNode;
  • 如果节点类型是元素,说明该update节点是个动态sql,然后会使用NodeHandler处理各个类型的子节点。这里的NodeHandler是XMLScriptBuilder的一个内部接口,其实现类包括TrimHandler、WhereHandler、SetHandler、IfHandler、ChooseHandler等。看类名也就明白了这个Handler的作用,比如我们分析的trim节点,对应的是TrimHandler;if节点,对应的是IfHandler...这里子节点trim被TrimHandler处理,TrimHandler内部也使用parseDynamicTags方法解析节点。

遇到子节点是元素的话,重复以上步骤:

trim子节点内部有7个子节点,分别是文本节点、if节点、是文本节点、if节点、是文本节点、if节点、文本节点。文本节点跟之前一样处理,if节点使用IfHandler处理。遍历步骤如上所示,下面我们看下几个Handler的实现细节。

IfHandler处理方法也是使用parseDynamicTags方法,然后加上if标签必要的属性:

private class IfHandler implements NodeHandler {
    public void handleNode(XNode nodeToHandle, List<SqlNode> targetContents) {
      List<SqlNode> contents = parseDynamicTags(nodeToHandle);
      MixedSqlNode mixedSqlNode = new MixedSqlNode(contents);
      String test = nodeToHandle.getStringAttribute("test");
      IfSqlNode ifSqlNode = new IfSqlNode(mixedSqlNode, test);
      targetContents.add(ifSqlNode);
    }
}

TrimHandler处理方法也是使用parseDynamicTags方法,然后加上trim标签必要的属性:

private class TrimHandler implements NodeHandler {
    public void handleNode(XNode nodeToHandle, List<SqlNode> targetContents) {
      List<SqlNode> contents = parseDynamicTags(nodeToHandle);
      MixedSqlNode mixedSqlNode = new MixedSqlNode(contents);
      String prefix = nodeToHandle.getStringAttribute("prefix");
      String prefixOverrides = nodeToHandle.getStringAttribute("prefixOverrides");
      String suffix = nodeToHandle.getStringAttribute("suffix");
      String suffixOverrides = nodeToHandle.getStringAttribute("suffixOverrides");
      TrimSqlNode trim = new TrimSqlNode(configuration, mixedSqlNode, prefix, prefixOverrides, suffix, suffixOverrides);
      targetContents.add(trim);
    }
}

以上update方法最终通过parseDynamicTags方法得到的SqlNode集合如下:

trim节点:

由于这个update方法是个动态节点,因此构造出了DynamicSqlSource。DynamicSqlSource内部就可以构造sql了:

DynamicSqlSource内部的SqlNode属性是一个MixedSqlNode。然后我们看看各个SqlNode实现类的apply方法。下面分析一下各个SqlNode实现类的apply方法实现:

MixedSqlNode:MixedSqlNode会遍历调用内部各个sqlNode的apply方法。

public boolean apply(DynamicContext context) {
   for (SqlNode sqlNode : contents) {
     sqlNode.apply(context);
   }
   return true;
}

StaticTextSqlNode:直接append sql文本。

public boolean apply(DynamicContext context) {
   context.appendSql(text);
   return true;
}

IfSqlNode:这里的evaluator是一个ExpressionEvaluator类型的实例,内部使用了OGNL处理表达式逻辑。

public boolean apply(DynamicContext context) {
   if (evaluator.evaluateBoolean(test, context.getBindings())) {
     contents.apply(context);
     return true;
   }
   return false;
}

TrimSqlNode:

public boolean apply(DynamicContext context) {
    FilteredDynamicContext filteredDynamicContext = new FilteredDynamicContext(context);
    boolean result = contents.apply(filteredDynamicContext);
    filteredDynamicContext.applyAll();
    return result;
}

public void applyAll() {
    sqlBuffer = new StringBuilder(sqlBuffer.toString().trim());
    String trimmedUppercaseSql = sqlBuffer.toString().toUpperCase(Locale.ENGLISH);
    if (trimmedUppercaseSql.length() > 0) {
        applyPrefix(sqlBuffer, trimmedUppercaseSql);
        applySuffix(sqlBuffer, trimmedUppercaseSql);
    }
    delegate.appendSql(sqlBuffer.toString());
}

private void applyPrefix(StringBuilder sql, String trimmedUppercaseSql) {
    if (!prefixApplied) {
        prefixApplied = true;
        if (prefixesToOverride != null) {
            for (String toRemove : prefixesToOverride) {
                if (trimmedUppercaseSql.startsWith(toRemove)) {
                    sql.delete(0, toRemove.trim().length());
                    break;
                }
            }
        }
        if (prefix != null) {
            sql.insert(0, " ");
            sql.insert(0, prefix);
        }
   }
}

TrimSqlNode的apply方法也是调用属性contents(一般都是MixedSqlNode)的apply方法,按照实例也就是7个SqlNode,都是StaticTextSqlNode和IfSqlNode。 最后会使用FilteredDynamicContext过滤掉prefix和suffix。

上周,DeepSeek-V3 将训练大模型的成本给打下来了,但训练大模型对普通开发者来说仍然门槛很高。所以,本期的热门开源项目聚焦于降低 LLM 应用开发的入门门槛。

极易上手的向量数据库 chroma 用起来十分方便,只需一行命令
pip install chromadb
就能轻松拥有一个向量数据库,用于存储和检索向量数据。接下来是专为构建实时 AI 应用的 Python ETL 框架 pathway,它提供了简单易用的 Python API 和可视化监控界面,全面提升 LLM 应用处理数据的效率。同样开箱即用的 Rust 全栈 Web 框架 Loco,则将 Rails 的开发体验与 Rust 的高性能相结合,是快速开发 Web 应用不错的选择。

最后是两个相见恨晚的开源项目,Python 项目打包神器 pex,它为 Python 项目提供了一键部署的丝滑体验。以及可以轻松部署家庭多媒体中心的 docker-xiaoya。

  • 本文目录
    • 1. 热门开源项目
      • 1.1 极易上手的向量数据库:chroma
      • 1.2 Rust 的全栈 Web 框架:Loco
      • 1.3 开箱即用的端口扫描工具:RustScan
      • 1.4 实时更新的轻量级推荐系统:monolith
      • 1.5 构建实时 AI 系统的 Python 框架:pathway
    • 2. HelloGitHub 热评
      • 2.1 相见恨晚的 Python 项目打包工具:pex
      • 2.2 一键部署完整的家庭多媒体中心:docker-xiaoya
    • 3. 结尾

1. 热门开源项目

1.1 极易上手的向量数据库:chroma

主语言:Rust

Star:16.3k

周增长:400

这是一款专为 AI 应用设计的开源向量数据库(Embedding Database),支持 Python、JavaScript、Rust 等多种编程语言。它提供了简单易用的 API 和多种启动模式(内存、文件存储、服务器),支持基于 embedding 模型的自动向量化处理,以及查询、过滤、密度估计等操作,适用于快速构建基于语义的搜索和推荐等应用。

import chromadb
client = chromadb.Client()

collection = client.create_collection("all-my-documents")
collection.add(
    documents=["This is document1", "This is document2"], # we handle tokenization, embedding, and indexing automatically. You can skip that and add your own embeddings as well
    metadatas=[{"source": "notion"}, {"source": "google-docs"}], # filter on these!
    ids=["doc1", "doc2"], # unique for each doc
)

results = collection.query(
    query_texts=["This is a query document"],
    n_results=2,
    # where={"metadata_field": "is_equal_to_this"}, # optional filter
    # where_document={"$contains":"search_string"}  # optional filter
)

GitHub 地址→
github.com/chroma-core/chroma

1.2 Rust 的全栈 Web 框架:Loco

主语言:Rust

Star:6.4k

周增长:600

该项目是受 Ruby on Rails 启发的 Rust Web 框架,专为帮助开发者快速构建 Web 应用而设计。它结合了类似 Rails 的开发体验和 Rust 的高性能优势,支持 ORM 集成、后台任务、中间件(认证、日志、错误处理)、生成部署配置等功能,适用于开发个人项目和初创企业的 Web 应用。

GitHub 地址→
github.com/loco-rs/loco

1.3 开箱即用的端口扫描工具:RustScan

主语言:Rust

Star:15k

这是一个用 Rust 开发的端口扫描工具,能够在 3 秒内扫描指定 IP 的所有端口。它提供了灵活的脚本引擎,支持 Python、Lua 和 Shell 脚本,开发者可以根据需求自定义脚本,实现个性化的扫描和处理逻辑。

GitHub 地址→
github.com/RustScan/RustScan

1.4 实时更新的轻量级推荐系统:monolith

主语言:Python

Star:6.6k

周增长:2.4k

该项目是字节跳动开源的一款轻量级推荐系统,旨在提升推荐系统的准确性和实时性。它基于 TensorFlow 构建,支持无冲突嵌入表(collisionless embedding tables)、批量和实时训练等功能,能够快速响应用户的行为变化,并及时更新模型,提升推荐效果。

GitHub 地址→
github.com/bytedance/monolith

1.5 构建实时 AI 系统的 Python 框架:pathway

主语言:Python

Star:12k

周增长:1.4k

这是一个专为流处理、实时分析、LLM 管道和 RAG 应用设计的 Python ETL 框架。它底层采用 Rust 引擎,具备高吞吐和低延迟的实时处理能力,同时提供简单易用的 Python API 和可视化监控面板,支持多种数据源、数据转换和持久化等功能。

GitHub 地址→
github.com/pathwaycom/pathway

2. HelloGitHub 热评

在此章节中,我们将为大家介绍本周 HelloGitHub 网站上的热门开源项目,我们不仅希望您能从中收获开源神器和编程知识,更渴望“听”到您的声音。欢迎您与我们分享使用这些
开源项目的亲身体验和评价
,用最真实反馈为开源项目的作者注入动力。

2.1 相见恨晚的 Python 项目打包工具:pex

主语言:Python

这是一个开源的 Python 项目打包工具,专为跨环境部署和无法访问公网的部署场景设计。它能够将 Python 项目及其所有依赖,甚至是 Python 解释器(可选),打包成单个可执行文件(.pex),让开发者无需安装运行环境,即可直接运行 Python 程序,支持 Linux 和 macOS 系统。

项目详情→
hellogithub.com/repository/5c47cbf587f448fd8c4106436b3de8e3

2.2 一键部署完整的家庭多媒体中心:docker-xiaoya

主语言:Shell

该项目提供了一键部署 Alist、Emby 和 Jellyfin 服务的解决方案,帮你轻松构建完整的家庭多媒体中心,支持 Linux、macOS、Windows 等平台。

项目详情→
hellogithub.com/repository/c0360e74337e448b852ab96ea4382a62

3. 结尾

以上就是本期「GitHub 热点速览」的全部内容,希望这些开源项目能激发你的兴趣,成为你下一个值得尝试的工具!如果你有其他好玩、有趣的 GitHub 开源项目想要分享,欢迎来
HelloGitHub
与我们交流和讨论。

往期回顾

适配器模式(Adapter Pattern)

适配器模式
是一种结构型设计模式,用于将一种接口转换为客户端期望的另一个接口,使得原本因接口不兼容而无法一起工作的类可以协同工作。适配器为中间者,连接着两个互不相容的接口,从而实现接口的适配。

核心思想
:在不改变现有代码的情况下,将一个类的接口转换为客户端期望的接口。

主要组成部分

适配器模式包含以下主要角色:

  1. 目标接口(Target)
    :定义客户端使用的接口;
  2. 需要适配的类(Adaptee)
    :一个现有的接口或类,其接口与目标接口不兼容;
  3. 适配器(Adapter)
    :一个中间层,
    连接目标接口和需要适配的类
    ,实现接口的转换;
  4. 客户端(Client)
    :通过目标接口使用适配器的功能。

一句话概括适配器组成之间的关系:

一般情况下,
适配器

目标接口

具体实现类
,实现的
具体功能
是通过调用
需要适配的类的功能
来完成。

适配器模式的两种实现方式

类适配器(基于继承)

通过
继承
需要适配的类,并实现目标接口。
(多用于单继承语言,例如Java)

对象适配器(基于组合)

通过
组合
一个需要适配的类的实例,并实现目标接口。
(推荐使用,灵活性更高)

案例实现

场景为:插头适配器

假设你有一个旧接口为
两孔插头
,现在需要
适配
为新的电源插头接口(
三孔插头
)。

目标接口--适配器--需要适配的类,这三者的关系为客户端要使用
适配类
(三孔插头),但是无法直接插入两孔插头的插座,需要先插入
适配器
,然后再将适配器插入
两孔插头
,所以他们的连接关系为:需要适配的类>>>适配器>>>目标接口。

案例类图

类适配的类图(基于继承)

image

对象适配的类图(基于组合)

image

目标接口(Target)

两孔插头

public interface TwoPinPlug {
    void connect();
}

被适配类(Adaptee)

三孔插头--接口

public interface ThreePinPlug {
    void connectWithThreePins();
}

三孔插头--具体实现类

public class ThreePinPlugImpl implements ThreePinPlug {
    public void connectWithThreePins() {
        System.out.println("连接三孔插头");
    }
}

类适配器(Adapter - 继承)

通过继承来实现的适配器

public class PlugAdapter extends ThreePinPlug implements TwoPinPlug {
    @Override
    public void connect() {
        super.connectWithThreePins(); // 转换三孔插头的接口为两孔插头接口
    }
}

对象适配器(Adapter - 组合)

通过组合的方式来实现的适配器

public class PlugAdapter2 implements TwoPinPlug {
    private ThreePinPlug threePinPlug;

    public PlugAdapter2(ThreePinPlug threePinPlug) {
        this.threePinPlug = threePinPlug;
    }

    @Override
    public void connect() {
        threePinPlug.connectWithThreePins(); // 转换三孔插头的接口为两孔插头接口
    }
}

客户端代码

public class AdapterPatternDemo {
    public static void main(String[] args) {
        // 使用类适配器
        TwoPinPlug adapter1 = new PlugAdapter();
        adapter1.connect();

        // 使用对象适配器
        ThreePinPlug threePinPlug = new ThreePinPlugImpl();
        TwoPinPlug adapter2 = new PlugAdapter2(threePinPlug);
        adapter2.connect();
    }
}

执行结果

连接三孔插头

连接三孔插头

总结

适配器模式通过引入中间层(适配器),将不兼容的接口转换为客户端需要的接口,实现系统的兼容性和灵活性。

适用场景:当现有类的接口与需求接口不兼容时;当需要将多个不同接口的类统一为同一个接口时;当不修改已有代码且需要与新需求兼容时。

image

什么是设计模式?

设计模式--原型模式及其编程思想

掌握设计模式--单例模式及其思想​

掌握设计模式--生成器模式

掌握设计模式--简单工厂模式

掌握设计模式--工厂方法模式

掌握设计模式--抽象工厂模式

掌握设计模式--装饰模式

掌握设计模式--组合模式


超实用的SpringAOP实战之日志记录

2023年下半年软考考试重磅消息

通过软考后却领取不到实体证书?

计算机算法设计与分析(第5版)

Java全栈学习路线、学习资源和面试题一条龙

软考证书=职称证书?

软考中级--软件设计师毫无保留的备考分享

  • 该模块负责管理事件的注册、调度和处理,充当事件驱动的核心引擎,驱动整个klippy系统的运行。
  • 该模块提供了一个统一的接口register_callback,使各个模块能够注册自己的回调函数以响应特定的事件。
  • 使用事件循环的方式,不断地检查事件的状态并触发相应的回调函数。

reactor模式

Reactor 模式是一个事件驱动的编程模式,它允许程序以非阻塞的方式处理多个 I/O 操作。这个模式主要包含四个核心组件:

  1. 事件循环(Event Loop)
    :它负责不断监听事件,并将其分发给相应的处理器。
  2. 反应堆(Reactor)
    :作为事件循环的管理者,它监视一组资源,等待事件发生。
  3. 资源(Resources)
    :通常是网络套接字或文件描述符,是反应堆监视的对象。
  4. 事件处理器(Event Handlers)
    :每个事件都有相应的处理器来响应。

入口

try:
    select.poll
    Reactor = PollReactor
except:
    Reactor = SelectReactor
  • 如果支持 select.poll,则调用PollReactor,否则,调用SelectReactor

初始化

class PollReactor(SelectReactor):
    def __init__(self, gc_checking=False):
        SelectReactor.__init__(self, gc_checking)
        self._poll = select.poll()
        self._fds = {}
class SelectReactor:
    NOW = _NOW
    NEVER = _NEVER
    def __init__(self, gc_checking=False):
        # Main code
        self._process = False
        self.monotonic = chelper.get_ffi()[1].get_monotonic
        # Python garbage collection
        self._check_gc = gc_checking
        self._last_gc_times = [0., 0., 0.]
        # Timers
        self._timers = []
        self._next_timer = self.NEVER
        # Callbacks
        self._pipe_fds = None
        self._async_queue = queue.Queue()
        # File descriptors
        self._fds = []
        # Greenlets
        self._g_dispatch = None
        self._greenlets = []
        self._all_greenlets = []
  • PollReactor 继承自SelectReactor,在初始化的时候,调用SelectReactor初始化,对timers、文件描述符、Greenlets等进行初始化。
def run(self):
    if self._pipe_fds is None:
        self._setup_async_callbacks()
    self._process = True
    g_next = ReactorGreenlet(run=self._dispatch_loop)
    self._all_greenlets.append(g_next)
    g_next.switch()
  • run方法 在klippy.py 主循环 打印机对象初始化后,会调用该方法。
  • self._process: 设置True 为运行状态。
  • ReactorGreenlet(run=self._dispatch_loop):生成协程,进入事件主循环协程。
  • g_next.switch():切换到协程g_next。

事件驱动主循环

def _dispatch_loop(self):
    self._g_dispatch = g_dispatch = greenlet.getcurrent()
    busy = True
    eventtime = self.monotonic()
    while self._process:
        timeout = self._check_timers(eventtime, busy)
        busy = False
        res = self._poll.poll(int(math.ceil(timeout * 1000.)))
        eventtime = self.monotonic()
        for fd, event in res:
            busy = True
            self._fds[fd](eventtime)
            if g_dispatch is not self._g_dispatch:
                self._end_greenlet(g_dispatch)
                eventtime = self.monotonic()
                break
    self._g_dispatch = None

该方法实现了基于poll函数的事件循环,通过轮询文件描述符上的事件,并根据事件类型调用相应的回调函数进行处理。在处理完一个事件后,如果需要切换到其他协程进行执行,则使用greenlet实现切换。整个过程不断循环,直到_process 为False,则结束事件循环。

  • eventtime = self.monotonic():获取当前的时间
  • timeout = self._check_timers(eventtime, busy):检查是否有任何定时器事件需要处理。该方法返回下一个定时器事件的超时时间。
  • self._poll.poll(int(math.ceil(timeout * 1000.))):轮询操作,等待文件描述符上的事件发生。
  • self._fds
    fd
    : 调用文件描述符相应的回调函数。主要监听本地socket文件和一个reactor的双向管道文件描述符。
  • if g_dispatch is not self._g_dispatch: 判断当前的greenlet与事件循环的greenlet是否相同,不同则切换协程。
  • self._end_greenlet(g_dispatch):结束当前协程,并切换到其他协程。

协程

  • greenlet 是python实现协程的一个三方库,是一种基于协作式的多任务编程模型,允许在同一线程内实现多个并发执行的任务。
  • greenlet 是协作和顺序的。当一个 greenlet 运行时,其他 greenlet 都不能运行;开发者可以完全控制何时在 greenlet 之间切换执行。
  • 协作式调度: 协程通常使用协作式调度,这意味着它们在适当的时机主动让出控制权,从而允许其他协程执行。这种方式可以有效避免线程切换的开销。
  • 事件循环: 在许多协程实现中(如 Python 的
    asyncio
    ),协程是在事件循环中运行的。事件循环负责调度和管理协程的执行。
  • I/O 密集型任务: 协程特别适合处理 I/O 密集型任务(如网络请求和文件操作),因为在等待 I/O 操作完成时,协程可以挂起自己,让其他协程继续执行。

定时器注册

def register_timer(self, callback, waketime=NEVER):

    timer_handler = ReactorTimer(callback, waketime)
    timers = list(self._timers)
    timers.append(timer_handler)
    self._timers = timers
    self._next_timer = min(self._next_timer, waketime)
    return timer_handler
  • 将一个回调函数注册到定时器队列中

定时器检查

def _check_timers(self, eventtime, busy):
    if eventtime < self._next_timer:
        if busy:
            return 0.
        if self._check_gc:
            gi = gc.get_count()
            if gi[0] >= 700:
                # Reactor looks idle and gc is due - run it
                gc_level = 0
                if gi[1] >= 10:
                    gc_level = 1
                    if gi[2] >= 10:
                        gc_level = 2
                self._last_gc_times[gc_level] = eventtime
                gc.collect(gc_level)
                return 0.
        return min(1., max(.001, self._next_timer - eventtime))
    self._next_timer = self.NEVER
    g_dispatch = self._g_dispatch
    for t in self._timers:
        waketime = t.waketime
        if eventtime >= waketime:
            t.waketime = self.NEVER
            t.waketime = waketime = t.callback(eventtime)
            if g_dispatch is not self._g_dispatch:
                self._next_timer = min(self._next_timer, waketime)
                self._end_greenlet(g_dispatch)
                return 0.
        self._next_timer = min(self._next_timer, waketime)
    return 0.
  • 该函数主要是通过检查定时器的到期时间,决定是否执行定时器的回调函数,以及更新下一次检查定时器的时间。
  • 如果未到定时器执行时间,判断状态是否忙碌,忙碌直接返回;否则进行垃圾回收操作
  • 遍历定时器列表,回调定时器函数进行处理
  • 在触发回调函数后,检查是否发生了协程切换。如果
    g_dispatch
    和当前的
    _g_dispatch
    不一致,意味着其他协程已经接管控制,结束当前协程并返回。
  • 循环结束后,更新下次要检查的定时器时间
    self._next_timer
    ,确保系统按时检查到期的定时器。

定时器更新

def update_timer(self, timer_handler, waketime):
    timer_handler.waketime = waketime
    self._next_timer = min(self._next_timer, waketime)
  • 主要是更新定时器的执行时间,是否立即执行。

1. 信道是golang中的顶级公民

goroutine结合信道channel是golang中实现并发编程的标配。

信道给出了一种不同于
传统共享内存并发通信
的新思路,以一种
通道复制
的思想解耦了并发编程的各个参与方。

信道分为两种: 无缓冲和有缓冲信道(先入先出)。

分别用于goroutine同步和异步生产消费:

无缓冲信道: 若没有反向的goroutine在做动作, 当前goroutine会阻塞;
有缓冲信道: goroutine 直接面对的是缓冲队列, 队列满则写阻塞, 队列空则读阻塞。

一个陷阱: 信道被关闭后, 原来的goroutine阻塞状态不会维系, 能从信道读取到零值。

for range可以用于信道 :
一直从指定信道中

值, 没有数据会阻塞, 直到信道关闭会自动退出循环。

var ch chan int = make(chan int, 10)
go func() {
	for i := 0; i < 20; i++ { 
		ch <- i
	}
	close(ch)
}()

time.Sleep(time.Second * 2)
for ele := range ch {
	fmt.Println(ele)
}

output: 0,1,2,3,4...19

上面的示例描述了信道4个阶段:
写完10个数据(阻塞写)、暂停2s、
读取10个数据(解除阻塞写)、读完20个数据、关闭信道。

2. 信道channel实现思路大盘点

channel是指向
hchan
结构体的指针.

        type hchan struct {
        	qcount   uint           // 队列中已有的缓存元素的数量
        	dataqsiz uint           // 环形队列的容量
        	buf      unsafe.Pointer // 环形队列的地址
        	elemsize uint16
        	closed   uint32        // 标记是否关闭,初始化为0,一旦close(ch)为1
        	elemtype *_type // 元素类型
        	sendx    uint   // 待发送的元素索引
        	recvx    uint   // 待接受元素索引
        	recvq    waitq  // 阻塞等待的读goroutine队列
        	sendq    waitq  // 阻塞等待的写gotoutine队列
         
        	// lock protects all fields in hchan, as well as several
        	// fields in sudogs blocked on this channel.
        	//
        	// Do not change another G's status while holding this lock
        	// (in particular, do not ready a G), as this can deadlock
        	// with stack shrinking.
        	lock mutex
        }
        
    type waitq struct {  
        first *sudog  
        last *sudog  
    }

2.1 静态全局解读

两个核心的结构

① 环形队列
buf
(buf、dataqsize、sendx、recvx 圈定了一个有固定长度,由读/写指针控制队列数据的环形队列),从这看出队列是以链表实现。

② 存放阻塞写G和阻塞读G的队列
sendq

recvq
,
recvq、sendq存放的不是当前通信的goroutine
, 而是
因读写信道而阻塞的goroutine
:

  • 如果 qcount <dataqsiz(队列未满),sendq就为空(写就不会阻塞);
  • 如果 qcount > 0 (队列不为空),recvq就为空(读就不会阻塞)。

一旦解除阻塞,读/写动作会给到先进入阻塞队列的goroutine,也就是 recvq、sendq也是先进先出。

2.2 动态解读demo

以第一部分的demo为例:

第一阶段: 写入0到9这个10个元素

  1. goroutine在写数据之后会获取锁,以确保安全地修改信道底层的
    hchan
    结构体;
  2. 向环形队列
    buf
    入队enqueue元素,实际是将原始数据拷贝进环形队列
    buf
    的待插入位置
    sendx
  3. 入队操作完成,释放锁。

第二阶段:信道满,写阻塞(写goroutine会停止,并等待读操作唤醒)

① 基于
写goroutine
创建
sudog
, 并将其放进
sendq队列
中;

② 调用
gopark
函数,让调度器P终止该goroutine执行。

调度器P将该goroutine状态改为
waiting
, 并从调度器P挂载的
runQueue
中移除,调度器P重新出队一个G交给OS线程来执行,这就是上下文切换,G被阻塞了而不是OS线程。


读goroutine
开始被调度执行:

第三阶段: 读前10个元素(解除写阻塞)

  1. for range chan: 读goroutine从
    buf
    中出队元素: 将信道元素拷贝到目标接收区;
  2. 写goroutine从
    sendq
    中出队,因为现在信道不满,写不会阻塞;
  3. 调度器P调用
    goready
    , 将写goroutine状态变为
    runnable
    ,并移入runQueue。

下面的源码截取自
chansend()

体现了写信道--> 写阻塞---> 被唤醒的过程

     // 这一部分是写数据, 从这里也可以看出是点对点的覆写,原buf内队列元素不用移动, 只用关注sendx  
     
        if c.qcount < c.dataqsiz {  // 信道未满,则写不会阻塞=>senq为空	
                qp := chanbuf(c, c.sendx)   // chanbuf(c, i) 返回的是信道buf中待插入的位置指针
                typedmemmove(c.elemtype, qp, ep)  
                c.sendx++
                if c.sendx == c.dataqsiz {
                     c.sendx = 0
                }
                c.qcount++
                return true
        }
        if !block {       // 用于select case结构中,不阻塞select case的选择逻辑
                unlock(&c.lock)
                return false
        }

  // 这二部分是: 构建sudog,放进写阻塞队列,阻塞当前写gooroutine的执行
        // Block on the channel. Some receiver will complete our operation for us.
        gp := getg()     // 获取当前的goroutine  https://go.dev/src/runtime/HACKING
        mysg := acquireSudog()   // sudog是等待队列sendq中的元素,封装了goroutine
        mysg.releasetime = 0
        if t0 != 0 {
                mysg.releasetime = -1
        }
        // No stack splits between assigning elem and enqueuing mysg
        // on gp.waiting where copystack can find it.
        mysg.elem = ep
        mysg.waitlink = nil
        mysg.g = gp
        mysg.isSelect = false
        mysg.c = c
        gp.waiting = mysg
        gp.param = nil
        c.sendq.enqueue(mysg)  // 当前goroutine压栈sendq
        // Signal to anyone trying to shrink our stack that we're about
        // to park on a channel. The window between when this G's status
        // changes and when we set gp.activeStackChans is not safe for
        // stack shrinking.
        gp.parkingOnChan.Store(true)
        reason := waitReasonChanSend

        gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2)   // 这里是阻塞函数
    	
        KeepAlive(ep)
 // 这三部分: 调度器唤醒了当前goroutine
        // someone woke us up.  
        if mysg != gp.waiting {
                throw("G waiting list is corrupted")
        }
        gp.waiting = nil
        gp.activeStackChans = false
        closed := !mysg.success
        gp.param = nil
        if mysg.releasetime > 0 {
                blockevent(mysg.releasetime-t0, 2)
        }
        mysg.c = nil
        releaseSudog(mysg)
        if closed {     // 已经关闭了,再写数据会panic
             if c.closed == 0 {
                 throw("chansend: spurious wakeup")
             }
            panic(plainError("send on closed channel"))
        }
        return true

其中:


getg
获取当前的goroutine,sudog是goroutine的封装,表征一个因读写信道而阻塞的G,


typedmemmove(c.elemtype, qp, ep)
: 写数据到信道buf,由两个指针来完成拷贝覆写。

  //  typedmemmove copies a value of type typ to dst from src.
    func typedmemmove(typ *abi.Type, dst, src unsafe.Pointer) {
    	if dst == src {
    		return
    	}
    	if writeBarrier.enabled && typ.Pointers() {
    		// This always copies a full value of type typ so it's safe
    		// to pass typ along as an optimization. See the comment on
    		// bulkBarrierPreWrite.
    		bulkBarrierPreWrite(uintptr(dst), uintptr(src), typ.PtrBytes, typ)
    	}
    	// There's a race here: if some other goroutine can write to
    	// src, it may change some pointer in src after we've
    	// performed the write barrier but before we perform the
    	// memory copy. This safe because the write performed by that
    	// other goroutine must also be accompanied by a write
    	// barrier, so at worst we've unnecessarily greyed the old
    	// pointer that was in src.
    	memmove(dst, src, typ.Size_)
    	if goexperiment.CgoCheck2 {
    		cgoCheckMemmove2(typ, dst, src, 0, typ.Size_)
    	}
    }

③ 我们看上面源码的第三部分, 唤醒了阻塞的
写goroutine
, 但是这里貌似没有将写goroutine携带的值传递给信道或对端。
实际上这个行为是在
recv
函数内。

跟一下接收方:读第一个元素,刚解除写阻塞的源码:

// 发现sendq有阻塞的写G,则读取,并使用该写G携带的数据填充数据
// Just found waiting sender with not closed.
    if sg := c.sendq.dequeue(); sg != nil {
    // Found a waiting sender. If buffer is size 0, receive value
    // directly from sender. Otherwise, receive from head of queue
    // and add sender's value to the tail of the queue (both map to
    // the same buffer slot because the queue is full).
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
}
if c.qcount > 0 {  // 如果sendq队里没有阻塞G, 则直接从队列中读值
    // Receive directly from queue
}

---

{
    // Queue is full. Take the item at the
    // head of the queue. Make the sender enqueue
    // its item at the tail of the queue. Since the
    // queue is full, those are both the same slot.
    qp := chanbuf(c, c.recvx)  // 拿到buf中待接受元素指针
    if raceenabled {
            racenotify(c, c.recvx, nil)
            racenotify(c, c.recvx, sg)
    }
    // copy data from queue to receiver
    if ep != nil {
            typedmemmove(c.elemtype, ep, qp)  // 将buf中待接收元素qp拷贝到目标指针ep
    }
    // copy data from sender to queue
    typedmemmove(c.elemtype, qp, sg.elem)  //  将阻塞sendq队列中出站的sudog携带的值写入到待插入指针。
    c.recvx++
    if c.recvx == c.dataqsiz {
            c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
        

从上线源码可以验证:


读goroutine
读取第一个元素之前,信道满,此时sendx=recvx,也即信道内读写指针指向同一个槽位;

② 读取第一个元素,解除写阻塞:
sendq
写G队列会出队第一个sudog, 将其携带的元素填充进
buf
待插入指针
sendx
,因为此时sendx=recvx,故第二次
typedmemmove(c.elemtype, qp, sg.elem)
是合理的。

如果sendq队列没有阻塞G, 则直接从buf中读取值。

3. 不要使用共享内存来通信,而是使用通信来共享内存

常见的后端java C#标配使用共享内存来通信, 比如 mutex、lock 关键词:
通过对一块共有的区域做属性变更来反映系统当前的状态,详细的请搜索
同步索引块

golang 推荐使用通信来共享内存, 这个是怎么理解的呢?

你要想使用某块内存数据, 并不是直接共享给你, 而是给你一个信道作为访问的接口, 并且你得到的是目标数据的拷贝,由此形成的信道访问为通信方式;

而原始的目标数据的生命周期由产生这个数据的G来决定, 它甚至不用care自己是不是要被其他G获知,因此体现了解耦并发编程参与方的作用。

https://medium.com/womenintechnology/exploring-the-internals-of-channels-in-go-f01ac6e884dc

4. 信道的实践指南

4.1 无缓冲信道

结合了通信(值交换)和同步。

    c := make(chan int)  // Allocate a channel.
    // Start the sort in a goroutine; when it completes, signal on the channel.
    go func() {
        list.Sort()
        c <- 1  // Send a signal; value does not matter.
    }()
    doSomethingForAWhile()
    <-c   // Wait for sort to finish; discard sent value.

4.2 有缓冲信道

基础实践: 信号量、限流能力

下面演示了:服务端使用有缓冲信道限制并发请求

var sem = make(chan int, MaxOutstanding) 

func Serve(queue chan *Request) {
    for req := range queue {
        req:= req
        sem <- 1   
        go func() {   // 只会开启MaxOutstanding个并发协程
            process(req)
            <-sem
        }()
    }
}

上面出现了两个信道:

sem
提供了限制服务端并发处理请求的信号量

queue
提供了一个客户端请求队列,起媒介/解耦的作用

解多路复用

多路复用是网络编程中一个耳熟能详的概念,nginx redis等高性能web、内存kv都用到了这个技术 。

这个
解多路复用
是怎么理解呢?

我们针对上面的服务端,编写客户端请求, 独立的客户端请求被服务端Serve收敛之后, Serve就起到了多路复用的概念,在
Request
定义
resultChan信道
,就给每个客户端请求提供了独立获取请求结果的能力, 这便是一种解多路复用。

    type Request struct {
        args        []int
        f           func([]int) int
        resultChan  chan int
    }
    request := &Request{[]int{3, 4, 5}, nil, make(chan int)}

    func SendReq(req *Request){
        // Send request
        clientRequests <- request
        // Wait for response.
        fmt.Printf("answer: %d\n", <-request.resultChan)
    }

在服务端,定义handler,返回响应结果

    // 定义在服务端的处理handler
    func sum(a []int) (s int) {
        for _, v := range a {
            s += v
        }
        return
    }

    func process(req *Request) {
       req.f = sum
       req.resultChan <- req.f(req.args)
    }

基于cpu的并行编程

如果计算可被划分为独立的(不相互依赖的)计算分片,则可以利用信道开启CPU的并行编程能力。

    var numCPU = runtime.NumCPU() // number of CPU cores

    func (v Vector) DoAll(u Vector) {
        c := make(chan int, numCPU)  // Buffering optional but sensible.
        for i := 0; i < numCPU; i++ {
            go v.DoSome(i*len(v)/numCPU, (i+1)*len(v)/numCPU, u, c)
        }
        
        for i := 0; i < numCPU; i++ {
            <-c    // wait for one task to complete
        }
        // All done.
    }

全文复盘

本文整体视角讲述了Golang信道的用法、信道的静态结构(通过这个静态结构读者可以盲猜一二);
通过一个动态示例(G和信道的交互行为)解剖了信道在阻塞前后的状态变化;

最后给出了信道的常规实践, 解读了一些常规姿势的上层思路来源。