2025年1月

.NET 响应式编程 System.Reactive 系列文章(一):基础概念

引言

在现代软件开发中,处理
异步事件

数据流
已经成为常见的需求,比如用户输入、网络请求、传感器数据等。这些数据流通常是
无限的、异步的、实时的
,而传统的编程方式往往无法优雅地处理这些情况。
响应式编程(Reactive Programming)
为我们提供了一种新的思路,帮助开发者更自然、更高效地管理数据流和异步事件。

在 .NET 中,响应式编程的核心库是
System.Reactive
,通常简称为
Rx
。本篇文章将介绍响应式编程的基础概念以及
System.Reactive
的核心组件,为后续深入学习奠定基础。


什么是响应式编程?

响应式编程(Reactive Programming)
是一种
声明式编程范式
,专注于
异步数据流

变化传播
。简单来说,它是一种处理
事件驱动

数据变化
的编程方式,可以让程序自动对外部的变化做出反应。

在响应式编程中:

  • 数据流可以是
    有界的

    无界的
    (无限的)。
  • 数据流的变化可以触发
    订阅者
    的行为。
  • 订阅者(Observer)可以随时
    订阅

    取消订阅
    这些数据流。

传统编程 vs. 响应式编程

传统编程 响应式编程
通过轮询来获取数据变化 自动响应数据流的变化
使用回调函数处理异步 通过订阅和流式操作符处理异步
不擅长处理无限数据流 专注于处理无限、异步的数据流


System.Reactive 概述

System.Reactive
是微软推出的
Reactive Extensions(Rx)
的实现,为 .NET 提供了一个强大的
观察者模式

操作符库
,让我们可以轻松地管理数据流和异步事件。

核心组件

组件 描述
IObservable<T> 表示一个数据流的
生产者
IObserver<T> 表示一个数据流的
消费者
(订阅者)
Subject<T> 既是
生产者
也是
消费者
操作符(Operators) 用于对数据流进行转换、过滤、组合等操作


观察者模式简介

System.Reactive
的核心是基于
观察者模式(Observer Pattern)
,这是一种常见的设计模式,广泛用于处理事件和回调。

观察者模式的核心接口

  1. IObservable

    (可观察对象)


    • 负责
      生产
      数据流。
    • 提供
      Subscribe
      方法,允许观察者订阅它的数据流。
  2. IObserver

    (观察者)


    • 负责
      消费
      数据流。
    • 定义了以下三个方法:
      • OnNext(T value)
        : 当有新数据时被调用。
      • OnError(Exception error)
        : 当数据流发生错误时被调用。
      • OnCompleted()
        : 当数据流结束时被调用。

简单的示例代码

using System;
using System.Reactive.Subjects;

public class Program
{
    public static void Main()
    {
        // 创建一个 Subject,它既是 IObservable 也是 IObserver
        var subject = new Subject<string>();

        // 订阅数据流
        subject.Subscribe(
            onNext: value => Console.WriteLine($"Received: {value}"),
            onError: error => Console.WriteLine($"Error: {error.Message}"),
            onCompleted: () => Console.WriteLine("Completed")
        );

        // 发布数据
        subject.OnNext("Hello");
        subject.OnNext("Reactive Extensions");
        subject.OnCompleted();
    }
}

输出结果:

Received: Hello
Received: Reactive Extensions
Completed


Observable vs. Task

许多人会将
Observable

Task
进行比较,因为它们都用于处理异步操作。但两者之间有一些显著的区别。

特性 Observable Task
数据流 多个值 / 无限值 单个值
生命周期 可被取消订阅 一次性操作
时间维度 持续的时间序列 单次完成的任务
支持的操作符 丰富的转换、过滤、组合操作符 少数操作符

简单总结:

  • Task
    更适合处理
    单次异步操作
  • Observable
    更适合处理
    连续的数据流

    多次异步事件


数据流的三个阶段

在响应式编程中,数据流有三个阶段:

  1. OnNext
    : 数据流的每一个值都会通过
    OnNext
    方法传递给订阅者。
  2. OnError
    : 如果数据流中出现错误,会通过
    OnError
    方法通知订阅者。
  3. OnCompleted
    : 当数据流结束时,会通过
    OnCompleted
    方法通知订阅者。


