.NetCore中使用分布式事务DTM的二阶段消息

科技资讯 投稿 6000 0 评论

.NetCore中使用分布式事务DTM的二阶段消息

一、概述

DTM新提出的,可以完美代替现有的事务消息和本地消息表架构。无论从复杂度、性能、便利性还是代码量都是完胜现有的方案。

RocketMQ等,DTM自己实现了无需额外的学习成本。它能够保证本地事务的提交和全局事务提交是“原子的”,适合解决不需要回滚的分布式事务场景

 二阶段消息主要是指PrepareSubmit两个阶段,主程序向DTM服务发送Prepare消息,成功后执行本地事务,完成本地事务后发送Submit消息至DTM服务,之后DTM会调用分支事件执行其他服务,最后完成全局事务。

Prepare但是Submit没有提交的话,会进行回调请求来确认消息的情况,具体工作过程如下:

gid插入到barrier表中,同时带上插入原因为committed。该表有一个唯一索引,主要字段为gid

gid是否存在,而是再insert ignore一条带有相同gid的数据,同时带上插入原因为rollbacked。此时如果表中如果已有gid的记录,那么新的插入操作就会被ignore,否则数据会被插入。

gid查询表中的记录,如果查到记录的reasoncommitted,那么说明本地事务已提交;如果查到记录的reasonrollbacked,那么说明本地事务已回滚。

二、安装DTM

Window环境所以下载后解压,点击dtm.exe进行运行即可,如下启动成功

http://localhost:36789,进入管理后台

三、创建DTM所需的表

