.NET 响应式编程 System.Reactive 系列文章(三):Subscribe 和 IDisposable 的深入理解


引言:为什么理解 Subscribe 和 IDisposable 很重要?

在前两篇文章中,我们详细介绍了
IObservable<T>

IObserver<T>

的核心概念及交互流程。但在实际使用
System.Reactive
时,一个常见的误区是认为数据流一旦订阅,就不需要额外管理。这种认知是危险的,因为
Observable 的订阅可能是无限的
,如果不管理好订阅的生命周期,很容易导致
内存泄漏

资源浪费

在 Rx 中,
Subscribe()
方法返回一个
IDisposable
接口对象

,用于
手动取消订阅
和释放资源。另外,
System.Reactive
还提供了
不返回
IDisposable

Subscribe
重载

,这些重载方法通过
CancellationToken
管理订阅的生命周期。在本篇文章中,我们将深入探讨
Subscribe 和 IDisposable
的原理、这些特殊重载的设计原因,以及在实际使用中的应用场景。


1. Subscribe 的内部机制

1.1 Subscribe 的作用

Subscribe
是连接
IObservable<T>

IObserver<T>
的桥梁。当你调用
Subscribe()
方法时:

  • IObservable<T>
    开始向
    IObserver<T>
    推送数据

  • 订阅会保持活跃状态,直到:
    • 数据流结束(调用
      OnCompleted()
      )。
    • 发生错误(调用
      OnError()
      )。
    • 手动取消订阅(调用
      Dispose()
      )。
    • 超时取消订阅(向CancellationToken注册超时回调)。

1.2 为什么 Subscribe 返回 IDisposable?

普通的
Subscribe
重载

返回一个
IDisposable
对象,允许你通过调用
Dispose()
方法取消订阅。这是管理数据流生命周期的核心机制之一。


2. Subscribe 重载:不返回 IDisposable 的特殊情况

System.Reactive
提供了一些特殊的
Subscribe
重载方法

,它们不返回
IDisposable
,而是依赖于
CancellationToken
来控制订阅的生命周期。这些方法设计的目的是为了提供一种
外部取消订阅的机制
,让你无需手动管理
Dispose()
的调用。

2.1 方法签名

以下是其中一个不返回
IDisposable

Subscribe
重载:

public static void Subscribe<T>(
    this IObservable<T> source,
    Action<T> onNext,
    Action<Exception> onError,
    Action onCompleted,
    CancellationToken cancellationToken
);

这种重载方法的使用场景是:
你希望通过
CancellationToken
来控制订阅的生命周期

,而不是手动调用
Dispose()


2.2 示例代码:使用 CancellationToken 管理订阅

示例:超时取消订阅

using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
	static void Main(string[] args)
	{
		IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(1));

		CancellationTokenSource cts = new();

		// 使用 Subscribe 方法并传入 CancellationToken
		observable.Subscribe(
			onNext: static value => Console.WriteLine($"Received: {value}"),
			onError: static ex => Console.WriteLine($"Error: {ex.Message}"),
			onCompleted: static () => Console.WriteLine("Completed"),
			token: cts.Token
		);

		// 模拟运行 5 秒后取消订阅
		Console.WriteLine("Running for 5 seconds...");
		Thread.Sleep(5000);
		cts.Cancel();
		Console.WriteLine("Subscription cancelled.");
	}
}

输出结果:

Running for 5 seconds...
Received: 0
Received: 1
Received: 2
Received: 3
Subscription cancelled.


2.3 使用场景:什么时候使用 CancellationToken?

使用场景 推荐的 Subscribe 重载
需要手动取消订阅 返回
IDisposable
的重载
使用外部控制(如用户交互、超时)控制订阅
CancellationToken
的重载

典型场景:

  1. 异步任务取消
    在异步任务中使用
    CancellationToken
    取消订阅数据流,避免阻塞或内存泄漏。

  2. 超时控制
    使用
    CancellationTokenSource.CancelAfter()
    设置超时取消订阅。


2.4 示例:设置超时取消订阅

using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
	static void Main(string[] args)
	{
		IObservable<long> observable = Observable.Interval(TimeSpan.FromSeconds(1));

		CancellationTokenSource cts = new();
		cts.CancelAfter(TimeSpan.FromSeconds(3)); // 设置 3 秒后自动取消订阅

		observable.Subscribe(
			onNext: static value => Console.WriteLine($"Received: {value}"),
			onError: static ex => Console.WriteLine($"Error: {ex.Message}"),
			onCompleted: static () => Console.WriteLine("Completed"),
			token: cts.Token
		);

		Console.WriteLine("Running...");
		Thread.Sleep(5000);
		Console.WriteLine("Program ended.");
	}
}

输出结果:

Running...
Received: 0
Received: 1
Received: 2
Program ended.


3. 使用场景总结

使用方式 特点 适用场景
Subscribe
返回
IDisposable
允许手动取消订阅 长时间订阅或频繁管理多个订阅
Subscribe
接受
CancellationToken
通过外部控制(如超时或用户交互)取消订阅 异步任务、超时控制、用户交互场景


4. 注意事项:CancellationToken 的局限性

虽然使用
CancellationToken
可以简化订阅管理,但也有一些需要注意的地方:

  1. 不支持手动取消
    如果你使用的是返回
    IDisposable

    Subscribe
    方法,你可以手动调用
    Dispose()
    取消订阅。但如果你使用带
    CancellationToken
    的重载,就无法通过
    Dispose()
    取消订阅。

  2. 更适合一次性订阅

    CancellationToken

    Subscribe
    重载更适合
    一次性订阅
    的场景。如果你需要频繁管理多个订阅,使用
    CompositeDisposable
    或手动管理
    IDisposable
    可能更合适。


5. 两种订阅方式的对比

特性 返回
IDisposable

Subscribe

CancellationToken

Subscribe
是否支持手动取消订阅 ✅ 支持 ❌ 不支持
是否支持外部控制订阅生命周期 ❌ 需要手动调用
Dispose()
✅ 通过
CancellationToken
控制
是否适合长期订阅 ✅ 适合 ❌ 更适合一次性订阅


6. Subscribe 和 IDisposable 的交互流程图

sequenceDiagram
participant Observer as IObserver<T>
participant Observable as IObservable<T>
participant IDisposable as IDisposable

Observer ->> Observable: Subscribe()
Observable ->> Observer: OnNext(T value)
Observable ->> Observer: OnNext(T value)
Observer ->> IDisposable: Dispose()
Observable -->> Observer: 停止推送数据


总结

在本篇文章中,我们详细探讨了
Subscribe 和 IDisposable
的内部机制,并重点介绍了

CancellationToken

Subscribe
重载

  1. Subscribe()
    方法返回
    IDisposable

    ,用于管理订阅的生命周期。
  2. 不返回
    IDisposable

    Subscribe
    重载

    ,通过
    CancellationToken
    控制订阅的终止。
  3. 使用场景不同

    IDisposable
    更适合长期订阅,
    CancellationToken
    更适合一次性或外部控制的订阅。


下一篇文章预告

《.NET 响应式编程 System.Reactive 系列文章(四):操作符基础》
下一篇文章将介绍
System.Reactive
的基础操作符,包括如何
创建

转换

过滤
数据流。我们将通过实战示例,帮助你快速掌握 Rx 的操作符使用方法。敬请期待!

标签: none

添加新评论