热数据流和冷数据流


System.Reactive
中,数据流可以分为两种类型:

1. 冷数据流(Cold Observable)

  • 冷数据流是
    被订阅时才开始产生数据
  • 每个订阅者都会从
    头开始
    接收数据。

示例:

var cold = Observable.Range(1, 5);
cold.Subscribe(x => Console.WriteLine($"Subscriber 1: {x}"));
cold.Subscribe(x => Console.WriteLine($"Subscriber 2: {x}"));

输出:

Subscriber 1: 1
Subscriber 1: 2
Subscriber 1: 3
Subscriber 1: 4
Subscriber 1: 5
Subscriber 2: 1
Subscriber 2: 2
Subscriber 2: 3
Subscriber 2: 4
Subscriber 2: 5

2. 热数据流(Hot Observable)

  • 热数据流是
    数据流开始时就产生数据
  • 每个订阅者会从
    当前数据流的位置
    开始接收数据。

示例:

var hot = new Subject<int>();
hot.OnNext(1);
hot.Subscribe(x => Console.WriteLine($"Subscriber: {x}"));
hot.OnNext(2);

输出:

Subscriber: 2


总结

在本篇文章中,我们介绍了响应式编程的基础概念以及
System.Reactive
的核心组件:

  • 响应式编程专注于处理
    异步数据流
  • System.Reactive
    提供了核心接口
    IObservable

    IObserver
  • 数据流的生命周期包含
    OnNext

    OnError

    OnCompleted
  • 区分了
    冷数据流

    热数据流

下一篇文章将介绍
System.Reactive
的基础操作符,包括创建、转换和过滤数据流的方法,敬请期待!

引入

我们在使用mybatis的时候,会在xml中编写sql语句。比如这段动态sql代码:

<update id="update" parameterType="org.format.dynamicproxy.mybatis.bean.User">
    UPDATE users
    <trim prefix="SET" prefixOverrides=",">
        <if test="name != null and name != ''">
            name = #{name}
        </if>
        <if test="age != null and age != ''">
            , age = #{age}
        </if>
        <if test="birthday != null and birthday != ''">
            , birthday = #{birthday}
        </if>
    </trim>
    where id = ${id}
</update>

mybatis底层是如何构造这段sql的?

关于动态SQL的接口和类

SqlNode接口,简单理解就是xml中的每个标签,比如上述sql的update,trim,if标签:

public interface SqlNode {
    boolean apply(DynamicContext context);
}

SqlSource Sql源接口,代表从xml文件或注解映射的sql内容,主要就是用于创建BoundSql,有实现类DynamicSqlSource(动态Sql源),StaticSqlSource(静态Sql源)等:

public interface SqlSource {
    BoundSql getBoundSql(Object parameterObject);
}

BoundSql类,封装mybatis最终产生sql的类,包括sql语句,参数,参数源数据等参数:

XNode,一个Dom API中的Node接口的扩展类:

BaseBuilder接口及其实现类(属性,方法省略了,大家有兴趣的自己看),这些Builder的作用就是用于构造sql:

下面我们简单分析下其中4个Builder:

  • XMLConfigBuilder
    :解析mybatis中configLocation属性中的全局xml文件,内部会使用XMLMapperBuilder解析各个xml文件。
  • XMLMapperBuilder
    :遍历mybatis中mapperLocations属性中的xml文件中每个节点的Builder,比如user.xml,内部会使用XMLStatementBuilder处理xml中的每个节点。
  • XMLStatementBuilder
    :解析xml文件中各个节点,比如select,insert,update,delete节点,内部会使用XMLScriptBuilder处理节点的sql部分,遍历产生的数据会丢到Configuration的mappedStatements中。
  • XMLScriptBuilder
    :解析xml中各个节点sql部分的Builder。

LanguageDriver接口及其实现类(属性,方法省略了,大家有兴趣的自己看),该接口主要的作用就是构造sql:

简单分析下XMLLanguageDriver(处理xml中的sql,RawLanguageDriver处理静态sql):XMLLanguageDriver内部会使用XMLScriptBuilder解析xml中的sql部分。

源码分析

