2024年7月

X-Stream论文

《X-Stream: Edge-centric Graph Processing using Streaming Partitions》

前面通过文章
《论文图谱当如是:Awesome-Graphs用200篇图系统论文打个样》
向大家介绍了论文图谱项目Awesome-Graphs,并分享了Google的
Pregel
以及OSDI 2012上的
PowerGraph
。这次向大家分享发表在SOSP 2013上的另一篇经典图计算框架论文X-Stream,构建了单机上基于外存的Scatter-Gather图处理框架。

对图计算技术感兴趣的同学可以多做了解,也非常欢迎大家关注和参与论文图谱的开源项目:

提前感谢给项目点Star的小伙伴,接下来我们直接进入正文!

摘要

  • X-Stream是一个单机共享内存的既可以处理内存图也可以处理外存图的图处理系统。
  • 特点:
    • 以边为中心的计算模型。
    • 流式访问无序边,而不是随机访问。

1. 介绍

传统的以点为中心的处理:

  • scatter函数将点状态传播给邻居点。
  • gather函数累计更新,并重新计算点状态。


顺序/随机访问不同存储介质的性能差异:

  • 磁盘:500x
  • SSD:30x
  • 内存:1.8x - 4.6x

X-Stream的以边为中心的处理:

  • scatter/gather在边/更新上迭代,而不是在点上迭代。
  • 使用流式分区缓解点集的随机访问。
  • 将边和源点划分到同一个分区。


X-Stream主要贡献:

  • 边中心处理模型。
  • 流式分区。
  • 不同存储介质上的良好扩展性。
  • 高性能。

2. X-Stream处理模型

API设计:

  • Scatter:根据边和源点,计算目标点更新。
  • Gather:根据目标点收到更新,重新计算目标点状态。

2.1 流

X-Stream使用流的方式执行Scatter+Gather。边和更新是顺序访问的,但是点是随机访问的。

2.2 流式分区

流式分区包含:

  • 点集:分区上的点子集。
  • 边列表:源点的边。
  • 更新列表:目标点的更新。

2.3 分区上的Scatter-Gather

Scatter + Shuffle + Gather:

2.4 分区的大小和数量

  • 一方面为了让点集合尽量加载到快存储,分区数不能太小。
  • 另一方面为了最大化利用慢存储的顺序读写能力,分区数不能太大。
  • 通过固定分区点集合大小的方式进行分区。

2.5 API限制和扩展

  • 虽然不能遍历点上的所有边,但是可以对所有的点进行迭代,并提供自定义的点函数。
  • 不仅限于支持scatter-gather模型,也可以支持semi-streaming、W-Stream模型等。

3. 基于外存的流式引擎

每个流式分区维护三个磁盘文件:点文件、边文件、更新文件。
难点在于实现shuffle节点的顺序访问,通过合并scatter+shuffle阶段,更新写入到内存buffer,buffer满时执行内存shuffle追加到目标分区磁盘文件。

3.1 内存数据结构

stream buffer设计:

基于stream buffer,一个buffer用于存储scatter的更新,另一个存储内存shuffle的结果。

3.2 操作

初始化边分区可以使用内存shuffle方式实现。

3.3 磁盘IO

  • X-Stream的stream buffer采用异步Direct I/O,而不是OS页面缓存(4K)。
  • 预读和块写提高磁盘利用率,但是需要额外的stream buffer。
  • 使用RAID实现读写分离。
  • 使用SSD存储TRIM操作实现truncate。

3.4 分区数量

假设分区的更新满足均匀分布,则有如下内存公式:

  • N:点集合内存总量。
  • S:最大带宽IO请求包大小。
  • K:分区数。
  • M:内存总量。

4. 基于内存的流式引擎

4.1 并行Scatter-Gather

  • 每个线程写自由缓存,再统一flush到贡献的输出数据块。
  • 通过worker stealing避免倾斜。

4.2 并行多阶段shuffle

  • 将分区使用树形结构组织起来,分支因子F(扇出度大小),树的每一层对应一步shuffle。
  • 因此对于K个分区,一共需要logFK步shuffle。
  • 使用两个stream buffer轮换输入输出角色实现shuffle。
  • 论文将F设置为CPU cache的可用行数。

4.3 磁盘流的分层

内存引擎逻辑上在外存引擎上层,外存引擎可以自由选择使用内存引擎处理的分区数量,以最大化利用内存和计算资源。

5. 评估

  • 256M内存cache大小,在16core时达到最大内存带宽25GB/s。
  • 16M IO请求包大小。

前言

这是一篇关于 CAP 全部特性的一览文章,里面也包含了官方文档中未提及的一些特性,来看看是否有你不知道的特性,同时也是对于新用户在选型时是一个很好的参考。

CAP现在已经不仅仅是一个分布式事务的解决方案,其在消息处理方面提供的大量特性可以在面临复杂的业务场景时也游刃有余,并仍然保持轻量级、高性能、易使用的特点。

在数据兼容性方面也做了大量努力,5年前发布的 3.0 版本在不经过任何改动的情况下也可直升最新版本。同时过去7年的不断改进与完善,在稳定性方面也获得了用户的大量好评。

项目简介:

  • CAP
    是一个处理分布式事务问题的开源解决方案,该项目诞生于2016年,目前在 Github 已经有超过 6500+ Star 和 110+ 贡献者,以及在 NuGet超 800 万的下载量。

消息发布

携带消息头

消息头可以用来传递与消息相关的元数据信息,CAP支持在消息中携带自定义头部信息,这些头部信息可以在消费者端进行读取和处理。在发布消息时添加头部信息:

var headers = new Dictionary<string, string>
{
   { "my.header.first", "first" },
   { "my.header.second", "second" }
};
await capBus.PublishAsync("test.show.time", DateTime.Now, headers);

设置消息前缀

使用前缀可以有效管理和组织消息,使得不同类型或不同应用的Topic具有一定的层次结构,CAP允许为消息设置特定的前缀,便于分类和识别。可以在配置中设置:

services.AddCap(x =>
{
   x.TopicNamePrefix = "prefix";
});

原生支持的延迟消息

延迟消息可以用于实现定时任务调度执行,也可以通过发送延迟消息来实现重试等机制,同时也在缓存失效、订单超时处理、流量削峰、异步工作流等场景有多种应用。CAP原生支持延迟消息而不需要借助消息队列的能力,例如,延迟100秒发布消息:

await capBus.PublishDelay(TimeSpan.FromSeconds(100), "test.show.time", DateTime.Now);

并行发布消息

如果你在消息发送的时候追求高性能而不是特别在意消息顺序,CAP支持开启并行发布消息配置,在内部CAP会按照批次来处理数据,这会显著提高发布速度。

services.AddCap(x =>
{
   x.EnablePublishParallelSend = true;
});

事务消息

事务消息是CAP的第一大特性,在构建 SOA 或者 微服务系统的过程中,通常需要使用事件来对各个服务进行集成,在这过程中简单的使用消息队列并不能保证数据的最终一致性, CAP 采用的是和当前数据库集成的本地消息表的方案来解决在分布式系统互相调用的各个环节可能出现的异常,它能够保证任何情况下事件消息都是不会丢失的。

背景知识:
https://www.cnblogs.com/savorboard/p/distributed-system-transaction-consistency.html

事务消息发送

发送事务消息需要同时确保业务消息和事件消息包含着一个数据库事务上下文中,这样才能保证数据一致性。以下是和本地事务集成的示例:

using (var connection = new MySqlConnection(ConnectionString))
{
	using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true))
	{
		connection.Execute("insert into test(name) values('test’)",
			 transaction: (IDbTransaction)transaction.DbTransaction);
		await cap.PublishAsync("order.insert", DateTime.Now);
	}
}

如果你使用的第三方ORM,参考下面的链接。

FreeSQL集成示例:
https://github.com/dotnetcore/FreeSql/discussions/1202
SqlSugar集成示例:
https://github.com/DotNetNext/SqlSugar/issues/1207
Chloe集成示例:
https://github.com/shuxinqin/Chloe/issues/328