CREATE TABLE [dbo].[barrier]
(
    [id] bigint NOT NULL IDENTITY(1,1 PRIMARY KEY,
    [trans_type] varchar(45 NOT NULL DEFAULT('',
    [gid] varchar(128 NOT NULL DEFAULT('',
    [branch_id] varchar(128 NOT NULL DEFAULT('',
    [op] varchar(45 NOT NULL DEFAULT('',
    [barrier_id] varchar(45 NOT NULL DEFAULT('',
    [reason] varchar(45 NOT NULL DEFAULT('',
    [create_time] datetime NOT NULL DEFAULT(getdate( ,
    [update_time] datetime NOT NULL DEFAULT(getdate(


GO

CREATE UNIQUE INDEX[ix_uniq_barrier] ON[dbo].[barrier]
        ([gid] ASC, [branch_id] ASC, [op] ASC, [barrier_id] ASC
WITH(IGNORE_DUP_KEY = ON

GO

这里比较关键的是那个唯一索引,有一个IGNORE_DUP_KEY = ON,这个其实就是为了等价mysqlinsert ignore表示存在相关字段的信息则不插入,否则就插入数据

 四、创建项目

 我们简单的创建两个.net core webapi项目进行测试,两个项目都进行相同的如下操作:

 1、安装Dtmcli和Microsoft.EntityFrameworkCore.SqlServer

Dtmcli是因为其中已经帮我们集成了DTM客户端SDK HTTP版本,想要GRPC版本可以安装Dtmgrpc

Microsoft.EntityFrameworkCore.SqlServer很显然是为了处理数据库。

Install-Package Dtmcli Install-Package Microsoft.EntityFrameworkCore.SqlServer

2、配置

appsetting.json中添加如下

"AppSettings": { "DtmUrl": "http://localhost:36789", "BusiUrl": "http://localhost:5056", "QueryPreparedUrl": "http://localhost:5046", "BarrierConn": "Data Source=.;Initial Catalog=HTGL;TrustServerCertificate=True;;Integrated Security=True" }

 DtmUrlDTM的监听地址,http的是36789grpc的是36790

BusiUrl:访问其他服务的地址

QueryPreparedUrl:回查的地址

BarrierConn:数据库连接语句

    public class AppSettings
    {
        public string DtmUrl { get; set; }
        public string BusiUrl { get; set; }
        public string BarrierConn { get; set; }
        public string QueryPreparedUrl { get; set; }
    }

 之后注入服务如下:

builder.Services.AddDtmcli(dtm => { dtm.DtmUrl = builder.Configuration.GetValue<string>("AppSettings:DtmUrl"; dtm.SqlDbType = DtmCommon.Constant.Barrier.DBTYPE_SQLSERVER; dtm.BarrierSqlTableName = "[HTGL].[dbo].[barrier]"; }; builder.Services.Configure<AppSettings>(builder.Configuration.GetSection("AppSettings";

SqlDbType:表示使用的数据库类型

BarrierSqlTableName:Barrier表的名字

3、添加代码

    [ApiController]public class DtmController : ControllerBase
    {

        private readonly ILogger<DtmController> _logger;
        private readonly IDtmClient _dtmClient;
        private readonly IDtmTransFactory _transFactory;
        private readonly AppSettings _settings;
        private readonly IBranchBarrierFactory _factory;
        public DtmController(ILogger<DtmController> logger, IDtmClient dtmClient,IDtmTransFactory transFactory, IOptions<AppSettings> settings, IBranchBarrierFactory factory
        {
            _logger = logger;
            _dtmClient = dtmClient;
            _transFactory = transFactory;
            _settings = settings.Value;
            _factory = factory;
        }
        private DbConnection GetConn( => new Microsoft.Data.SqlClient.SqlConnection(_settings.BarrierConn;
        [HttpPost("post-dtm-msg"]
        public async Task<IActionResult> Get(CancellationToken cancellationToken
        {
            //1、创建gid
            var gid = await _dtmClient.GenGid(cancellationToken;
            //2、设置分支事务
            var msg = _transFactory.NewMsg(gid
                .Add(_settings.BusiUrl + "/TransOut", new { id = 123 }
                .Add(_settings.BusiUrl + "/TransIn", new { id = 321 };//3、执行submit
            using (DbConnection conn = GetConn(
            {
                await msg.DoAndSubmitDB(_settings.QueryPreparedUrl + "/msg-queryprepared", conn, async tx =>
                {
                    //4、执行本地事务
                    await Task.CompletedTask;
                };
            }
            _logger.LogInformation("result gid is {0}", gid;
            return Content("SUCCESS";
        }
        [HttpGet("msg-queryprepared"]
        public async Task<IActionResult> QueryPrepared(CancellationToken cancellationToken
        {
            var bb = _factory.CreateBranchBarrier(Request.Query;
            _logger.LogInformation("bb {0}", bb;
            using (DbConnection conn = GetConn(
            {
                //回调查询消息状态
                var res = await bb.QueryPrepared(conn;
                return Ok(new { dtm_result = res };
            }
        }
    }

然后我们向另一个服务项目添加如下代码,作为一个简单的服务方法,没有任何操作只是返回成功:

[ApiController] public class TransController : ControllerBase { private readonly ILogger<TransController> _logger; private readonly IBranchBarrierFactory _factory; private readonly AppSettings _settings; private DbConnection GetConn( => new Microsoft.Data.SqlClient.SqlConnection(_settings.BarrierConn; public TransController(ILogger<TransController> logger, IBranchBarrierFactory factory, IOptions<AppSettings> settings { _logger = logger; _factory = factory; _settings = settings.Value; } [HttpPost("TransIn"] public async Task<IResult> In( { return Results.Ok(new { dtm_result = "SUCCESS" }; //return Results.Ok(new { dtm_result = "FAILURE" }; } [HttpPost("TransOut"] public async Task<IResult> Out( { return Results.Ok(new { dtm_result = "SUCCESS" }; } }

五、执行查看结果

我们正常执行,可以看到下面的动图结果,在执行完本地事务后会访问分支事务,然后数据库表中添加了一条记录

rollback标记的消息

using (DbConnection conn = GetConn( { await msg.DoAndSubmitDB(_settings.QueryPreparedUrl + "/msg-queryprepared", conn, async tx => { throw new Exception("报错了"; //4、执行本地事务 await Task.CompletedTask; }; }

 如果分支事务返回的不是SUCCESS而是FAILURE会由DTM隔一段时间重新请求,dtm对每个事务的重试是指数退避策略,具体为间隔是每失败一次,间隔加倍,避免过多的重试,导致系统负载异常上升。

 

编程笔记 » .NetCore中使用分布式事务DTM的二阶段消息

赞同 (30) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