Spring与Mybatis整合的时候需要配置SqlSessionFactoryBean,该配置会加入数据源和mybatis xml配置文件路径等信息:

<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
    <property name="dataSource" ref="dataSource"/>
    <property name="configLocation" value="classpath:mybatisConfig.xml"/>
    <property name="mapperLocations" value="classpath*:org/format/dao/*.xml"/>
</bean>

我们就分析这一段配置背后的细节:

SqlSessionFactoryBean实现了Spring的InitializingBean接口,InitializingBean接口的afterPropertiesSet方法中会调用buildSqlSessionFactory方法 该方法内部会使用XMLConfigBuilder解析属性configLocation中配置的路径,还会使用XMLMapperBuilder属性解析mapperLocations属性中的各个xml文件。部分源码如下:

由于XMLConfigBuilder内部也是使用XMLMapperBuilder,我们就看看XMLMapperBuilder的解析细节:

我们关注一下,增删改查节点的解析:

XMLStatementBuilder的解析:

默认会使用XMLLanguageDriver创建SqlSource(Configuration构造函数中设置)。

XMLLanguageDriver创建SqlSource:

XMLScriptBuilder解析sql:

得到SqlSource之后,会放到Configuration中,有了SqlSource,就能拿BoundSql了,BoundSql可以得到最终的sql。

实例分析

以下面的xml解析大概说下parseDynamicTags的解析过程:

<update id="update" parameterType="org.format.dynamicproxy.mybatis.bean.User">
    UPDATE users
    <trim prefix="SET" prefixOverrides=",">
        <if test="name != null and name != ''">
            name = #{name}
        </if>
        <if test="age != null and age != ''">
            , age = #{age}
        </if>
        <if test="birthday != null and birthday != ''">
            , birthday = #{birthday}
        </if>
    </trim>
    where id = ${id}
</update>

parseDynamicTags方法的返回值是一个List,也就是一个Sql节点集合。SqlNode本文一开始已经介绍,分析完解析过程之后会说一下各个SqlNode类型的作用。

首先根据update节点(Node)得到所有的子节点,分别是3个子节点:

  • 文本节点 \n UPDATE users
  • trim子节点 ...
  • 文本节点 \n where id = #

遍历各个子节点:

  • 如果节点类型是文本或者CDATA,构造一个TextSqlNode或StaticTextSqlNode;
  • 如果节点类型是元素,说明该update节点是个动态sql,然后会使用NodeHandler处理各个类型的子节点。这里的NodeHandler是XMLScriptBuilder的一个内部接口,其实现类包括TrimHandler、WhereHandler、SetHandler、IfHandler、ChooseHandler等。看类名也就明白了这个Handler的作用,比如我们分析的trim节点,对应的是TrimHandler;if节点,对应的是IfHandler...这里子节点trim被TrimHandler处理,TrimHandler内部也使用parseDynamicTags方法解析节点。

遇到子节点是元素的话,重复以上步骤:

trim子节点内部有7个子节点,分别是文本节点、if节点、是文本节点、if节点、是文本节点、if节点、文本节点。文本节点跟之前一样处理,if节点使用IfHandler处理。遍历步骤如上所示,下面我们看下几个Handler的实现细节。

IfHandler处理方法也是使用parseDynamicTags方法,然后加上if标签必要的属性:

private class IfHandler implements NodeHandler {
    public void handleNode(XNode nodeToHandle, List<SqlNode> targetContents) {
      List<SqlNode> contents = parseDynamicTags(nodeToHandle);
      MixedSqlNode mixedSqlNode = new MixedSqlNode(contents);
      String test = nodeToHandle.getStringAttribute("test");
      IfSqlNode ifSqlNode = new IfSqlNode(mixedSqlNode, test);
      targetContents.add(ifSqlNode);
    }
}

TrimHandler处理方法也是使用parseDynamicTags方法,然后加上trim标签必要的属性:

private class TrimHandler implements NodeHandler {
    public void handleNode(XNode nodeToHandle, List<SqlNode> targetContents) {
      List<SqlNode> contents = parseDynamicTags(nodeToHandle);
      MixedSqlNode mixedSqlNode = new MixedSqlNode(contents);
      String prefix = nodeToHandle.getStringAttribute("prefix");
      String prefixOverrides = nodeToHandle.getStringAttribute("prefixOverrides");
      String suffix = nodeToHandle.getStringAttribute("suffix");
      String suffixOverrides = nodeToHandle.getStringAttribute("suffixOverrides");
      TrimSqlNode trim = new TrimSqlNode(configuration, mixedSqlNode, prefix, prefixOverrides, suffix, suffixOverrides);
      targetContents.add(trim);
    }
}

以上update方法最终通过parseDynamicTags方法得到的SqlNode集合如下:

trim节点:

由于这个update方法是个动态节点,因此构造出了DynamicSqlSource。DynamicSqlSource内部就可以构造sql了:

DynamicSqlSource内部的SqlNode属性是一个MixedSqlNode。然后我们看看各个SqlNode实现类的apply方法。下面分析一下各个SqlNode实现类的apply方法实现:

MixedSqlNode:MixedSqlNode会遍历调用内部各个sqlNode的apply方法。

public boolean apply(DynamicContext context) {
   for (SqlNode sqlNode : contents) {
     sqlNode.apply(context);
   }
   return true;
}

StaticTextSqlNode:直接append sql文本。

public boolean apply(DynamicContext context) {
   context.appendSql(text);
   return true;
}

IfSqlNode:这里的evaluator是一个ExpressionEvaluator类型的实例,内部使用了OGNL处理表达式逻辑。

public boolean apply(DynamicContext context) {
   if (evaluator.evaluateBoolean(test, context.getBindings())) {
     contents.apply(context);
     return true;
   }
   return false;
}

TrimSqlNode:

public boolean apply(DynamicContext context) {
    FilteredDynamicContext filteredDynamicContext = new FilteredDynamicContext(context);
    boolean result = contents.apply(filteredDynamicContext);
    filteredDynamicContext.applyAll();
    return result;
}

public void applyAll() {
    sqlBuffer = new StringBuilder(sqlBuffer.toString().trim());
    String trimmedUppercaseSql = sqlBuffer.toString().toUpperCase(Locale.ENGLISH);
    if (trimmedUppercaseSql.length() > 0) {
        applyPrefix(sqlBuffer, trimmedUppercaseSql);
        applySuffix(sqlBuffer, trimmedUppercaseSql);
    }
    delegate.appendSql(sqlBuffer.toString());
}

private void applyPrefix(StringBuilder sql, String trimmedUppercaseSql) {
    if (!prefixApplied) {
        prefixApplied = true;
        if (prefixesToOverride != null) {
            for (String toRemove : prefixesToOverride) {
                if (trimmedUppercaseSql.startsWith(toRemove)) {
                    sql.delete(0, toRemove.trim().length());
                    break;
                }
            }
        }
        if (prefix != null) {
            sql.insert(0, " ");
            sql.insert(0, prefix);
        }
   }
}

TrimSqlNode的apply方法也是调用属性contents(一般都是MixedSqlNode)的apply方法,按照实例也就是7个SqlNode,都是StaticTextSqlNode和IfSqlNode。 最后会使用FilteredDynamicContext过滤掉prefix和suffix。

上周,DeepSeek-V3 将训练大模型的成本给打下来了,但训练大模型对普通开发者来说仍然门槛很高。所以,本期的热门开源项目聚焦于降低 LLM 应用开发的入门门槛。

极易上手的向量数据库 chroma 用起来十分方便,只需一行命令
pip install chromadb
就能轻松拥有一个向量数据库,用于存储和检索向量数据。接下来是专为构建实时 AI 应用的 Python ETL 框架 pathway,它提供了简单易用的 Python API 和可视化监控界面,全面提升 LLM 应用处理数据的效率。同样开箱即用的 Rust 全栈 Web 框架 Loco,则将 Rails 的开发体验与 Rust 的高性能相结合,是快速开发 Web 应用不错的选择。

