.NET分布式Orleans - 6 - 事件溯源
基本概念
事件溯源(Event Sourcing)是一种设计模式,它记录并存储了应用程序状态变化的所有事件。
其核心思想是将系统中的每次状态变化都视为一个事件,并将这些事件以时间顺序的方式持久化存储。
这样,通过重放这些事件,我们可以重建系统在任何特定时间点的状态。
每个事件通常都包含了描述状态变化的必要信息,以及发生状态变化的原因和时间戳。
工作原理
工作原理方面,事件溯源主要依赖于两个关键部分:事件生成和事件存储。
当系统中发生状态变化时,会生成一个或多个事件,这些事件随后被存储到事件存储中。
事件存储需要设计成高可用、高一致且可伸缩的,以支持大规模的系统操作。
之后,当需要重建系统状态时,只需从事件存储中按顺序读取事件,并依次应用这些事件到系统状态即可。
使用场景
在Orleans7中,事件溯源主要应用在以下几个场景:
分布式系统状态同步:在分布式系统中,各个节点之间的状态同步是一个重要问题。通过事件溯源,每个节点都可以记录并发送自己的状态变化事件,其他节点则可以通过订阅这些事件来同步自己的状态。
历史数据追踪和审计:在某些业务场景下,需要追踪系统的历史操作记录,以进行审计或分析。事件溯源提供了完整的操作历史,可以方便地查询和回放历史事件。
容错和恢复:当系统发生故障时,通过事件溯源可以方便地恢复到故障发生前的状态,或者根据事件日志进行故障排查。
优势
事件溯源在Orleans7中带来了以下优势:
数据完整性和一致性:由于事件溯源记录了所有状态变化的历史,因此可以确保数据的完整性和一致性。
灵活性和可扩展性:事件溯源的设计使得系统可以很容易地添加新的状态变化事件,同时也支持大规模的系统扩展。
容错和恢复能力:通过事件溯源,可以轻松地恢复到系统的任何历史状态,大大提高了系统的容错和恢复能力。
清晰的业务逻辑:每个事件都代表了一个具体的业务操作,因此通过查看事件日志,可以清晰地了解系统的业务逻辑和操作流程。
总的来说,事件溯源是一种强大而灵活的设计模式,它在Orleans7中的应用为分布式系统带来了诸多优势。对于软件开发者来说,理解和掌握事件溯源机制,将有助于构建更加健壮、可靠和可扩展的分布式系统。
示例
下面使用事件溯源,来跟踪一个账户的变更记录。
首先需要安装必须的nuget包
<PackageReference Include="Microsoft.Orleans.EventSourcing" Version="8.0.0" /> <PackageReference Include="Microsoft.Orleans.Clustering.AdoNet" Version="8.0.0" /> <PackageReference Include="Microsoft.Orleans.Persistence.AdoNet" Version="8.0.0" /> <PackageReference Include="Microsoft.Orleans.Server" Version="8.0.0" />
然后设置Orleans,除了Orleans的常规设置外,还需要 siloHostBuilder.AddLogStorageBasedLogConsistencyProvider("LogStorage") 来设置LogConsistencyProvider
builder.Host.UseOrleans(static siloHostBuilder =>{var invariant = "System.Data.SqlClient";var connectionString = "Data Source=localhost\\SQLEXPRESS;Initial Catalog=orleanstest;User Id=sa;Password=12334;";
siloHostBuilder.AddLogStorageBasedLogConsistencyProvider("LogStorage");//Use ADO.NET for clustering siloHostBuilder.UseAdoNetClustering(options =>{
options.Invariant=invariant;
options.ConnectionString=connectionString;
}).ConfigureLogging(logging=>logging.AddConsole());
siloHostBuilder.Configure<ClusterOptions>(options =>{
options.ClusterId= "my-first-cluster";
options.ServiceId= "SampleApp";
});//Use ADO.NET for persistence siloHostBuilder.AddAdoNetGrainStorage("GrainStorageForTest", options =>{
options.Invariant=invariant;
options.ConnectionString=connectionString;//options.GrainStorageSerializer = new JsonGrainStorageSerializer() });
});
定义账户的存储和提取事件类
//the classes below represent events/transactions on the account//all fields are user-defined (none have a special meaning),//so these can be any type of object you like, as long as they are serializable//(so they can be sent over the wire and persisted in a log). [Serializable]
[GenerateSerializer]public abstract classTransaction
{/// <summary>A unique identifier for this transaction</summary> [Id(0)]public Guid Guid { get; set; }/// <summary>A description for this transaction</summary> [Id(1)]public string Description { get; set; }/// <summary>time on which the request entered the system</summary> [Id(2)]public DateTime IssueTime { get; set; }
}
[Serializable]
[GenerateSerializer]public classDepositTransaction : Transaction
{
[Id(0)]public uint DepositAmount { get; set; }
}
[Serializable]
[GenerateSerializer]public classWithdrawalTransaction : Transaction
{
[Id(0)]public uint WithdrawalAmount { get; set; }
}
再定义账户的Grain,其中有存钱,取钱,获取余额,与变更记录操作
Grain类必须具有 LogConsistencyProviderAttribute 才能指定日志一致性提供程序。 还需要 StorageProviderAttribute设置存储。
/// <summary> ///An example of a journaled grain that models a bank account./// ///Configured to use the default storage provider.///Configured to use the LogStorage consistency provider./// ///This provider persists all events, and allows us to retrieve them all./// </summary> /// <summary> ///A grain that models a bank account/// </summary> public interfaceIAccountGrain : IGrainWithStringKey
{
Task<uint>Balance();
Task Deposit(uint amount, Guid guid, stringdesc);
Task<bool> Withdraw(uint amount, Guid guid, stringdesc);
Task<IReadOnlyList<Transaction>>GetTransactionLog();
}
[StorageProvider(ProviderName= "GrainStorageForTest")]
[LogConsistencyProvider(ProviderName= "LogStorage")]public class AccountGrain : JournaledGrain<AccountGrain.GrainState, Transaction>, IAccountGrain
{/// <summary> ///The state of this grain is just the current balance./// </summary> [Serializable]
[Orleans.GenerateSerializer]public classGrainState
{
[Orleans.Id(0)]public uint Balance { get; set; }public voidApply(DepositTransaction d)
{
Balance= Balance +d.DepositAmount;
}public voidApply(WithdrawalTransaction d)
{if (d.WithdrawalAmount >Balance)throw new InvalidOperationException("we make sure this never happens");
Balance= Balance -d.WithdrawalAmount;
}
}public Task<uint>Balance()
{returnTask.FromResult(State.Balance);
}public Task Deposit(uint amount, Guid guid, stringdescription)
{
RaiseEvent(newDepositTransaction()
{
Guid=guid,
IssueTime=DateTime.UtcNow,
DepositAmount=amount,
Description=description
});//we wait for storage ack returnConfirmEvents();
}public Task<bool> Withdraw(uint amount, Guid guid, stringdescription)
{//if the balance is too low, can't withdraw//reject it immediately if (State.Balance <amount)return Task.FromResult(false);//use a conditional event for withdrawal//(conditional events commit only if the version hasn't already changed in the meantime)//this is important so we can guarantee that we never overdraw//even if racing with other clusters, of in transient duplicate grain situations return RaiseConditionalEvent(newWithdrawalTransaction()
{
Guid=guid,
IssueTime=DateTime.UtcNow,
WithdrawalAmount=amount,
Description=description
});
}public Task<IReadOnlyList<Transaction>>GetTransactionLog()
{return RetrieveConfirmedEvents(0, Version);
}
}
最后即可通过client生成grain,并获取账户变动记录
var palyer = client.GetGrain<IAccountGrain>("zhangsan");await palyer.Deposit(1000, Guid.NewGuid(), "aaa");var logs = awaitpalyer.GetTransactionLog();return Results.Ok(logs);