像 TransactionScope 一样优雅的使用数据库事务

—— 基于 AsyncLocal<T> 的事务实现

momo314相同方式共享非商业用途署名转载



背景:TransactionScope 优雅的事务代码

using (var scope = new TransactionScope())
{
  // ...
  scope.Complete();
}

一直感觉 TransactionScope 的事务代码比较优雅:

  1. using作用域 即 事务的作用域,可以明确的标识出事务的代码范围,可读性大大提高。
  2. Complete方法被调用后,事务才会提交。
  3. 相应的,Complete之前只要抛出了任何异常,则事务会自动回滚(因为Complete方法不会被调用)。

但目前异步大并发的场景下,想用好 TransactionScope 并不容易;且很多时候,我们并不涉及分布式事务,只是一个简单的数据库单机事务,使用 TransactionScope 有些不太值当。

所以今天我们使用AsyncLocal<T>来实现一个事务组件,以提供类似于 TransactionScope 的事务写法。

关于 AsyncLocal<T>

AsyncLocal 与 .NET Framework 中的 CallContext 类似,有两个特性:

  1. 提供与 ThreadLocal 类似的 线程本地 数据存储。
  2. 当线程发生切换时,可以将 线程本地 存储的数据复制到子线程。

基于 AsyncLocal<T> 的事务设计

数据库操作一般会被认为比较耗时,会被设计为异步方法,这就会造成:

  • 复杂的事务流程内涉及到多个数据库操作,这些数据库操作方法都是异步的。
  • 事务内的每个异步方法都大概率会发生线程切换。
  • 在事务提交之前,事务内的所有操作都需要被执行成功。

基于以上原因,我们的事务流程应该类似于:

  1. 从某个线程上使用 using 的方式开启一个事务,并将事务的id存储于 AsyncLocal 变量中
  2. 事务的内部的多个数据库操作方式自由使用同步或异步的方式
  3. 当SQL语句准备执行时,检查 AsyncLocal 变量中是否存在事务id,如果存在,则根据事务id找到具体的事务对象,并将此事务对象传入当前数据库连接
  4. 等待所有异步方法执行完成(或抛出异常)
  5. 执行Complete方法提交事务(若无异常)
  6. 当事务发生嵌套时,内层事务的Complete方法不提交事务,等待最外层的事务统一提交(考虑代码复用,同一个带事务的方法可以自己作为最外层事务单独使用,也可以作为其他事务的一部分嵌套使用)
  7. 在Dispose方法中移除 AsyncLocal 中保存的事务id,并销毁事务对象

代码如下(以mysql为例,sqlserver同理):

using MySql.Data.MySqlClient;
using System;
using System.Data;

namespace NIP.Infrastructure.Data.MySql.Core
{
    public class Database
    {
        private static string _defaultKey = "connstrings:database:default";

        /// <summary>
        /// 根据连接字符串的configkey获取一个数据库连接
        /// </summary>
        internal static MySqlConnection GetInstance(string connKey = null)
        {
            if (string.IsNullOrEmpty(connKey))
                connKey = _defaultKey;

            var connstr = ConfigurationManager.AppSettings[connKey];
            return new MySqlConnection(connstr);
        }

        /// <summary>
        /// 创建一个新的事务作用域
        /// </summary>
        /// <param name="connKey">链接字符串的 config key</param>
        /// <param name="level">事务隔离级别</param>
        /// <returns></returns>
        public static MySqlTransactionScope CreateTransactionScope(string connKey = default, IsolationLevel? level = default)
        {
            var conn = GetInstance(connKey);

            conn.Open();

            if (!level.HasValue && !timeout.HasValue)
            {
                var trans = conn.BeginTransaction();
                return new MySqlTransactionScope(trans);
            }

            if (level.HasValue && timeout.HasValue)
            {
                var trans = conn.BeginTransaction(level.Value);
                return new MySqlTransactionScope(trans);
            }

            if (level.HasValue)
            {
                var trans = conn.BeginTransaction(level.Value);
                return new MySqlTransactionScope(trans);
            }

            return new MySqlTransactionScope(conn.BeginTransaction());
        }

        /// <summary>
        /// 创建一个新的事务作用域 或 直接获取当前事务作用域(如果当前代码在一个事务作用域内部)
        /// 如果在一个事务作用域(B)内部调用此方法,则得到的事务作用域(A)会被认为是事务B的一部分,且A的Commit会被忽略
        /// </summary>
        /// <param name="connKey">链接字符串的 config key (仅创建时使用)</param>
        /// <param name="level">事务隔离级别 (仅创建时使用)</param>
        /// <returns></returns>
        public static MySqlTransactionScope GetOrCreateTransactionScope(string connKey = default, IsolationLevel? level = default)
        {
            var currentId = MySqlTransactionScope.CurrentScopeId.Value;
            if (currentId.HasValue)
            {
                var id = currentId.Value;
                var trans = MySqlTransactionScope.GetById(id);
                return new MySqlTransactionScope(id, trans);
            }
            return CreateTransactionScope(connKey, level);
        }