最后是两个相见恨晚的开源项目,Python 项目打包神器 pex,它为 Python 项目提供了一键部署的丝滑体验。以及可以轻松部署家庭多媒体中心的 docker-xiaoya。

  • 本文目录
    • 1. 热门开源项目
      • 1.1 极易上手的向量数据库:chroma
      • 1.2 Rust 的全栈 Web 框架:Loco
      • 1.3 开箱即用的端口扫描工具:RustScan
      • 1.4 实时更新的轻量级推荐系统:monolith
      • 1.5 构建实时 AI 系统的 Python 框架:pathway
    • 2. HelloGitHub 热评
      • 2.1 相见恨晚的 Python 项目打包工具:pex
      • 2.2 一键部署完整的家庭多媒体中心:docker-xiaoya
    • 3. 结尾

1. 热门开源项目

1.1 极易上手的向量数据库:chroma

主语言:Rust

Star:16.3k

周增长:400

这是一款专为 AI 应用设计的开源向量数据库(Embedding Database),支持 Python、JavaScript、Rust 等多种编程语言。它提供了简单易用的 API 和多种启动模式(内存、文件存储、服务器),支持基于 embedding 模型的自动向量化处理,以及查询、过滤、密度估计等操作,适用于快速构建基于语义的搜索和推荐等应用。

import chromadb
client = chromadb.Client()

collection = client.create_collection("all-my-documents")
collection.add(
    documents=["This is document1", "This is document2"], # we handle tokenization, embedding, and indexing automatically. You can skip that and add your own embeddings as well
    metadatas=[{"source": "notion"}, {"source": "google-docs"}], # filter on these!
    ids=["doc1", "doc2"], # unique for each doc
)

results = collection.query(
    query_texts=["This is a query document"],
    n_results=2,
    # where={"metadata_field": "is_equal_to_this"}, # optional filter
    # where_document={"$contains":"search_string"}  # optional filter
)

GitHub 地址→
github.com/chroma-core/chroma

1.2 Rust 的全栈 Web 框架:Loco

主语言:Rust

Star:6.4k

周增长:600

该项目是受 Ruby on Rails 启发的 Rust Web 框架,专为帮助开发者快速构建 Web 应用而设计。它结合了类似 Rails 的开发体验和 Rust 的高性能优势,支持 ORM 集成、后台任务、中间件(认证、日志、错误处理)、生成部署配置等功能,适用于开发个人项目和初创企业的 Web 应用。

GitHub 地址→
github.com/loco-rs/loco

1.3 开箱即用的端口扫描工具:RustScan

主语言:Rust

Star:15k

这是一个用 Rust 开发的端口扫描工具,能够在 3 秒内扫描指定 IP 的所有端口。它提供了灵活的脚本引擎,支持 Python、Lua 和 Shell 脚本,开发者可以根据需求自定义脚本,实现个性化的扫描和处理逻辑。

GitHub 地址→
github.com/RustScan/RustScan

1.4 实时更新的轻量级推荐系统:monolith

主语言:Python

Star:6.6k

周增长:2.4k

该项目是字节跳动开源的一款轻量级推荐系统,旨在提升推荐系统的准确性和实时性。它基于 TensorFlow 构建,支持无冲突嵌入表(collisionless embedding tables)、批量和实时训练等功能,能够快速响应用户的行为变化,并及时更新模型,提升推荐效果。

GitHub 地址→
github.com/bytedance/monolith

1.5 构建实时 AI 系统的 Python 框架:pathway

主语言:Python

Star:12k

周增长:1.4k

这是一个专为流处理、实时分析、LLM 管道和 RAG 应用设计的 Python ETL 框架。它底层采用 Rust 引擎,具备高吞吐和低延迟的实时处理能力,同时提供简单易用的 Python API 和可视化监控面板,支持多种数据源、数据转换和持久化等功能。

GitHub 地址→
github.com/pathwaycom/pathway

2. HelloGitHub 热评

在此章节中,我们将为大家介绍本周 HelloGitHub 网站上的热门开源项目,我们不仅希望您能从中收获开源神器和编程知识,更渴望“听”到您的声音。欢迎您与我们分享使用这些
开源项目的亲身体验和评价
,用最真实反馈为开源项目的作者注入动力。

2.1 相见恨晚的 Python 项目打包工具:pex

主语言:Python

这是一个开源的 Python 项目打包工具,专为跨环境部署和无法访问公网的部署场景设计。它能够将 Python 项目及其所有依赖,甚至是 Python 解释器(可选),打包成单个可执行文件(.pex),让开发者无需安装运行环境,即可直接运行 Python 程序,支持 Linux 和 macOS 系统。