事务消息消费

消费端事务其实没有多大必要,因为消息已经不会丢了,但是如果你想开启,也可以。消费端自动事务主要利用 CAP 提供的过滤器来进行开启。

1、创建过滤器

public class MyCapFilter : SubscribeFilter {
    private readonly AppDbContext _dbContext;
    private IDbContextTransaction _transaction;

    public MyCapFilter(AppDbContext dbContext){
        _dbContext = dbContext;
    }
    public override void OnSubscribeExecuting(ExecutingContext context){
        _transaction = _dbContext.Database.BeginTransaction();
    }
    public override void OnSubscribeExecuted(ExecutedContext context){
        _transaction.Commit();
    }
    public override void OnSubscribeException(DotNetCore.CAP.Filter.ExceptionContext context){
       _transaction.Rollback();
    }
}

2、配置过滤器

services.AddCap(opt =>
{
    // ***
}.AddSubscribeFilter<MyCapFilter>();

事务补偿

在进行多服务的业务事务操作时,可能会遇到下游执行失败的情况,CAP支持提供操作失败后的补偿机制,可以通过回调机制实现。例如:

await cap.PublishAsync("place.order", ...,"place.order.qty.deducted");

[CapSubscribe("place.order.qty.deducted")]
public object DeductProductQty(JsonElement param)
{
   var orderId = param.GetProperty("OrderId").GetInt32();
   // 业务逻辑
   return new { OrderId = orderId, IsSuccess = true };
}

消息处理

序列化

默认情况下CAP使用 Json 来序列化消息,Json现在基本已成为传输的标准格式,这这在大部分情况下工作的很好。如果你对性能、兼容性、安全性等有其他考虑,可以通过
ISerializer
接口来自定义实现支持诸如 XML、GRPC、MessagePack 等格式。

序列化 Json 默认使用 .NET
System.Text.Json
实现,如果你想更换为
Newtonsoft.Json
查看
这里

services.AddSingleton<ISerializer, YourSerializer>();

过滤器

CAP 过滤器和 ASP.NET MVC 中的过滤器类似,可以在消费者被执行前、执行后、执行异常时进行拦截并处理。

public class MyCapFilter: SubscribeFilter {
    public override Task OnSubscribeExecutingAsync(ExecutingContext context){
        // 订阅方法执行前
    }
    public override Task OnSubscribeExecutedAsync(ExecutedContext context){
        // 订阅方法执行后
    }
    public override Task OnSubscribeExceptionAsync(ExceptionContext context){
        // 订阅方法执行异常
    }
}

services.AddCap(opt =>
{
    // ***
}.AddSubscribeFilter<MyCapFilter>();

消息重试

当消息发送或执行失败时,CAP支持重试机制。可以配置重试间隔和重试次数:

services.AddCap(x =>
{
   x.FailedRetryCount = 5;
   x.FailedRetryInterval = 60;
});

重试回退

重试器会定期扫描失败的消息并重新尝试,对于失败的消息默认查询当前时间回退 4 分钟前的,这是为了保证新消息不会被重试器拾取。
你可以通过
FallbackWindowLookbackSeconds
配置项来自定义配置此值。

多线程处理

多消费线程支持
多执行线程支持

自动恢复/重连

对于消息队列的自动重连功能在分布式系统和消息驱动的架构中具有重要作用,特别是在提高系统的可用性、可靠性和容错能力方面。CAP原生实现了对于连接的健康检查及自动重连功能而不是借助于客户端的实现,从而可以最大程度的保证连接的稳定性,提高系统的容错性,使得消费者能够在遇到问题时自动恢复,而不需要重启系统或应用。

这是一项内置功能,不需要任何配置。

分布式存储锁

重试线程默认每隔1分钟来读取存储的消息用于对发送或消费失败的消息进行重试,单个实例没有什么问题,在启用多个实例的场景下会有一定几率出现并发读的情况,CAP支持配置基于数据库的分布式锁,这样可以避免多实例并发读的问题,并且对异常场景的处理也进行了考虑。

services.AddCap(x =>
{
   x.UseStorageLock = true;
});

消息版本隔离

版本隔离就是利用版本特性将消息对象按照版本划分,从而来隔离不同版本的业务或实例。CAP支持版本隔离特性,可以用于多个业务场景,例如业务快速迭代,需要向前兼容、多个版本的服务端接口、不同实例使用相同的表、开发阶段不同开发人员的消息区分等。可以通过配置版本号实现:

services.AddCap(x =>
{
   x.Version = "v1";
});

优化的雪花算法

雪花算法用于生成消息唯一标识,CAP使用的雪花算法是优化过的,解决了时钟漂移问题以及默认使用了Mac低10位作为WorkerId保证不会重复。 同时也支持自定义雪花算法实现,如下:

services.AddSingleton<ISnowflakeId, SnowflakeId>();

消息自动清理

CAP 有自动清理机制来确保数据库表的数据量不会无限增长。默认情况下,对于成功处理的消息会在 1 天后过期然后清理,对于处理失败的消息会在 15 天后过期然后清理。你可以通过
SucceedMessageExpiredAfter

FailedMessageExpiredAfter
来自定义他们的过期时间。

清理器被 5 分钟扫描一次过期的消息,你可以通过
CollectorCleaningInterval
配置项来自定义间隔时间。

消费者特性

Attribute 订阅

基于 Attribute 的订阅可以减少重复代码,自动完成消息订阅和处理的绑定,这种方式可以让开发者专注于业务逻辑,消息传递的细节,提高代码可读性和维护性。

使用Attribute标记的方式进行消息订阅和消费:

[CapSubscribe("test.show.time")]
public void Show(DateTime datetime)
{
   Console.WriteLine(datetime);
}

多Attribute 订阅

CAP允许在同一方法上使用多个Attribute,可以让方法同时订阅多个不同的消息主题,从而增加了系统的灵活性和可扩展性及代码重用性。

[CapSubscribe("test.show.time")]
[CapSubscribe("test.show.hello")]
public void Show(DateTime datetime)
{
   Console.WriteLine(datetime);
}

通配符订阅

支持通过通配符进行消息订阅,尤其是在处理大量主题或复杂的消息路由需求时,可以大大简化订阅管理。

你可以使用
*

#
通配符来进行批量订阅。例如订阅所有以"test."开头的消息:

  • 可以代替一个或多个词, # 可以代替零或多个词
[CapSubscribe("test.*")]
public void CheckReceivedMessage(DateTime datetime)
{
   Console.WriteLine(datetime);
}

异步订阅

支持异步方法订阅消息,可传递 CancellationToken 支持取消,这可以和启动停止Api配合实现灵活的并发处理及充分的资源利用。

[CapSubscribe("test.*")]
public async Task CheckReceivedMessage(DateTime datetime, CancellationToken cancellationToken)
{
   Console.WriteLine(datetime);
}

多程序集订阅

在单个系统进行模块化设计时,一般遵循分离关注点,订阅者可能分布在不同的程序集,CAP 支持跨多个程序集进行消息订阅,只需在不同程序集的类上定义订阅方法,然后使用
AddSubscriberAssembly
配置即可,然后CAP会自动查找发现注册。

services.AddCap(x =>
{
}).AddSubscriberAssembly(new Assembly[]{});

前缀订阅

订阅前缀用于设置在订阅时在父级类上指定一个前缀,那么类中的方法便自动继承此前缀,一般用于对消费者进行归类管理。

[CapSubscribe("customers")]
public class CustomersCapController : ICapSubscibe
{
	[CapSubscribe(".create", isPartial: true)]
	public Task HandleCreationAsync(Customer customer)	{
	}

	[CapSubscribe(".update", isPartial: true)]
	public Task HandleUpdateAsync(Customer customer)	{
	}
}

消费组

消费者组是由 Kafka 提出的一个非常重要的概念,用于实现高效的消息消费管理和负载均衡。CAP同样支持消费组的概念,而且针对了不同的消息队列进行适配,从而可以使用户获得一致性的使用体验。

[CapSubscribe("test.show.time", Group = "group1")]
public void CheckReceivedMessage(DateTime datetime)
{
   Console.WriteLine(datetime);
}

扇出(Fanout)消费

借助于抽象出的消费者组的概念,CAP支持扇出消费消息,也就是一发多收。

如果多个相同消费者组的消费者消费同一个Topic消息的时候,只会有一个消费者被执行。 相反,如果消费者都位于不同的消费者组,则所有的消费者都会被执行。
下面的实例中,
Hello1

Hello2
位于不同的 Group 中,那么都会收到消息,从而可以实现一发多收。

 [CapSubscribe("hello", Group = "bar")]
   public void Hello1(){
     Console.WriteLine("hello 1");
   }
   
 [CapSubscribe("hello", Group = "foo")]
   public void Hello2(){
     Console.WriteLine("hello 2");
   }

隐式类型消费

消费者参数接收支持隐式的消息类型转换,例如你发送的是一个 string 类型的日期字符串,那么在消费者端你也可以使用DateTime来进行接收。

await cap.PublishAsync<string>("hello", "2020/10/10 11:11");

[CapSubscribe("hello")]
public void Hello(DateTime time){
 Console.WriteLine($"hello {time}");
}

无类型消费

如果你不知道发布的消息类型或者不想再去定义类型,CAP支持使用
System.Text.Json.JsonElement
接收一切类型。

await _capBus.PublishAsync("hello", DateTime.Now);
await _capBus.PublishAsync("hello", 1345);
await _capBus.PublishAsync("hello", true);
await _capBus.PublishAsync("hello", "hello");
await _capBus.PublishAsync("hello", new Person{Name = "Jack", Age = 22});

[CapSubscribe("hello")]
public void Hello(JsonElement obj){
	Console.WriteLine(obj.ToString());
}

接口化类型消费

CAP支持通过扩展来实现基于接口的类型化消息消费,请查看 Sample.RabbitMQ.SqlServer 中的示例。

串行消费

大部分场景中希望发送和处理消息的顺序是一致的,在CAP中消息默认按顺序串行处理。

并行消费

在一些业务场景中消息顺序可能不是非常重要,但需要快速处理完消息,通过下面的配置项来开启并行处理。

支持启用多个订阅并行执行,通过
EnableSubscriberParallelExecute
来开启。

当开启后,支持通过
SubscriberParallelExecuteThreadCount
配置项设置线程数。

在内部CAP实现原理是先将消费线程收到的消息放置到缓冲区然后决定串行或者并行执行,因此还提供了另外一个配置参数
SubscriberParallelExecuteBufferFactor
来设置缓冲区的大小,这是一个乘数因子,缓冲区的大小值为 (
SubscriberParallelExecuteThreadCount
*
SubscriberParallelExecuteBufferFactor
)。

背压机制

消息发送或消费的吞吐量除和用户的代码有关系外,还和当前的硬件负载及处理能力有关,缓冲区的设计在一定程度上能够最大化消息处理的吞吐能力及资源利用能力,但这并非万能的解药。当缓冲区满时CAP将自动启用背压机制来降低响应能力,这可以确保内存始终是安全的而不会引发OOM。

简单举例来说,有一个池子,一边向池子注水一边从池子放水,当注水的速度大于放水的速度时,那么池子满了后就会慢慢溢出。背压机制相当于给注水口添加了一个控制阀门,当池子满后将控制阀门流量来保持注水和放水平衡。

独立消费

有一些情况下某些消费者需要执行非常长的时间,这样会卡住另外一些执行时间比较短消费者,导致后续的消息不能及时消费。
所以在一些业务场景下需要某些 long-running 的消费者具有独立的数据管道以和其他正常的消费者隔离开,做到独立执行。

在 CAP 中可以在订阅时通过在 Attribute 添加
GroupConcurrent
参数来做到这一点,这是一个并行参数意味着也支持多个线程执行。

[CapSubscribe("hello", GroupConcurrent = 2)]
public void Hello2(){
  Console.WriteLine("hello 2");
  Thread.Sleep(1000);
}

在 Hello2中设置为 2 那么就代表这个方法在有新消息到达时候,最多会有2个线程在同时执行它

控制与集成特性

启动/停止 API

默认情况下 CAP 随 ASP.NET Core 宿主进程启动, 如果你想手动控制启动或停止的时刻,你可以通过
IBootstrapper
接口来动态控制,在这一些需要特定时间来处理消息的场景中很方便。

提供启动和停止消息处理的API,便于控制消息处理的生命周期。

[Route("~/control/start")]
public async Task<IActionResult> Start([FromServices]IBootstrapper bootstrapper){
    await bootstrapper.BootstrapAsync();
    return Ok();
}

[Route("~/control/stop")]
public async Task<IActionResult> Stop([FromServices] IBootstrapper bootstrapper){
    await bootstrapper.DisposeAsync();
    return Ok();
}

异构系统集成

发送的消息可能来自于第三方异构系统而不是由CAP的发送端产生,CAP支持与异构系统进行消息对接。

如果异构系统发送的消息没有传递消费必须的Header信息,可以使用
CustomHeadersBuilder
配置项进行添加。

x.UseRabbitMQ(z => {
	z.CustomHeadersBuilder = (msg, sp) =>
	[
		new(Headers.MessageId, sp.GetRequiredService<ISnowflakeId>().NextId().ToString()),
		new(Headers.MessageName, msg.RoutingKey)
	];
}); 

多种传输支持

支持多种支持消息队列作为传输(Transport),以下是目前支持的传输。

  • RabbitMQ
  • Kafka
  • Azure Service Bus
  • Amazon SQS
  • NATS
  • In-Memory Queue
  • Redis Streams
  • Apache Pulsar

除此之外,下面是社区支持的传输,感谢贡献者。

多种存储支持

支持多种流行的消息存储方式,包括 SQL Server、MySQL、PostgreSQL、MongoDB、InMemory 等。

除了 InMemory 外,均提供了对事务消息的支持。

除此之外,下面是社区支持的传输,感谢贡献者。

监控,诊断,可观察性

CAP提供了 Dashboard 供用户快速的消息查看及实时消息发送及消费情况。
CAP提供了 Diagnostics API 供用户方便的使用 Logging、Tracing 及 Metrics.
CAP提供了 OpenTelemetry 可观察性标准的支持。

Dashboard 包含以下特性:

  • 多语言
  • 实时监控和可视化
  • 手动查看与重试
  • 与 ASP.NET Core紧密集成的授权
  • 服务代理
  • 服务发现之Consul
  • 服务发现之Kubernetes
  • Metrics度量支持

多语言

提供中文和英文语言支持,在界面右上角进行切换显示。

实时监控和可视化

支持实时显示消息处理状态的图表,帮助监控消息生产及消费执行情况。

手动查看与重试

基于Dashboard提供的功能,可以查看消息的详细信息包括 header 和 content 内容。

在界面上可以对成功或失败的消息在手动再次进行发送或消费。

Subscriber 页面提供了的当前服务下所有订阅方法的查看,基于 Group 进行分组显示,便于用户进行统一查看于搜索。

与 ASP.NET Core 授权紧密集成

Dashboard 支持与 ASP.NET Core的授权机制无缝衔接。

  1. 匿名访问
cap.UseDashboard(d =>{
	d.AllowAnonymousExplicit = true;
}); 
  1. OpenId Connect
services.AddAuthorization(options => { 
	options.AddPolicy(DashboardAuthorizationPolicy, policy => policy
		.AddAuthenticationSchemes(OpenIdConnectDefaults.AuthenticationScheme)
		.RequireAuthenticatedUser());
})
.AddAuthentication(opt => opt.DefaultScheme = CookieAuthenticationDefaults.AuthenticationScheme)
.AddCookie()
.AddOpenIdConnect(options => {
	...
});

services.AddCap(cap => {
	cap.UseDashboard(d => {
		d.AuthorizationPolicy = DashboardAuthorizationPolicy;
	});
});

请查看 Sample.Dashboard.Auth 示例项目了解更多。

Consul 节点发现

CAP是去中心化的设计,因此要查看其他节点的数据而不打开其他节点的Dashboard的情况下就要支持节点发现,Dashboard内置了代理功能用于代理其他节点API返回在数据在当前节点的Dashboard显示。
通过简单的节点切换即可来查看数据,从而可以带来更好的用户体验。

支持通过Consul进行节点发现,在过去大部分公司使用它来作为分布式系统的配置中心或注册中心。

更多信息,请查看
官方文档

K8s 节点发现

现代的应用程序基本是K8S作为默认集群管理及服务注册中心,
DotNetCore.CAP.Dashboard.K8s
包提供了对 K8s 节点发现的支持。

CAP可读取K8S特定命名空间下的所有服务,然后 ping 出启用了Dashboard API 的那些节点转而代理切换过去。

独立运行的Dashboard

Dashboard 也支持不依赖服务项目而独立部署,此时无需再配置
AddCap
而直接使用下面的方式:

services.AddCapDashboardStandalone();

更多信息,请查看
官方文档

Diagnostics API

CAP 对 .NET
DiagnosticSource
提供了支持,监听器名称为
CapDiagnosticListener

你可以在
DotNetCore.CAP.Diagnostics.CapDiagnosticListenerNames
类下面找到CAP已经定义的事件名称。

Diagnostics 提供对外提供的事件信息有:

Before After Exception
消息持久化之前 消息持久化之后 消息持久化异常
消息向MQ发送之前 消息向MQ发送之后 消息向MQ发送异常
消息从MQ消费保存之前 消息从MQ消费保存之后
订阅者方法执行之前 订阅者方法执行之后 订阅者方法执行异常

CAP 对 .NET
EventSource
提供了支持,计数器名称为
DotNetCore.CAP.EventCounter

Metrics 提供了以下几个指标:

  • 每秒发布速度
  • 每秒消费速度
  • 每秒调用订阅者速度
  • 每秒执行订阅者平均耗时

OpenTelemetry 支持

分布式追踪在现代应用中有着非常重要的作用,OpenTelemetry 是一个开放的标准支持多种编程语言和框架,提供了跨平台、跨语言的兼容性,便于在异构系统中实现统一的监控和追踪。

CAP可以衔接上游触发的消息源链路,然后经由每个服务,每个传输,每个存储后传递到下游执行,整个过程都会被记录并还原,形成完整的调用链,便于性能优化和问题排查。除此之外,在消息消费时再次发送的消息链路同样可以被准确衔接,做到调用链的完整准确。

CAP 通过
DotNetCore.CAP.OpenTelemetry
包提供了对 OpenTelemetry 的支持。

services.AddOpenTelemetryTracing((builder) => builder
    .AddAspNetCoreInstrumentation()
    .AddCapInstrumentation()    
    .AddZipkinExporter()
);

总结

以上,就是 CAP 全部特性的介绍,如果你还有一些更好的想法和主意可以在 Github 给我们提交 Issue。

如果你在使用的过程中遇到问题希望也能够积极的反馈,帮助CAP变得更好。

如果你喜欢这个项目,可以通过下面的连接点击 Star 给我们支持。

GitHub stars

如果你觉得本篇文章对您有帮助的话,感谢您的【推荐】。


本文地址:
http://www.cnblogs.com/savorboard/p/cap-features.html
作者博客:
Savorboard
本文原创授权为:署名 - 非商业性使用 - 禁止演绎,协议
普通文本
| 协议
法律文本

图像采集源除了显示控件(上一篇《
.NET 控件转图片
》有介绍从界面控件转图片),更多的是窗口以及屏幕。

窗口截图最常用的方法是GDI,直接上Demo吧:

1         private void GdiCaptureButton_OnClick(objectsender, RoutedEventArgs e)2 {3             var bitmap =CaptureScreen();4             CaptureImage.Source =ConvertBitmapToBitmapSource(bitmap);5 }6         /// <summary>
7         ///截图屏幕8         /// </summary>
9         /// <returns></returns>
10         public staticBitmap CaptureScreen()11 {12             IntPtr desktopWindow =GetDesktopWindow();13             //获取窗口位置大小
14             GetWindowRect(desktopWindow, out varlpRect);15             returnCaptureByGdi(desktopWindow, 0d, 0d, lpRect.Width, lpRect.Height);16 }17         privateBitmapSource ConvertBitmapToBitmapSource(Bitmap bitmap)18 {19             using MemoryStream memoryStream = newMemoryStream();20             //将 System.Drawing.Bitmap 保存到内存流中
21 bitmap.Save(memoryStream, System.Drawing.Imaging.ImageFormat.Png);22             //重置内存流的指针到开头
23             memoryStream.Seek(0, SeekOrigin.Begin);24 
25             //创建 BitmapImage 对象并从内存流中加载图像
26             BitmapImage bitmapImage = newBitmapImage();27 bitmapImage.BeginInit();28             bitmapImage.StreamSource =memoryStream;29             bitmapImage.CacheOption =BitmapCacheOption.OnLoad;30 bitmapImage.EndInit();31             //确保内存流不会被回收
32 bitmapImage.Freeze();33             returnbitmapImage;34 }35         /// <summary>
36         ///截图窗口/屏幕37         /// </summary>
38         /// <param name="windowIntPtr">窗口句柄(窗口或者桌面)</param>
39         /// <param name="left">水平坐标</param>
40         /// <param name="top">竖直坐标</param>
41         /// <param name="width">宽度</param>
42         /// <param name="height">高度</param>
43         /// <returns></returns>
44         private static Bitmap CaptureByGdi(IntPtr windowIntPtr, double left, double top, double width, doubleheight)45 {46             IntPtr windowDc =GetWindowDC(windowIntPtr);47             IntPtr compatibleDc =CreateCompatibleDC(windowDc);48             IntPtr compatibleBitmap = CreateCompatibleBitmap(windowDc, (int)width, (int)height);49             IntPtr bitmapObj =SelectObject(compatibleDc, compatibleBitmap);50             BitBlt(compatibleDc, 0, 0, (int)width, (int)height, windowDc, (int)left, (int)top, CopyPixelOperation.SourceCopy);51             Bitmap bitmap =System.Drawing.Image.FromHbitmap(compatibleBitmap);52             //释放
53 SelectObject(compatibleDc, bitmapObj);54 DeleteObject(compatibleBitmap);55 DeleteDC(compatibleDc);56 ReleaseDC(windowIntPtr, windowDc);57             returnbitmap;58         }

根据user32.dll下拿到的桌面信息-句柄获取桌面窗口的设备上下文,再以设备上下文分别创建内存设备上下文、设备位图句柄

1 BOOL BitBlt(2     HDC   hdcDest,  //目标设备上下文
3     int   nXDest,   //目标起始x坐标
4     int   nYDest,   //目标起始y坐标
5     int   nWidth,   //宽度(像素)
6     int   nHeight,  //高度(像素)
7     HDC   hdcSrc,   //源设备上下文
8     int   nXSrc,    //源起始x坐标
9     int   nYSrc,    //源起始y坐标
10     DWORD dwRop    //操作码(如CopyPixelOperation.SourceCopy)
11 );

图像位块传输BitBlt是最关键的函数,Gdi提供用于在设备上下文之间进行位图块的传输,从原设备上下文复现位图到创建的设备上下文

另外,与Bitblt差不多的还有StretchBlt,StretchBlt也是复制图像,但可以同时对图像进行拉伸或者缩小,需要缩略图可以用这个方法

然后以设备位图句柄输出一个位图System.Drawing.Bitmap,使用到的User32、Gdi32函数:


1     /// <summary>
2     ///获取桌面窗口3     /// </summary>
4     /// <returns></returns>
5     [DllImport("user32.dll")]6     public static externIntPtr GetDesktopWindow();7     /// <summary>
8     ///获取整个窗口的矩形区域9     /// </summary>
10     /// <returns></returns>
11     [DllImport("user32.dll", SetLastError = true)]12     public static extern bool GetWindowRect(IntPtr hwnd, outRECT lpRect);13     /// <summary>
14     ///检索整个窗口的设备上下文15     /// </summary>
16     /// <param name="hWnd">具有要检索的设备上下文的窗口的句柄</param>
17     /// <returns></returns>
18     [DllImport("user32.dll", SetLastError = true)]19     public static externIntPtr GetWindowDC(IntPtr hWnd);20     /// <summary>
21     ///创建与指定设备兼容的内存设备上下文22     /// </summary>
23     /// <param name="hdc">现有 DC 的句柄</param>
24     /// <returns>如果函数成功,则返回值是内存 DC 的句柄,否则返回Null</returns>
25     [DllImport("gdi32.dll")]26     public static externIntPtr CreateCompatibleDC([In] IntPtr hdc);27     /// <summary>
28     ///将对象选择到指定的设备上下文中29     /// </summary>
30     /// <param name="hdc">DC 的句柄</param>
31     /// <param name="gdiObj">要选择的对象句柄</param>
32     /// <returns>如果函数成功,则返回值是兼容位图 (DDB) 的句柄,否则返回Null</returns>
33     [DllImport("gdi32.dll")]34     public static externIntPtr SelectObject([In] IntPtr hdc, [In] IntPtr gdiObj);35     /// <summary>
36     ///创建与与指定设备上下文关联的设备的位图37     /// </summary>
38     /// <param name="hdc">设备上下文的句柄</param>
39     /// <param name="nWidth">位图宽度(以像素为单位)</param>
40     /// <param name="nHeight">位图高度(以像素为单位)</param>
41     /// <returns></returns>
42     [DllImport("gdi32.dll")]43     public static extern IntPtr CreateCompatibleBitmap([In] IntPtr hdc, int nWidth, intnHeight);44     /// <summary>
45     ///执行与从指定源设备上下文到目标设备上下文中的像素矩形对应的颜色数据的位块传输46     /// </summary>
47     /// <param name="hdcDest">目标设备上下文的句柄</param>
48     /// <param name="xDest">目标矩形左上角的 x 坐标(逻辑单位)</param>
49     /// <param name="yDest">目标矩形左上角的 y 坐标(逻辑单位)</param>
50     /// <param name="wDest">源矩形和目标矩形的宽度(逻辑单位)</param>
51     /// <param name="hDest">源矩形和目标矩形的高度(逻辑单位)</param>
52     /// <param name="hdcSource">源设备上下文的句柄</param>
53     /// <param name="xSrc">源矩形左上角的 x 坐标(逻辑单位)</param>
54     /// <param name="ySrc">源矩形左上角的 y 坐标(逻辑单位)</param>
55     /// <param name="rop">定义如何将源矩形的颜色数据与目标矩形的颜色数据相结合</param>
56     /// <returns></returns>
57     [DllImport("gdi32.dll")]58     public static extern boolBitBlt(IntPtr hdcDest,59         int xDest, int yDest, int wDest, inthDest, IntPtr hdcSource,60         int xSrc, intySrc, CopyPixelOperation rop);61     /// <summary>
62     ///删除逻辑笔、画笔、字体、位图、区域或调色板,释放与对象关联的所有系统资源。63     ///删除对象后,指定的句柄将不再有效。64     /// </summary>
65     /// <param name="hObject"></param>
66     /// <returns></returns>
67     [DllImport("gdi32.dll")]68     public static extern boolDeleteObject(IntPtr hObject);69     /// <summary>
70     ///删除指定的设备上下文71     /// </summary>
72     /// <param name="hdc">设备上下文的句设备上下文的句</param>
73     /// <returns></returns>
74     [DllImport("gdi32.dll")]75     public static extern boolDeleteDC([In] IntPtr hdc);76     /// <summary>
77     ///释放设备上下文 (DC),释放它以供其他应用程序使用78     /// </summary>
79     /// <param name="hWnd"></param>
80     /// <param name="hdc"></param>
81     /// <returns></returns>
82     [DllImport("user32.dll", SetLastError = true)]83     public static extern boolReleaseDC(IntPtr hWnd, IntPtr hdc);84 
85     /// <summary>
86     ///定义一个矩形区域。87     /// </summary>
88 [StructLayout(LayoutKind.Sequential)]89     public structRECT90 {91         /// <summary>
92         ///矩形左侧的X坐标。93         /// </summary>
94         public intLeft;95 
96         /// <summary>
97         ///矩形顶部的Y坐标。98         /// </summary>
99         public intTop;100 
101         /// <summary>
102         ///矩形右侧的X坐标。103         /// </summary>
104         public intRight;105 
106         /// <summary>
107         ///矩形底部的Y坐标。108         /// </summary>
109         public intBottom;110 
111         /// <summary>
112         ///获取矩形的宽度。113         /// </summary>
114         public int Width => Right -Left;115 
116         /// <summary>
117         ///获取矩形的高度。118         /// </summary>
119         public int Height => Bottom -Top;120 
121         /// <summary>
122         ///初始化一个新的矩形。123         /// </summary>
124         /// <param name="left">矩形左侧的X坐标。</param>
125         /// <param name="top">矩形顶部的Y坐标。</param>
126         /// <param name="right">矩形右侧的X坐标。</param>
127         /// <param name="bottom">矩形底部的Y坐标。</param>
128         public RECT(int left, int top, int right, intbottom)129 {130             Left =left;131             Top =top;132             Right =right;133             Bottom =bottom;134 }135     }

View Code

还有一种比较简单的方法Graphics.CopyFromScreen,看看调用DEMO:

1         private void GraphicsCaptureButton_OnClick(objectsender, RoutedEventArgs e)2 {3             var image =CaptureScreen1();4             CaptureImage.Source =ConvertBitmapToBitmapSource(image);5 }6         /// <summary>
7         ///截图屏幕8         /// </summary>
9         /// <returns></returns>
10         public staticBitmap CaptureScreen1()11 {12             IntPtr desktopWindow =GetDesktopWindow();13             //获取窗口位置大小
14             GetWindowRect(desktopWindow, out varlpRect);15             return CaptureScreenByGraphics(0, 0, lpRect.Width, lpRect.Height);16 }17         /// <summary>
18         ///截图屏幕19         /// </summary>
20         /// <param name="x">x坐标</param>
21         /// <param name="y">y坐标</param>
22         /// <param name="width">截取的宽度</param>
23         /// <param name="height">截取的高度</param>
24         /// <returns></returns>
25         public static Bitmap CaptureScreenByGraphics(int x, int y, int width, intheight)26 {27             var bitmap = newBitmap(width, height);28             using var graphics =Graphics.FromImage(bitmap);29             graphics.CopyFromScreen(x, y, 0, 0, newSystem.Drawing.Size(width, height), CopyPixelOperation.SourceCopy);30             returnbitmap;31         }

Graphics.CopyFromScreen调用简单了很多,与GDI有什么区别?

Graphics.CopyFromScreen内部也是通过GDI.BitBlt来完成屏幕捕获的,封装了下提供更高级别、易胜的API。

测试了下,第一种方法Gdi32性能比Graphics.CopyFromScreen性能略微好一点,冷启动时更明显点,试了2次耗时大概少个10多ms。

所以对于一般应用场景,使用 Graphics.CopyFromScreen 就足够了,但如果你需要更高的控制权和性能优化,建议使用 Gdi32.BitBlt

kybs00/CaptureImageDemo (github.com)

RAG+AI工作流+Agent:LLM框架该如何选择,全面对比MaxKB、Dify、FastGPT、RagFlow、Anything-LLM,以及更多推荐

1.MaxKB

MaxKB = Max Knowledge Base,是一款基于 LLM 大语言模型的开源知识库问答系统,旨在成为企业的最强大脑。它能够帮助企业高效地管理知识,并提供智能问答功能。想象一下,你有一个虚拟助手,可以回答各种关于公司内部知识的问题,无论是政策、流程,还是技术文档,MaxKB 都能快速准确地给出答案:比如公司内网如何访问、如何提交视觉设计需求等等

官方网址:
https://maxkb.cn/

1.1 简介

  1. 开箱即用
    :支持直接上传文档、自动爬取在线文档,支持文本自动拆分、向量化、RAG(检索增强生成),智能问答交互体验好;

  2. 无缝嵌入
    :支持零编码快速嵌入到第三方业务系统,让已有系统快速拥有智能问答能力,提高用户满意度;

  1. 灵活编排
    :内置强大的工作流引擎,支持编排 AI 工作流程,满足复杂业务场景下的需求;

  1. 模型中立
    :支持对接各种大语言模型,包括本地私有大模型(Llama 3 / Qwen 2 等)、国内公共大模型(通义千问 / 智谱 AI / 百度千帆 / Kimi / DeepSeek 等)和国外公共大模型(OpenAI / Azure OpenAI / Gemini 等)。

1.2技术框架和原理

  • 技术栈
    • 前端:Vue.js、logicflow
    • 后端:Python / Django
    • Langchain:Langchain
    • 向量数据库:PostgreSQL / pgvector
    • 大模型:Ollama、Azure OpenAI、OpenAI、通义千问、Kimi、百度千帆大模型、讯飞星火、Gemini、DeepSeek等。

2.Dify

Dify 是一款开源的大语言模型(LLM) 应用开发平台。它融合了后端即服务(Backend as Service)和 LLMOps 的理念,使开发者可以快速搭建生产级的生成式 AI 应用。即使你是非技术人员,也能参与到 AI 应用的定义和数据运营过程中。

由于 Dify 内置了构建 LLM 应用所需的关键技术栈,包括对数百个模型的支持、直观的 Prompt 编排界面、高质量的 RAG 引擎、稳健的 Agent 框架、灵活的流程编排,并同时提供了一套易用的界面和 API。这为开发者节省了许多重复造轮子的时间,使其可以专注在创新和业务需求上

2.1 简介

Dify 是一个开源的 LLM 应用开发平台。其直观的界面结合了 AI 工作流、RAG 管道、Agent、模型管理、可观测性功能等,让您可以快速从原型到生产。以下是其核心功能列表:

  1. 工作流
    : 在画布上构建和测试功能强大的 AI 工作流程,利用以下所有功能以及更多功能。

  2. 全面的模型支持
    : 与数百种专有/开源 LLMs 以及数十种推理提供商和自托管解决方案无缝集成,涵盖 GPT、Mistral、Llama3 以及任何与 OpenAI API 兼容的模型。

  3. Prompt IDE
    : 用于制作提示、比较模型性能以及向基于聊天的应用程序添加其他功能(如文本转语音)的直观界面。

  4. RAG Pipeline
    : 广泛的 RAG 功能,涵盖从文档摄入到检索的所有内容,支持从 PDF、PPT 和其他常见文档格式中提取文本的开箱即用的支持。

  5. Agent 智能体
    : 您可以基于 LLM 函数调用或 ReAct 定义 Agent,并为 Agent 添加预构建或自定义工具。Dify 为 AI Agent 提供了50多种内置工具,如谷歌搜索、DELL·E、Stable Diffusion 和 WolframAlpha 等。

  6. LLMOps
    : 随时间监视和分析应用程序日志和性能。您可以根据生产数据和标注持续改进提示、数据集和模型。

  7. 后端即服务
    : 所有 Dify 的功能都带有相应的 API,因此您可以轻松地将 Dify 集成到自己的业务逻辑中。

2.2 系统框架

工作流通过将复杂的任务分解成较小的步骤(节点)降低系统复杂度,减少了对提示词技术和模型推理能力的依赖,提高了 LLM 应用面向复杂任务的性能,提升了系统的可解释性、稳定性和容错性。

Dify 工作流分为两种类型:

  • Chatflow:面向对话类情景,包括客户服务、语义搜索、以及其他需要在构建响应时进行多步逻辑的对话式应用程序。

  • Workflow:面向自动化和批处理情景,适合高质量翻译、数据分析、内容生成、电子邮件自动化等应用程序。

为解决自然语言输入中用户意图识别的复杂性,Chatflow 提供了问题理解类节点。相对于 Workflow 增加了 Chatbot 特性的支持,如:对话历史(Memory)、标注回复、Answer 节点等。

为解决自动化和批处理情景中复杂业务逻辑,工作流提供了丰富的逻辑节点,如代码节点、IF/ELSE 节点、模板转换、迭代节点等,除此之外也将提供定时和事件触发的能力,方便构建自动化流程。

  • 常见案例


    • 客户服务:通过将 LLM 集成到您的客户服务系统中,您可以自动化回答常见问题,减轻支持团队的工作负担。 LLM 可以理解客户查询的上下文和意图,并实时生成有帮助且准确的回答。

    • 内容生成:无论您需要创建博客文章、产品描述还是营销材料,LLM 都可以通过生成高质量内容来帮助您。只需提供一个大纲或主题,LLM将利用其广泛的知识库来制作引人入胜、信息丰富且结构良好的内容。

    • 任务自动化:可以与各种任务管理系统集成,如 Trello、Slack、Lark、以自动化项目和任务管理。通过使用自然语言处理,LLM 可以理解和解释用户输入,创建任务,更新状态和分配优先级,无需手动干预。

    • 数据分析和报告:可以用于分析大型数据集并生成报告或摘要。通过提供相关信息给 LLM,它可以识别趋势、模式和洞察力,将原始数据转化为可操作的智能。对于希望做出数据驱动决策的企业来说,这尤其有价值。

    • 邮件自动化处理:LLM 可以用于起草电子邮件、社交媒体更新和其他形式的沟通。通过提供简要的大纲或关键要点,LLM 可以生成一个结构良好、连贯且与上下文相关的信息。这样可以节省大量时间,并确保您的回复清晰和专业。

3.FastGPT

FastGPT是一个功能强大的平台,专注于知识库训练和自动化工作流程的编排。它提供了一个简单易用的可视化界面,支持自动数据预处理和基于Flow模块的工作流编排。FastGPT支持创建RAG系统,提供自动化工作流程等功能,使得构建和使用RAG系统变得简单,无需编写复杂代码。

3.1 FastGPT 能力

  1. 专属 AI 客服
    :通过导入文档或已有问答对进行训练,让 AI 模型能根据你的文档以交互式对话方式回答问题。
    • 多库复用,混用
    • chunk 记录修改和删除
    • 源文件存储
    • 支持手动输入,直接分段,QA 拆分导入
    • 支持 txt,md,html,pdf,docx,pptx,csv,xlsx (有需要更多可 PR file loader)
    • 支持 url 读取、CSV 批量导入
    • 混合检索 & 重排
  2. 简单易用的可视化界面
    :FastGPT 采用直观的可视化界面设计,为各种应用场景提供了丰富实用的功能。通过简洁易懂的操作步骤,可以轻松完成 AI 客服的创建和训练流程。
  3. 自动数据预处理
    :提供手动输入、直接分段、LLM 自动处理和 CSV 等多种数据导入途径,其中“直接分段”支持通过 PDF、WORD、Markdown 和 CSV 文档内容作为上下文。FastGPT 会自动对文本数据进行预处理、向量化和 QA 分割,节省手动训练时间,提升效能。
  4. 工作流编排
    :基于 Flow 模块的工作流编排,可以帮助你设计更加复杂的问答流程。例如查询数据库、查询库存、预约实验室等。
    • 提供简易模式,无需操作编排
    • 工作流编排
    • 工具调用
    • 插件 - 工作流封装能力
    • Code sandbox
  5. 强大的 API 集成
    :FastGPT 对外的 API 接口对齐了 OpenAI 官方接口,可以直接接入现有的 GPT 应用,也可以轻松集成到企业微信、公众号、飞书等平台。

4.RagFlow

RAGFlow 是一款基于深度文档理解构建的开源 RAG(Retrieval-Augmented Generation)引擎。RAGFlow 可以为各种规模的企业及个人提供一套精简的 RAG 工作流程,结合大语言模型(LLM)针对用户各类不同的复杂格式数据提供可靠的问答以及有理有据的引用。

官网:
https://ragflow.io/

Github:
https://github.com/infiniflow/ragflow/blob/main

4.1 功能介绍

  • "Quality in, quality out"


    • 基于深度文档理解,能够从各类复杂格式的非结构化数据中提取真知灼见。
    • 真正在无限上下文(token)的场景下快速完成大海捞针测试。
  • 基于模板的文本切片


    • 不仅仅是智能,更重要的是可控可解释。
    • 多种文本模板可供选择
  • 有理有据、最大程度降低幻觉(hallucination)


    • 文本切片过程可视化,支持手动调整。
    • 有理有据:答案提供关键引用的快照并支持追根溯源。
  • 兼容各类异构数据源


    • 支持丰富的文件类型,包括 Word 文档、PPT、excel 表格、txt 文件、图片、PDF、影印件、复印件、结构化数据、网页等。
  • 全程无忧、自动化的 RAG 工作流


    • 全面优化的 RAG 工作流可以支持从个人应用乃至超大型企业的各类生态系统。
    • 大语言模型 LLM 以及向量模型均支持配置。
    • 基于多路召回、融合重排序。
    • 提供易用的 API,可以轻松集成到各类企业系统。
  • 最近更新功能


    • 2024-07-23 支持解析音频文件.
    • 2024-07-21 支持更多的大模型供应商(LocalAI/OpenRouter/StepFun/Nvidia).
    • 2024-07-18 在Graph中支持算子:Wikipedia,PubMed,Baidu和Duckduckgo.
    • 2024-07-08 支持 Agentic RAG: 基于 Graph 的工作流。

4.2 系统架构

5.Anything-LLM

AnythingLLM是一个全栈应用程序,您可以使用现成的商业大语言模型或流行的开源大语言模型,再结合向量数据库解决方案构建一个私有ChatGPT,不再受制于人:您可以本地运行,也可以远程托管,并能够与您提供的任何文档智能聊天。

AnythingLLM将您的文档划分为称为workspaces (工作区)的对象。工作区的功能类似于线程,同时增加了文档的容器化,。工作区可以共享文档,但工作区之间的内容不会互相干扰或污染,因此您可以保持每个工作区的上下文清晰。

官方:
https://anythingllm.com/

github:
https://github.com/Mintplex-Labs/anything-llm

  • AnythingLLM的一些特性


    • 多用户实例支持和权限管理
    • 工作区内的智能体Agent(浏览网页、运行代码等)
    • 为您的网站定制的可嵌入聊天窗口
    • 支持多种文档类型(PDF、TXT、DOCX等)
    • 通过简单的用户界面管理向量数据库中的文档
    • 两种对话模式:聊天和查询。聊天模式保留先前的对话记录。查询模式则是是针对您的文档做简单问答
    • 聊天中会提供所引用的相应文档内容
    • 100%云部署就绪。
    • “部署你自己的LLM模型”。
    • 管理超大文档时高效、低耗。只需要一次就可以嵌入(Embedding)一个庞大的文档或文字记录。比其他文档聊天机器人解决方案节省90%的成本。
    • 全套的开发人员API,用于自定义集成!
  • 支持的 LLM、嵌入模型和向量数据库


    • LLM:包括任何开源的 llama.cpp 兼容模型、OpenAI、Azure OpenAI、Anthropic ClaudeV2、LM Studio 和 LocalAi。
    • 嵌入模型:AnythingLLM 原生嵌入器、OpenAI、Azure OpenAI、LM Studio 和 LocalAi。
    • 向量数据库:LanceDB(默认)、Pinecone、Chroma、Weaviate 和 QDrant。
  • 技术概览


    • 整个项目设计为单线程结构,主要由三部分组成:收集器、前端和服务器。
    • collector:Python 工具,可快速将在线资源或本地文档转换为 LLM 可用格式。
    • frontend:ViteJS + React 前端,用于创建和管理 LLM 可使用的所有内容。
    • server:NodeJS + Express 服务器,处理所有向量数据库管理和 LLM 交互。

6.更多LLM框架推荐

更多框架推荐参考下述文章:LLM框架、RAG框架、Agent框架

6.1 DB-GPT: 用私有化LLM技术定义数据库下一代交互方式

DB-GPT是一个开源的AI原生数据应用开发框架(AI Native Data App Development framework with AWEL(Agentic Workflow Expression Language) and Agents)。

目的是构建大模型领域的基础设施,通过开发多模型管理(SMMF)、Text2SQL效果优化、RAG框架以及优化、Multi-Agents框架协作、AWEL(智能体工作流编排)等多种技术能力,让围绕数据库构建大模型应用更简单,更方便。

数据3.0 时代,基于模型、数据库,企业/开发者可以用更少的代码搭建自己的专属应用。

6.1.1 架构方案

  • 核心能力主要有以下几个部分:


    • RAG(Retrieval Augmented Generation),RAG是当下落地实践最多,也是最迫切的领域,DB-GPT目前已经实现了一套基于RAG的框架,用户可以基于DB-GPT的RAG能力构建知识类应用。

    • GBI:生成式BI是DB-GPT项目的核心能力之一,为构建企业报表分析、业务洞察提供基础的数智化技术保障。

    • 微调框架: 模型微调是任何一个企业在垂直、细分领域落地不可或缺的能力,DB-GPT提供了完整的微调框架,实现与DB-GPT项目的无缝打通,在最近的微调中,基于spider的准确率已经做到了82.5%

    • 数据驱动的Multi-Agents框架: DB-GPT提供了数据驱动的自进化Multi-Agents框架,目标是可以持续基于数据做决策与执行。

    • 数据工厂: 数据工厂主要是在大模型时代,做可信知识、数据的清洗加工。

    • 数据源: 对接各类数据源,实现生产业务数据无缝对接到DB-GPT核心能力。

6.1.2 RAG生产落地实践架构

6.2 Langchain-Chatchat

项目支持市面上主流的开源 LLM、 Embedding 模型与向量数据库,可实现全部使用开源模型离线私有部署。与此同时,本项目也支持 OpenAI GPT API 的调用,并将在后续持续扩充对各类模型及模型 API 的接入。

原理如下图所示:过程包括加载文件 -> 读取文本 -> 文本分割 -> 文本向量化 -> 问句向量化 -> 在文本向量中匹配出与问句向量最相似的 top k个 -> 匹配出的文本作为上下文和问题一起添加到 prompt中 -> 提交给 LLM生成回答。

7. 总结(选择建议)

在选择AI应用开发平台时,了解不同平台的功能、社区支持以及部署便捷性是非常重要的。

7.0 优劣势选择

MaxKB/Dify的优势与劣势

  • 优势


    • 大模型接入灵活性
      :提供了多种大模型接入方式,支持多种API接口,使得开发者可以根据需求灵活选择和切换模型,这对于需要高性能模型的应用场景尤为重要。

    • 强大的Chat功能
      :Chat功能不仅支持多轮对话,还能通过智能推荐和上下文理解提升用户体验,适用于需要复杂交互的场景。

    • 丰富的知识库支持
      :内置了知识库管理系统,支持多种数据格式的导入和导出,便于用户管理和利用知识资源。

    • 高效的Workflow设计
      :Workflow设计简洁直观,支持拖拽式操作,使得非技术人员也能快速上手,大大降低了使用门槛。

    • Prompt IDE
      :提供的Prompt IDE工具,让开发者可以更直观地调试和优化提示词,提升了开发效率。

  • 劣势


    • 学习曲线
      :虽然界面设计较为友好,但对于初学者来说,仍需要一定时间来熟悉其工作流程和功能。

    • 社区支持
      :相较于一些成熟的开发平台,社区活跃度和资源丰富度还有待提升,这可能会影响到开发者在遇到问题时的解决速度。

    • 定制化程度
      :虽然Dify提供了丰富的功能,但在某些高度定制化的需求上,可能还需要进一步的开发和调整。

FastGPT/RagFlow的优势与劣势

  • 优势


    • Agent智能体
      :Agent智能体功能强大,能够自动执行复杂任务,减少了人工干预的需求,适用于需要自动化处理大量任务的场景。

    • LLMOps支持
      :提供了LLMOps支持,使得开发者可以更方便地进行模型训练、优化和部署,这对于AI模型的持续迭代和优化至关重要。

    • 后端即服务
      :提供了后端即服务的功能,简化了后端开发流程,使得开发者可以更专注于前端和业务逻辑的开发。

    • 强大的RAG引擎
      :RAG引擎能够高效地处理和检索大量数据,适用于需要快速响应和高吞吐量的应用场景。

  • 劣势


    • 功能复杂性
      :FastGPT的功能较为复杂,对于初学者来说,可能需要较长时间来掌握其使用方法和技巧。

    • 部署难度
      :相较于一些轻量级的开发平台,FastGPT的部署过程可能更为复杂,需要一定的技术背景和经验。

    • 用户界面
      :虽然FastGPT的功能强大,但其用户界面可能不如一些竞争对手直观和友好,这可能会影响到用户的使用体验。

7.1 根据需求选择平台

选择合适的平台首先要明确自己的需求。Dify和FastGPT各有特点,适用于不同的应用场景。

  • MaxKB/Dify:适合需要快速构建和部署AI应用的开发者,提供了丰富的预设模板和集成工具,使得开发者可以快速上手,尤其适合初学者和需要快速验证想法的团队。

  • FastGPT/RagFlow:适合需要高度定制化和复杂工作流的企业级用户,提供了强大的RAG引擎和Workflow orchestration,能够处理复杂的业务逻辑和数据处理需求。

  • 在选择平台时,应考虑以下因素:


    • 项目规模:如果是小型项目或初创团队,MaxKB/Dify的快速部署和简单易用性可能更适合。如果是大型企业级项目,FastGPT/RagFlow的强大功能和定制化能力更为合适。

    • 技术栈:考虑团队现有的技术栈和成员的技术背景。在技术实现上有所不同,选择与团队技术栈匹配的平台可以减少学习成本和开发难度。

    • 功能需求:明确项目所需的核心功能,如大模型接入、Chat功能、知识库等。Dify和FastGPT在这些功能上各有优势,根据具体需求进行选择。

7.2 社区与支持对比

社区支持和资源丰富度对于平台的选择也至关重要。

  • MaxKB/Dify:拥有一个活跃的社区,提供了丰富的文档、教程和示例代码。社区成员经常分享使用心得和解决方案,对于遇到的问题可以快速得到帮助。

  • FastGPT/RagFlow:社区相对较小,但提供了专业的技术支持团队。对于企业级用户,FastGPT提供了定制化的技术支持和咨询服务,确保项目的顺利进行。

  • 在选择平台时,应考虑以下因素:


    • 社区活跃度:活跃的社区意味着更多的资源和更快的解决问题速度。社区活跃度较高,适合需要快速解决问题的开发者。

    • 技术支持:对于企业级用户,专业的技术支持至关重要。提供了专业的技术支持,适合对技术支持有较高要求的用户。

7.3 部署与使用便捷性

部署和使用的便捷性直接影响开发效率和成本。

  • MaxKB/Dify:提供了简单易用的界面和一键部署功能,使得开发者可以快速将应用部署到云端或本地。文档详细,适合初学者快速上手。

  • FastGPT/RagFlow:部署相对复杂,需要一定的技术背景和配置。提供了强大的定制化能力,适合对性能和功能有较高要求的用户。

  • 在选择平台时,应考虑以下因素:


    • 部署难度:MaxKB/Dify的部署过程简单,适合需要快速部署的开发者。FastGPT/RagFlow的部署相对复杂,但提供了更多的配置选项。

    • 使用便捷性:MaxKB/Dify的用户界面友好,操作简单。FastGPT/RagFlow的用户界面相对复杂,但提供了更多的功能和定制化选项。

更多优质内容请关注公号:汀丶人工智能;会提供一些相关的资源和优质文章,免费获取阅读。

更多优质内容请关注CSDN:汀丶人工智能;会提供一些相关的资源和优质文章,免费获取阅读。

从pytest源码的角度分析pytest工作原理


pytest
源代码的角度来分析其工作原理,我们需要关注几个关键的部分,特别是
pytest
的启动过程以及测试的收集与执行。下面是基于
pytest
源代码的一个高层次的概述。

pytest 的启动过程

  1. 命令行解析:


    • pytest
      的入口点是
      conftest.py
      文件中的
      pytest.main()
      函数。
    • 在这个函数中,首先通过
      pytest.config.get_config()
      获取配置。
    • 接着使用
      pytest.config.parse()
      来解析命令行参数。
  2. 配置加载:


    • pytest
      会在当前目录及其父目录递归地查找配置文件,比如
      pytest.ini

      pyproject.toml
    • 使用
      pytest.config.Config
      类来存储配置信息。
  3. 插件管理:


    • 通过
      pytest.hookspec

      pytest.pluginmanager
      来管理插件。
    • 插件可以在各个阶段被注册并调用。

测试收集过程

  1. 收集器初始化:


    • pytest
      使用
      pytest.collect
      模块来处理测试收集。
    • Session.from_parent
      方法创建一个新的
      Session
      实例。
    • Collector.from_parent
      方法用于构建收集器树。
  2. 测试文件发现:


    • pytest
      通过
      Session.perform_collect
      方法来遍历目录结构并发现测试模块。
    • File.from_parent
      方法用于创建
      File
      实例来代表测试文件。
    • Function.from_parent
      方法用于创建
      Function
      实例来代表测试函数。
  3. 测试项构建:


    • 一旦发现了测试文件,就会通过
      collect
      方法来收集文件中的测试函数。
    • 测试函数会被转换成
      Item
      实例。

测试执行过程

  1. 测试项准备:


    • 在测试开始之前,会调用
      Session.perform_setup
      方法来进行一些预处理。
    • 这个阶段可能包括设置环境变量、初始化数据库连接等。
  2. 测试项执行:


    • Session.runtestloop
      方法控制测试项的实际执行。
    • 对于每一个
      Item
      实例,都会调用
      Session.perform_test
      方法来执行测试。
  3. 测试结果收集:


    • 测试执行的结果会被收集并存储在
      Item
      实例中。
    • 可能会触发
      pytest_runtest_logreport
      hook,该 hook 被用来处理测试报告。
  4. 异常处理:


    • 如果测试过程中发生异常,
      pytest
      会捕获这些异常并记录下来。
    • 异常可以通过
      pytest_runtest_makereport
      hook 来处理。

测试报告生成

  1. Session
    实例负责收集所有的测试结果。
  2. Session.exitstatus
    属性会根据测试结果来确定程序的退出状态码。
  3. pytest
    可以生成多种格式的报告,这取决于安装的插件。

示例代码片段

下面是一些示例代码片段,展示了
pytest
源代码中的关键部分:

#pytest/conftest.py
def main(args=None):#解析命令行参数
    config =get_config(args)#加载插件
    pm =PluginManager()
pm.load_setuptools_entrypoints(
'pytest11')#创建 Session 实例 session = Session.from_parent(config, plugins=pm)#执行测试 session.runtestloop()#返回退出状态 returnsession.exitstatus#pytest/collect.py defperform_collect(session, collector):#收集测试文件和测试函数 items =[]for item incollector.collect():
items.append(item)
returnitems#pytest/runner.py defruntest_protocol(item, nextitem):#执行测试项 report =item.runtest()if report isNone:#处理异常情况 report =item.makereport()#处理测试报告 item.session._hookmanager.hook.pytest_runtest_logreport(report=report)