Dapr Outbox 是1.12中的功能。
本文只介绍Dapr Outbox 执行流程,Dapr Outbox基本用法请阅读
官方文档

。本文中appID=order-processor,topic=orders

本文前提知识:熟悉
Dapr状态管理

Dapr发布订阅

Outbox 模式

Outbox 模式
的核心是在同一个数据库事务中保存业务数据和待发布的事件消息,再由某个“定时任务”读取待发布的事件消息并发布事件(并删除数据库中事件消息)
相关文章:
.NET中实现Outbox模式的框架CAP,作者Savorboard
使用 dotnetcore/CAP 的本地消息表模式,圣杰

先在内部发布一个主题(topic)

要使用Dapr Outbox,在.NET中就是调用
DaprClient

ExecuteStateTransactionAsync(...)
方法(得先完成Outbox相关的配置!),调用此方法会完成事务操作(保存业务数据和待发布的事件消息)并发布事件消息。

string DAPR_STORE_NAME = "statestoresql";
var client = new DaprClientBuilder().Build();
var orderId = 1;
var order = new Order(orderId);

var bytes = JsonSerializer.SerializeToUtf8Bytes(order);
var upsert = new List<StateTransactionRequest>()
{
    new StateTransactionRequest(orderId.ToString(), bytes, StateOperationType.Upsert)
};

// 保存状态,并发布事件消息
await client.ExecuteStateTransactionAsync(DAPR_STORE_NAME, upsert);

public record Order([property: JsonPropertyName("orderId")] int orderId);
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: orderpubsub # 发布订阅组件
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestoresql  # 状态组件
spec:
  type: state.mysql
  version: v1
  metadata:
    - name: connectionString
      value: "root:mysecret@tcp(localhost:3306)/?allowNativePasswords=true"
    - name: outboxPublishPubsub
      value: orderpubsub
    - name: outboxPublishTopic
      value: orders

调用
ExecuteStateTransactionAsync(...)
方法时,此方法把请求转发给sidecar,sidecar会发布一个
内部主题
。所谓内部,就是供Dapr使用,用户不用操作;所谓主题(Topic)就是一个事件;此
主题格式
为:namespace + appID + topic + "outbox" ,假设appID=order-processor,topic=orders,则内部主题(Topic)名就是
order-processorordersoutbox
(namespace 是与k8s有关),此主题用于判断事务是否执行成功。

注:该内部主题(topic)默认和事件消息使用同一个Dapr
发布/订阅组件
,可以通过配置状态组件的元数据(metadata配置)字段
outboxPubsub
单独指定内部主题所使用的发布/订阅组件。相关配置请看
官方文档

主题内容

CloudEvent
格式,发布的事件数据如下(真正的待发布事件消息就是json中的data字段,后面就是读取的此值):

{
    "data":"{\"orderId\":1}",
    "datacontenttype":"text/plain",
    "id":"outbox-a53e45f3-d646-4e4e-bcbf-0692ec7b9dd0",
    "pubsubname":"orderpubsub",
    "source":"order-processor",
    "specversion":"1.0",
    "time":"2024-01-25T17:12:31+08:00",
    "topic":"",
    "traceid":"",
    "traceparent":"",
    "tracestate":"",
    "type":"com.dapr.event.sent"
}

有了事件的发布者,那事件的订阅者是谁呢?
appID=order-processor的Dapr sidecar实例
。可以是执行保存状态的sidecar程序,或者是appID=order-processor的其他sidecar。

在同一事务中保存状态和事件消息

  • 在内部主题(Topic)发布
    成功

    ,会在同一事务中保存状态和事件消息,也就是将方法
    client.ExecuteStateTransactionAsync(...)
    中的数据保存到数据库。id为
    outbox-a53e45f3-d646-4e4e-bcbf-0692ec7b9dd0
    的表示需待发布事件消息,id为
    order-processor||1
    表示状态数据。事件消息和状态数据保存在同一张表
    state
    中,在mysql中其表结构和数据如下所示。

  • 如果此内部主题(Topic)发布
    失败
    ,调用方直接抛异常,不会执行事务操作!
    state
    表不会有下面两条数据。

  • "eyJvcmRlcklkIjoxfQ=="既是状态数据又是待发布的事件数据;经过Base64解码,得到该值为json格式,即:
    {"orderId":1}

CREATE TABLE `state`  (
  `id` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  `value` json NOT NULL,
  `isbinary` tinyint(1) NOT NULL,
  `insertDate` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `updateDate` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `eTag` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  `expiredate` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  INDEX `expiredate_idx`(`expiredate` ASC) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
id value isbinary insertDate updateDate eTag expiredate
outbox-a53e45f3-d646-4e4e-bcbf-0692ec7b9dd0 "0" 0 2024-01-25 09:22:14 2024-01-25 09:22:14 07884eed-eb5d-4887-8399-051c71206ed5
order-processor||1 "eyJvcmRlcklkIjoxfQ==" 1 2024-01-25 09:12:31 2024-01-25 09:22:14 3d1e368f-f6d8-4ccd-946d-c10090c7cc42

内部主题(Topic)的订阅者发布事件消息

数据库事务执行成功后,什么时候把事件消息发布出去呢?

事件消息
发布出去是在
内部主题(Topic)的订阅者
中实现的,具体如下:

步骤X

appID

order-processor
的sidecar接收到内部主题(Topic)发送的事件,然后通过查询判断id为
outbox-a53e45f3-d646-4e4e-bcbf-0692ec7b9dd0
的数据是否存在?

  • 如果存在,表示状态数据和事件消息都已保存在mysql中,则发布
    事件消息
    (事件数据就前面提到的data字段)。事件发布成功后,则删除id为
    outbox-a53e45f3-d646-4e4e-bcbf-0692ec7b9dd0
    的记录。
  • 如果不存在就直接退出,停止后续操作;事件的订阅者会多次收到订阅消息,即重复
    步骤X
    过程。

这里会有一个问题:接收到内部主题(Topic)后,状态和事件消息可能没有持久化到mysql(前面提到过,Dapr sidecar是先发布一个内部主题,再在同一事务中保存状态和事件消息)。所以获取状态执行以下重试策略。删除状态时也是此重试策略。

bo := &backoff.ExponentialBackOff{
    InitialInterval:     time.Millisecond * 500,// 初始间隔
    MaxInterval:         time.Second * 3,       // 最大间隔。重试时间超过此值时,以此值为准
    MaxElapsedTime:      time.Second * 10,      // 累计重试时间
    Multiplier:          3,                     // 递增倍数  
    Clock:               backoff.SystemClock,
    RandomizationFactor: 0.1,                   // 随机因子  
}

总结

Dapr Outbox 执行流程简单说就是:先发布一个内部事件,再执行保存业务数据和事件消息,内部事件的订阅者再发布真正的事件消息。Dapr轮询数据库中待发布
事件消息
是通过订阅一个内部主题(Topic)实现的。
因为状态保存和事件发布是在sidecar中执行,所以
业务代码和事件消息不在同一个事务中
!!!Dapr Outbox是把
业务的状态数据和事件消息在同一个事务中保存
,也就是代码
client.ExecuteStateTransactionAsync(...)
;并且状态数据和事件消息是保存到同一张表
state
中。

参考:

代码

Enable the transactional outbox pattern

outbox.go

Outbox issues

标签: none

添加新评论