项目详情→
hellogithub.com/repository/5c47cbf587f448fd8c4106436b3de8e3

2.2 一键部署完整的家庭多媒体中心:docker-xiaoya

主语言:Shell

该项目提供了一键部署 Alist、Emby 和 Jellyfin 服务的解决方案,帮你轻松构建完整的家庭多媒体中心,支持 Linux、macOS、Windows 等平台。

项目详情→
hellogithub.com/repository/c0360e74337e448b852ab96ea4382a62

3. 结尾

以上就是本期「GitHub 热点速览」的全部内容,希望这些开源项目能激发你的兴趣,成为你下一个值得尝试的工具!如果你有其他好玩、有趣的 GitHub 开源项目想要分享,欢迎来
HelloGitHub
与我们交流和讨论。

往期回顾

适配器模式(Adapter Pattern)

适配器模式
是一种结构型设计模式,用于将一种接口转换为客户端期望的另一个接口,使得原本因接口不兼容而无法一起工作的类可以协同工作。适配器为中间者,连接着两个互不相容的接口,从而实现接口的适配。

核心思想
:在不改变现有代码的情况下,将一个类的接口转换为客户端期望的接口。

主要组成部分

适配器模式包含以下主要角色:

  1. 目标接口(Target)
    :定义客户端使用的接口;
  2. 需要适配的类(Adaptee)
    :一个现有的接口或类,其接口与目标接口不兼容;
  3. 适配器(Adapter)
    :一个中间层,
    连接目标接口和需要适配的类
    ,实现接口的转换;
  4. 客户端(Client)
    :通过目标接口使用适配器的功能。

一句话概括适配器组成之间的关系:

一般情况下,
适配器

目标接口

具体实现类
,实现的
具体功能
是通过调用
需要适配的类的功能
来完成。

适配器模式的两种实现方式

类适配器(基于继承)

通过
继承
需要适配的类,并实现目标接口。
(多用于单继承语言,例如Java)

对象适配器(基于组合)

通过
组合
一个需要适配的类的实例,并实现目标接口。
(推荐使用,灵活性更高)

案例实现

场景为:插头适配器

假设你有一个旧接口为
两孔插头
,现在需要
适配
为新的电源插头接口(
三孔插头
)。

目标接口--适配器--需要适配的类,这三者的关系为客户端要使用
适配类
(三孔插头),但是无法直接插入两孔插头的插座,需要先插入
适配器
,然后再将适配器插入
两孔插头
,所以他们的连接关系为:需要适配的类>>>适配器>>>目标接口。

案例类图

类适配的类图(基于继承)

image

对象适配的类图(基于组合)

image

目标接口(Target)

两孔插头

public interface TwoPinPlug {
    void connect();
}

被适配类(Adaptee)

三孔插头--接口

public interface ThreePinPlug {
    void connectWithThreePins();
}

三孔插头--具体实现类

public class ThreePinPlugImpl implements ThreePinPlug {
    public void connectWithThreePins() {
        System.out.println("连接三孔插头");
    }
}

类适配器(Adapter - 继承)

通过继承来实现的适配器

public class PlugAdapter extends ThreePinPlug implements TwoPinPlug {
    @Override
    public void connect() {
        super.connectWithThreePins(); // 转换三孔插头的接口为两孔插头接口
    }
}

对象适配器(Adapter - 组合)

通过组合的方式来实现的适配器

public class PlugAdapter2 implements TwoPinPlug {
    private ThreePinPlug threePinPlug;

    public PlugAdapter2(ThreePinPlug threePinPlug) {
        this.threePinPlug = threePinPlug;
    }

    @Override
    public void connect() {
        threePinPlug.connectWithThreePins(); // 转换三孔插头的接口为两孔插头接口
    }
}

客户端代码

public class AdapterPatternDemo {
    public static void main(String[] args) {
        // 使用类适配器
        TwoPinPlug adapter1 = new PlugAdapter();
        adapter1.connect();

        // 使用对象适配器
        ThreePinPlug threePinPlug = new ThreePinPlugImpl();
        TwoPinPlug adapter2 = new PlugAdapter2(threePinPlug);
        adapter2.connect();
    }
}

