像 TransactionScope 一样优雅的使用数据库事务
—— 基于 AsyncLocal<T> 的事务实现
背景:TransactionScope 优雅的事务代码
using (var scope = new TransactionScope())
{
// ...
scope.Complete();
}
一直感觉 TransactionScope 的事务代码比较优雅:
using
作用域 即 事务的作用域,可以明确的标识出事务的代码范围,可读性大大提高。- 在
Complete
方法被调用后,事务才会提交。 - 相应的,
Complete
之前只要抛出了任何异常,则事务会自动回滚(因为Complete
方法不会被调用)。
但目前异步大并发的场景下,想用好 TransactionScope 并不容易;且很多时候,我们并不涉及分布式事务,只是一个简单的数据库单机事务,使用 TransactionScope 有些不太值当。
所以今天我们使用AsyncLocal<T>来实现一个事务组件,以提供类似于 TransactionScope 的事务写法。
关于 AsyncLocal<T>
AsyncLocal 与 .NET Framework 中的 CallContext 类似,有两个特性:
- 提供与 ThreadLocal 类似的 线程本地 数据存储。
- 当线程发生切换时,可以将 线程本地 存储的数据复制到子线程。
基于 AsyncLocal<T> 的事务设计
数据库操作一般会被认为比较耗时,会被设计为异步方法,这就会造成:
- 复杂的事务流程内涉及到多个数据库操作,这些数据库操作方法都是异步的。
- 事务内的每个异步方法都大概率会发生线程切换。
- 在事务提交之前,事务内的所有操作都需要被执行成功。
基于以上原因,我们的事务流程应该类似于:
- 从某个线程上使用 using 的方式开启一个事务,并将事务的id存储于 AsyncLocal 变量中
- 事务的内部的多个数据库操作方式自由使用同步或异步的方式
- 当SQL语句准备执行时,检查 AsyncLocal 变量中是否存在事务id,如果存在,则根据事务id找到具体的事务对象,并将此事务对象传入当前数据库连接
- 等待所有异步方法执行完成(或抛出异常)
- 执行Complete方法提交事务(若无异常)
- 当事务发生嵌套时,内层事务的Complete方法不提交事务,等待最外层的事务统一提交(考虑代码复用,同一个带事务的方法可以自己作为最外层事务单独使用,也可以作为其他事务的一部分嵌套使用)
- 在Dispose方法中移除 AsyncLocal 中保存的事务id,并销毁事务对象
代码如下(以mysql为例,sqlserver同理):
using MySql.Data.MySqlClient;
using System;
using System.Data;
namespace NIP.Infrastructure.Data.MySql.Core
{
public class Database
{
private static string _defaultKey = "connstrings:database:default";
/// <summary>
/// 根据连接字符串的configkey获取一个数据库连接
/// </summary>
internal static MySqlConnection GetInstance(string connKey = null)
{
if (string.IsNullOrEmpty(connKey))
connKey = _defaultKey;
var connstr = ConfigurationManager.AppSettings[connKey];
return new MySqlConnection(connstr);
}
/// <summary>
/// 创建一个新的事务作用域
/// </summary>
/// <param name="connKey">链接字符串的 config key</param>
/// <param name="level">事务隔离级别</param>
/// <returns></returns>
public static MySqlTransactionScope CreateTransactionScope(string connKey = default, IsolationLevel? level = default)
{
var conn = GetInstance(connKey);
conn.Open();
if (!level.HasValue && !timeout.HasValue)
{
var trans = conn.BeginTransaction();
return new MySqlTransactionScope(trans);
}
if (level.HasValue && timeout.HasValue)
{
var trans = conn.BeginTransaction(level.Value);
return new MySqlTransactionScope(trans);
}
if (level.HasValue)
{
var trans = conn.BeginTransaction(level.Value);
return new MySqlTransactionScope(trans);
}
return new MySqlTransactionScope(conn.BeginTransaction());
}
/// <summary>
/// 创建一个新的事务作用域 或 直接获取当前事务作用域(如果当前代码在一个事务作用域内部)
/// 如果在一个事务作用域(B)内部调用此方法,则得到的事务作用域(A)会被认为是事务B的一部分,且A的Commit会被忽略
/// </summary>
/// <param name="connKey">链接字符串的 config key (仅创建时使用)</param>
/// <param name="level">事务隔离级别 (仅创建时使用)</param>
/// <returns></returns>
public static MySqlTransactionScope GetOrCreateTransactionScope(string connKey = default, IsolationLevel? level = default)
{
var currentId = MySqlTransactionScope.CurrentScopeId.Value;
if (currentId.HasValue)
{
var id = currentId.Value;
var trans = MySqlTransactionScope.GetById(id);
return new MySqlTransactionScope(id, trans);
}
return CreateTransactionScope(connKey, level);
}
/// <summary>
/// 直接获取当前事务作用域(如果当前代码在一个事务作用域内部, 否则会返回null)
/// 显式调用A的Commit会被忽略,只有B的Commit会生效
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
internal static MySqlTransactionScope GetCurrentTransactionScope()
{
var currentId = MySqlTransactionScope.CurrentScopeId.Value;
if (currentId.HasValue)
{
var id = currentId.Value;
var trans = MySqlTransactionScope.GetById(id);
return new MySqlTransactionScope(id, trans, inner: false);
}
return null;
}
/// <summary>
/// 根据指定的作用域(B)的id获取事务作用域(A),并使A成为被指定id的事务作用域(B)的一部分
/// 显式调用A的Commit会被忽略,只有B的Commit会生效
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
public static MySqlTransactionScope GetTransactionScope(Guid id)
{
var trans = MySqlTransactionScope.GetById(id);
return new MySqlTransactionScope(id, trans);
}
}
}
using MySql.Data.MySqlClient;
using NIP.Infrastructure.Exceptions;
using System;
using System.Collections.Concurrent;
using System.Data;
using System.Threading;
namespace NIP.Infrastructure.Data.MySql.Core
{
public class MySqlTransactionScope : IDisposable
{
private static ConcurrentDictionary<Guid, MySqlTransaction> _transAll = new ConcurrentDictionary<Guid, MySqlTransaction>();
private MySqlTransaction _trans;
internal static AsyncLocal<Guid?> CurrentScopeId = new AsyncLocal<Guid?>();
public Guid Id { get; private set; }
// 标记当前事务是否为一个 子事务
public bool _inner = false;
internal MySqlTransactionScope(MySqlTransaction transaction)
{
Id = Guid.NewGuid();
_trans = transaction;
var succeed = _transAll.TryAdd(Id, _trans);
if (!succeed)
{
var code = ExceptionCode.DATABASE_ERROR;
var msg = "failed to create new transaction.";
throw new ForeseenException(code, msg);
}
CurrentScopeId.Value = Id;
}
/// <summary>
/// 构造函数
/// inernal: 保证只有基础类库程序集内部可以调用此构造函数,用来生成一个 子事务
/// </summary>
/// <param name="id">id</param>
/// <param name="transaction">transaction</param>
/// <param name="inner">是否作为指定id的scope的一部分(子作用域)</param>
internal MySqlTransactionScope(Guid id, MySqlTransaction transaction, bool inner = true)
{
Id = id;
_trans = transaction;
_inner = inner;
}
internal MySqlTransaction Current
{
get
{
return _trans;
}
}
internal static MySqlTransaction GetById(Guid id)
{
var succeed = _transAll.TryGetValue(id, out var trans);
if (!succeed)
{
var code = ExceptionCode.DATABASE_ERROR;
var msg = $"transaction (id: {id.ToString().ToLower()}) not found.";
throw new ForeseenException(code, msg);
}
return trans;
}
public void Complete()
{
// 如果为内层事务,则忽略Complete方法对事务的提交,等待最外层事务的Complete来提交事务
if (!_inner)
{
_trans.Commit();
CurrentScopeId.Value = null;
if (_trans.Connection.State == ConnectionState.Open)
_trans.Connection.Close();
}
}
public void Dispose()
{
if (!_inner)
{
if (_trans.Connection.State == ConnectionState.Open)
{
_trans.Rollback();
CurrentScopeId.Value = null;
_trans.Connection.Close();
}
_transAll.TryRemove(Id, out var _);
_trans.Dispose();
}
}
}
}
另外涉及到对仓储层底层类库的部分的修改,当需要执行具体的SQL语句之前,需要使用下面的代码获取当前数据库操作所属的事务对象(如果当前操作不在事务中,则会获取到null),并需要将此事务传递给负责具体SQL语句执行的Execute方法。
// 获取当前数据库操作所属的事务对象
var scope = Database.GetCurrentTransactionScope();
使用方式
当需要启用事务时:
using (var scope = Database.GetOrCreateTransactionScope())
// 如果十分确定此方法始终为最外层事务,不会被其他事务包裹,也可以使用 Database.CreateTransactionScope() 方法来生成 scope
{
await _order_repository.Add(orderEntity);
await _order_location_repository.Add(locationEntity);
scope.Complete();
}