        /// <summary>
        /// 直接获取当前事务作用域(如果当前代码在一个事务作用域内部, 否则会返回null)
        /// 显式调用A的Commit会被忽略,只有B的Commit会生效
        /// </summary>
        /// <param name="id"></param>
        /// <returns></returns>
        internal static MySqlTransactionScope GetCurrentTransactionScope()
        {
            var currentId = MySqlTransactionScope.CurrentScopeId.Value;
            if (currentId.HasValue)
            {
                var id = currentId.Value;
                var trans = MySqlTransactionScope.GetById(id);
                return new MySqlTransactionScope(id, trans, inner: false);
            }
            return null;
        }

        /// <summary>
        /// 根据指定的作用域(B)的id获取事务作用域(A),并使A成为被指定id的事务作用域(B)的一部分
        /// 显式调用A的Commit会被忽略,只有B的Commit会生效
        /// </summary>
        /// <param name="id"></param>
        /// <returns></returns>
        public static MySqlTransactionScope GetTransactionScope(Guid id)
        {
            var trans = MySqlTransactionScope.GetById(id);
            return new MySqlTransactionScope(id, trans);
        }
    }
}
using MySql.Data.MySqlClient;
using NIP.Infrastructure.Exceptions;
using System;
using System.Collections.Concurrent;
using System.Data;
using System.Threading;

namespace NIP.Infrastructure.Data.MySql.Core
{
    public class MySqlTransactionScope : IDisposable
    {
        private static ConcurrentDictionary<Guid, MySqlTransaction> _transAll = new ConcurrentDictionary<Guid, MySqlTransaction>();
        private MySqlTransaction _trans;
        internal static AsyncLocal<Guid?> CurrentScopeId = new AsyncLocal<Guid?>();
        public Guid Id { get; private set; }
        // 标记当前事务是否为一个 子事务
        public bool _inner = false;


        internal MySqlTransactionScope(MySqlTransaction transaction)
        {
            Id = Guid.NewGuid();
            _trans = transaction;
            var succeed = _transAll.TryAdd(Id, _trans);
            if (!succeed)
            {
                var code = ExceptionCode.DATABASE_ERROR;
                var msg = "failed to create new transaction.";
                throw new ForeseenException(code, msg);
            }
            CurrentScopeId.Value = Id;
        }

        /// <summary>
        /// 构造函数
        /// inernal: 保证只有基础类库程序集内部可以调用此构造函数,用来生成一个 子事务
        /// </summary>
        /// <param name="id">id</param>
        /// <param name="transaction">transaction</param>
        /// <param name="inner">是否作为指定id的scope的一部分(子作用域)</param>
        internal MySqlTransactionScope(Guid id, MySqlTransaction transaction, bool inner = true)
        {
            Id = id;
            _trans = transaction;
            _inner = inner;
        }

        internal MySqlTransaction Current
        {
            get
            {
                return _trans;
            }
        }

        internal static MySqlTransaction GetById(Guid id)
        {
            var succeed = _transAll.TryGetValue(id, out var trans);
            if (!succeed)
            {
                var code = ExceptionCode.DATABASE_ERROR;
                var msg = $"transaction (id: {id.ToString().ToLower()}) not found.";
                throw new ForeseenException(code, msg);
            }
            return trans;
        }

        public void Complete()
        {
            // 如果为内层事务,则忽略Complete方法对事务的提交,等待最外层事务的Complete来提交事务
            if (!_inner)
            {
                _trans.Commit(); 
                CurrentScopeId.Value = null; 
                if (_trans.Connection.State == ConnectionState.Open)
                    _trans.Connection.Close();
            }
        }

        public void Dispose()
        {
            if (!_inner)
            {
                if (_trans.Connection.State == ConnectionState.Open)
                {
                    _trans.Rollback();
                    CurrentScopeId.Value = null;
                    _trans.Connection.Close();
                }
                _transAll.TryRemove(Id, out var _);
                _trans.Dispose();
            }
        }
    }
}

另外涉及到对仓储层底层类库的部分的修改,当需要执行具体的SQL语句之前,需要使用下面的代码获取当前数据库操作所属的事务对象(如果当前操作不在事务中,则会获取到null),并需要将此事务传递给负责具体SQL语句执行的Execute方法。

// 获取当前数据库操作所属的事务对象
var scope = Database.GetCurrentTransactionScope();

使用方式

当需要启用事务时:

using (var scope = Database.GetOrCreateTransactionScope())
// 如果十分确定此方法始终为最外层事务,不会被其他事务包裹,也可以使用 Database.CreateTransactionScope() 方法来生成 scope
{
  await _order_repository.Add(orderEntity);
  await _order_location_repository.Add(locationEntity);

  scope.Complete();
}
✎﹏ 本文来自于 momo314的神奇海螺 ,文章原创,转载请注明作者并保留原文链接。