执行结果

连接三孔插头

连接三孔插头

总结

适配器模式通过引入中间层(适配器),将不兼容的接口转换为客户端需要的接口,实现系统的兼容性和灵活性。

适用场景:当现有类的接口与需求接口不兼容时;当需要将多个不同接口的类统一为同一个接口时;当不修改已有代码且需要与新需求兼容时。

image

什么是设计模式?

设计模式--原型模式及其编程思想

掌握设计模式--单例模式及其思想​

掌握设计模式--生成器模式

掌握设计模式--简单工厂模式

掌握设计模式--工厂方法模式

掌握设计模式--抽象工厂模式

掌握设计模式--装饰模式

掌握设计模式--组合模式


超实用的SpringAOP实战之日志记录

2023年下半年软考考试重磅消息

通过软考后却领取不到实体证书?

计算机算法设计与分析(第5版)

Java全栈学习路线、学习资源和面试题一条龙

软考证书=职称证书?

软考中级--软件设计师毫无保留的备考分享

  • 该模块负责管理事件的注册、调度和处理,充当事件驱动的核心引擎,驱动整个klippy系统的运行。
  • 该模块提供了一个统一的接口register_callback,使各个模块能够注册自己的回调函数以响应特定的事件。
  • 使用事件循环的方式,不断地检查事件的状态并触发相应的回调函数。

reactor模式

Reactor 模式是一个事件驱动的编程模式,它允许程序以非阻塞的方式处理多个 I/O 操作。这个模式主要包含四个核心组件:

  1. 事件循环(Event Loop)
    :它负责不断监听事件,并将其分发给相应的处理器。
  2. 反应堆(Reactor)
    :作为事件循环的管理者,它监视一组资源,等待事件发生。
  3. 资源(Resources)
    :通常是网络套接字或文件描述符,是反应堆监视的对象。
  4. 事件处理器(Event Handlers)
    :每个事件都有相应的处理器来响应。

入口

try:
    select.poll
    Reactor = PollReactor
except:
    Reactor = SelectReactor
  • 如果支持 select.poll,则调用PollReactor,否则,调用SelectReactor

初始化

class PollReactor(SelectReactor):
    def __init__(self, gc_checking=False):
        SelectReactor.__init__(self, gc_checking)
        self._poll = select.poll()
        self._fds = {}
class SelectReactor:
    NOW = _NOW
    NEVER = _NEVER
    def __init__(self, gc_checking=False):
        # Main code
        self._process = False
        self.monotonic = chelper.get_ffi()[1].get_monotonic
        # Python garbage collection
        self._check_gc = gc_checking
        self._last_gc_times = [0., 0., 0.]
        # Timers
        self._timers = []
        self._next_timer = self.NEVER
        # Callbacks
        self._pipe_fds = None
        self._async_queue = queue.Queue()
        # File descriptors
        self._fds = []
        # Greenlets
        self._g_dispatch = None
        self._greenlets = []
        self._all_greenlets = []
  • PollReactor 继承自SelectReactor,在初始化的时候,调用SelectReactor初始化,对timers、文件描述符、Greenlets等进行初始化。
def run(self):
    if self._pipe_fds is None:
        self._setup_async_callbacks()
    self._process = True
    g_next = ReactorGreenlet(run=self._dispatch_loop)
    self._all_greenlets.append(g_next)
    g_next.switch()
  • run方法 在klippy.py 主循环 打印机对象初始化后,会调用该方法。
  • self._process: 设置True 为运行状态。
  • ReactorGreenlet(run=self._dispatch_loop):生成协程,进入事件主循环协程。
  • g_next.switch():切换到协程g_next。

事件驱动主循环

def _dispatch_loop(self):
    self._g_dispatch = g_dispatch = greenlet.getcurrent()
    busy = True
    eventtime = self.monotonic()
    while self._process:
        timeout = self._check_timers(eventtime, busy)
        busy = False
        res = self._poll.poll(int(math.ceil(timeout * 1000.)))
        eventtime = self.monotonic()
        for fd, event in res:
            busy = True
            self._fds[fd](eventtime)
            if g_dispatch is not self._g_dispatch:
                self._end_greenlet(g_dispatch)
                eventtime = self.monotonic()
                break
    self._g_dispatch = None

该方法实现了基于poll函数的事件循环,通过轮询文件描述符上的事件,并根据事件类型调用相应的回调函数进行处理。在处理完一个事件后,如果需要切换到其他协程进行执行,则使用greenlet实现切换。整个过程不断循环,直到_process 为False,则结束事件循环。

  • eventtime = self.monotonic():获取当前的时间
  • timeout = self._check_timers(eventtime, busy):检查是否有任何定时器事件需要处理。该方法返回下一个定时器事件的超时时间。
  • self._poll.poll(int(math.ceil(timeout * 1000.))):轮询操作,等待文件描述符上的事件发生。
  • self._fds
    fd
    : 调用文件描述符相应的回调函数。主要监听本地socket文件和一个reactor的双向管道文件描述符。
  • if g_dispatch is not self._g_dispatch: 判断当前的greenlet与事件循环的greenlet是否相同,不同则切换协程。
  • self._end_greenlet(g_dispatch):结束当前协程,并切换到其他协程。

协程

  • greenlet 是python实现协程的一个三方库,是一种基于协作式的多任务编程模型,允许在同一线程内实现多个并发执行的任务。
  • greenlet 是协作和顺序的。当一个 greenlet 运行时,其他 greenlet 都不能运行;开发者可以完全控制何时在 greenlet 之间切换执行。
  • 协作式调度: 协程通常使用协作式调度,这意味着它们在适当的时机主动让出控制权,从而允许其他协程执行。这种方式可以有效避免线程切换的开销。
  • 事件循环: 在许多协程实现中(如 Python 的
    asyncio
    ),协程是在事件循环中运行的。事件循环负责调度和管理协程的执行。
  • I/O 密集型任务: 协程特别适合处理 I/O 密集型任务(如网络请求和文件操作),因为在等待 I/O 操作完成时,协程可以挂起自己,让其他协程继续执行。

定时器注册

def register_timer(self, callback, waketime=NEVER):

    timer_handler = ReactorTimer(callback, waketime)
    timers = list(self._timers)
    timers.append(timer_handler)
    self._timers = timers
    self._next_timer = min(self._next_timer, waketime)
    return timer_handler
  • 将一个回调函数注册到定时器队列中

定时器检查

def _check_timers(self, eventtime, busy):
    if eventtime < self._next_timer:
        if busy:
            return 0.
        if self._check_gc:
            gi = gc.get_count()
            if gi[0] >= 700:
                # Reactor looks idle and gc is due - run it
                gc_level = 0
                if gi[1] >= 10:
                    gc_level = 1
                    if gi[2] >= 10:
                        gc_level = 2
                self._last_gc_times[gc_level] = eventtime
                gc.collect(gc_level)
                return 0.
        return min(1., max(.001, self._next_timer - eventtime))
    self._next_timer = self.NEVER
    g_dispatch = self._g_dispatch
    for t in self._timers:
        waketime = t.waketime
        if eventtime >= waketime:
            t.waketime = self.NEVER
            t.waketime = waketime = t.callback(eventtime)
            if g_dispatch is not self._g_dispatch:
                self._next_timer = min(self._next_timer, waketime)
                self._end_greenlet(g_dispatch)
                return 0.
        self._next_timer = min(self._next_timer, waketime)
    return 0.
  • 该函数主要是通过检查定时器的到期时间,决定是否执行定时器的回调函数,以及更新下一次检查定时器的时间。
  • 如果未到定时器执行时间,判断状态是否忙碌,忙碌直接返回;否则进行垃圾回收操作
  • 遍历定时器列表,回调定时器函数进行处理
  • 在触发回调函数后,检查是否发生了协程切换。如果
    g_dispatch
    和当前的
    _g_dispatch
    不一致,意味着其他协程已经接管控制,结束当前协程并返回。
  • 循环结束后,更新下次要检查的定时器时间
    self._next_timer
    ,确保系统按时检查到期的定时器。

定时器更新

def update_timer(self, timer_handler, waketime):
    timer_handler.waketime = waketime
    self._next_timer = min(self._next_timer, waketime)
  • 主要是更新定时器的执行时间,是否立即执行。