.Net Core工作流WorkFlowCore

科技资讯 投稿 7000 0 评论

.Net Core工作流WorkFlowCore

前言

WorkFlowCore是一个针对.NetCore的轻量级的工作流引擎,提供了FluentAPI、多任务、持久化以及并行处理的功能,适合于小型工作流、责任链的需求开发。支持工作流长期运行,提供了各种持久化方式。

.Net7,此处不演示Jsonyaml配置,详细文档请查看官方文档和项目源码地址

 一、安装与基础使用

Install-Package WorkflowCore

然后注入WorkFlowCore

builder.Services.AddWorkflow(;

 WorkFlowCore主要分为两部分:步骤工作流

 步骤

 多个步骤组成一个工作流,每个步骤都可以有输入并产生输出,这些输出可以传递回其所在的工作流。通过创建继承抽象类StepBody或StepBodyAsync的类,并且实现Run或RunAsync方法来定义步骤,很明显它们的区别是是否异步

public class FirstStepBody: StepBody { public override ExecutionResult Run(IStepExecutionContext context { Console.WriteLine("Hello world!First"; return ExecutionResult.Next(; } }

工作流

IWorkflow接口定义一个工作流,接口只有IdVersionBuild方法(内部可以执行多个步骤,工作流主机使用这些信息来标识工作流

public class MyWorkflow :IWorkflow { public string Id => "HelloWorld"; public int Version => 1; public void Build(IWorkflowBuilder<object> builder { builder .StartWith<FirstStepBody>( .Then<FirstStepBody>(; } }

工作流如果想使用必须在工作流主机中通过RegisterWorkflow(方法注册,并且通过Start(方法启动主机,当然也可以通过Stop(方法停止工作流。执行工作流需要使用StartWorkflow(方法,参数为工作流类的Id,如下

[ApiController] [Route("[controller]"] public class WeatherForecastController : ControllerBase { private readonly IWorkflowHost _workflowHost; public WeatherForecastController(IWorkflowHost workflowHost { _workflowHost = workflowHost; } [HttpGet(Name = "get"] public ContentResult Get( { if (!_workflowHost.Registry.IsRegistered("HelloWorld",1 { _workflowHost.RegisterWorkflow<MyWorkflow>(; } _workflowHost.Start(; _workflowHost.StartWorkflow("HelloWorld"; //host.Stop(; return Content("ok"; } }

 当然也可以在构建web服务的时候统一注册,然后就可以直接执行啦

var host = app.Services.GetService<IWorkflowHost>(; host.RegisterWorkflow<MyWorkflow>(; host.Start(;

二、在步骤之间传递参数

每个步骤都是一个黑盒,因此它们支持输入和输出。这些输入和输出可以映射到一个数据类,该数据类定义与每个工作流实例相关的自定义数据。

//步骤包含属性,并且计算
    public class FirstStepBody: StepBody
    {
        public int Input1 { get; set; }
        public int Input2 { get; set; }
        public int Output { get; set; }
        public override ExecutionResult Run(IStepExecutionContext context
        {
            Output = Input1 + Input2;
            Console.WriteLine(Output;
            return ExecutionResult.Next(;
        }
    }
    //工作流包含输入输出的赋值
    public class MyWorkflow :IWorkflow<MyDataClass>
    {
        public string Id => "HelloWorld";
        public int Version => 1;
        public void Build(IWorkflowBuilder<MyDataClass> builder
        {
            builder
                .StartWith<FirstStepBody>(
                .Input(step => step.Input1,data => data.Value1
                .Input(step => step.Input2, data => 100
                .Output(data => data.Answer, step => step.Output
                .Then<FirstStepBody>(
                .Input(step => step.Input1, data => data.Value1
                .Input(step => step.Input2, data => data.Answer
                .Output(data => data.Answer, step => step.Output;
        }
    }
    //工作流的属性类
    public class MyDataClass
    {
        public int Value1 { get; set; }
        public int Value2 { get; set; }
        public int Answer { get; set; }
    }
    //执行工作流传入参数
    MyDataClass myDataClass = new MyDataClass(;
    myDataClass.Value1 = 100;
    myDataClass.Value2 = 200;
    //不传入myDataClass则每次执行都是新的数据对象
    _workflowHost.StartWorkflow("HelloWorld", myDataClass;

从上述例子可以看到工作流可以定义一个初始的类作为参数传入,每个步骤可以有自己的属性字段去接收参数(可以是工作流类的字段,也可以是固定值,可以用Input方法传入,Output方法输出赋值。如果在工作流执行时不传入参数每次执行都是新的对象的默认值,比如在StartWorkflow方法中不传myDataClass,运行结果是100100,否则是200300

三、外部事件

WaitFor方法进行等待,通过外部触发此事件,将事件产生的数据传递给工作流,并且让工作流继续执行下面的步骤。示例如下:

public class MyWorkflow :IWorkflow<MyDataClass> { //省略。 public void Build(IWorkflowBuilder<MyDataClass> builder { builder .StartWith<FirstStepBody>( .Input(step => step.Input1,data => data.Value1 .Input(step => step.Input2, data => 100 .Output(data => data.Answer, step => step.Output .WaitFor("MyEvent",key => "EventKey" .Output(data => data.Answer,step => step.EventData .Then<FirstStepBody>( .Input(step => step.Input1, data => data.Value1 .Input(step => step.Input2, data => data.Answer .Output(data => data.Answer, step => step.Output; } } //。 [HttpGet(Name = "get"] public ContentResult Get( { MyDataClass myDataClass = new MyDataClass(; myDataClass.Value1 = 100; myDataClass.Value2 = 200; _workflowHost.StartWorkflow("HelloWorld", myDataClass; return Content("ok"; } [HttpPost(Name = "event"] public ContentResult PublishEvent( { _workflowHost.PublishEvent("MyEvent", "EventKey", 200; return Content("ok"; }

 使用WaitFor方法可以使工作流等待监听指定事件的执行,有两个入参事件名称事件关键字。通过工作流主机去触发PublishEvent执行指定的事件,有三个入参触发事件名称触发事件关键字和事件参数

 

  可以为等待事件设置有效时间,在有效时间之前执行事件是不会继续下一步流程的,只有当大于有效时间之后执行事件才会继续下一步步骤。如下代码设置,为工作流执行时间一天后执行事件才会继续执行,否则就等待不动。

WaitFor("MyEvent",key => "EventKey", data => DateTime.Now.AddDays(1

四、活动

活动被定义为在工作流中可以被等待的外部工作队列中的步骤。

activity-1,直到活动完成才继续工作流。它还将data.Value1的值传递给活动,然后将活动的结果映射到data.Value2

worker来处理活动项的队列。它使用GetPendingActivity方法来获取工作流正在等待的活动和数据。

//..... builder .StartWith<FirstStepBody>( .Input(step => step.Input1,data => data.Value1 .Input(step => step.Input2, data => 100 .Output(data => data.Answer, step => step.Output .Activity("activity-1", (data => data.Value1 .Output(data => data.Value2, step => step.Result .Then<FirstStepBody>( .Input(step => step.Input1, data => data.Value1 .Input(step => step.Input2, data => data.Answer .Output(data => data.Answer, step => step.Output; //.... [HttpPost(Name = "active"] public ContentResult PublishEvent( { var activity = _workflowHost.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1.Result; if (activity != null { Console.WriteLine(activity.Parameters; _workflowHost.SubmitActivitySuccess(activity.Token, 100; } return Content("ok"; }

活动可以看作一个等待的步骤可以传入参数和输出参数,和事件的区别是事件不能输入参数而是单纯的等待。

五、错误处理

    public void Build(IWorkflowBuilder<object> builder
    {
        builder                
            .StartWith<HelloWorld>(
                .OnError(WorkflowErrorHandling.Retry,TimeSpan.FromMinutes(10
            .Then<GoodbyeWorld>(;
    }

六、流程控制

工作流的流程控制包括分支、循环等各种操作

决策分支

使用IWorkflowBuilderCreateBranch方法定义分支。然后我们可以使用branch方法选择一个分支。

branch方法列出的分支相匹配,匹配的分支将安排执行。匹配多个分支将导致并行分支运行。

data.Value1的值为1,则此工作流将选择branch2,如果为2,则选择branch3

var branch2 = builder.CreateBranch( .StartWith<PrintMessage>( .Input(step => step.Message, data => "hi from 1" .Then<PrintMessage>( .Input(step => step.Message, data => "bye from 1"; var branch3 = builder.CreateBranch( .StartWith<PrintMessage>( .Input(step => step.Message, data => "hi from 2" .Then<PrintMessage>( .Input(step => step.Message, data => "bye from 2"; builder .StartWith<HelloWorld>( .Decide(data => data.Value1 .Branch((data, outcome => data.Value1 == "one", branch2 .Branch((data, outcome => data.Value1 == "two", branch3;

并行ForEach

ForEach方法启动并行for循环

public class ForEachWorkflow : IWorkflow { public string Id => "Foreach"; public int Version => 1; public void Build(IWorkflowBuilder<object> builder { builder .StartWith<SayHello>( .ForEach(data => new List<int>( { 1, 2, 3, 4 } .Do(x => x .StartWith<DisplayContext>( .Input(step => step.Message, (data, context => context.Item .Then<DoSomething>( .Then<SayGoodbye>(; } }

While循环

While方法启动while循环

public class WhileWorkflow : IWorkflow<MyData> { public string Id => "While"; public int Version => 1; public void Build(IWorkflowBuilder<MyData> builder { builder .StartWith<SayHello>( .While(data => data.Counter < 3 .Do(x => x .StartWith<DoSomething>( .Then<IncrementStep>( .Input(step => step.Value1, data => data.Counter .Output(data => data.Counter, step => step.Value2 .Then<SayGoodbye>(; } }

If判断

If方法执行if判断

public class IfWorkflow : IWorkflow<MyData> { public void Build(IWorkflowBuilder<MyData> builder { builder .StartWith<SayHello>( .If(data => data.Counter < 3.Do(then => then .StartWith<PrintMessage>( .Input(step => step.Message, data => "Value is less than 3" .If(data => data.Counter < 5.Do(then => then .StartWith<PrintMessage>( .Input(step => step.Message, data => "Value is less than 5" .Then<SayGoodbye>(; } }

并行

Parallel方法并行执行任务

public class ParallelWorkflow : IWorkflow<MyData> { public string Id => "parallel-sample"; public int Version => 1; public void Build(IWorkflowBuilder<MyData> builder { builder .StartWith<SayHello>( .Parallel( .Do(then => then.StartWith<Task1dot1>( .Then<Task1dot2>( .Do(then => then.StartWith<Task2dot1>( .Then<Task2dot2>( .Join( .Then<SayGoodbye>(; } }

Schedule

Schedule方法在工作流中注册在指定时间后执行的异步方法

builder .StartWith(context => Console.WriteLine("Hello" .Schedule(data => TimeSpan.FromSeconds(5.Do(schedule => schedule .StartWith(context => Console.WriteLine("Doing scheduled tasks" .Then(context => Console.WriteLine("Doing normal tasks";

Recur

Recure方法在工作流中设置一组重复的后台步骤,直到满足特定条件为止

builder .StartWith(context => Console.WriteLine("Hello" .Recur(data => TimeSpan.FromSeconds(5, data => data.Counter > 5.Do(recur => recur .StartWith(context => Console.WriteLine("Doing recurring task" .Then(context => Console.WriteLine("Carry on";

七、Saga transaction 

saga允许在saga transaction中封装一系列步骤,并为每一个步骤提供补偿步骤,使用CompensateWith方法在对应的步骤后面添加补偿步骤,补偿步骤将会在步骤抛出异常的时候触发。

Task2如果抛出一个异常,那么补偿步骤UndoTask2UndoTask1将被触发。

builder .StartWith(context => Console.WriteLine("Begin" .Saga(saga => saga .StartWith<Task1>( .CompensateWith<UndoTask1>( .Then<Task2>( .CompensateWith<UndoTask2>( .Then<Task3>( .CompensateWith<UndoTask3>( .CompensateWith<CleanUp>( .Then(context => Console.WriteLine("End";

也可以指定重试策略,在指定时间间隔后重试。

builder .StartWith(context => Console.WriteLine("Begin" .Saga(saga => saga .StartWith<Task1>( .CompensateWith<UndoTask1>( .Then<Task2>( .CompensateWith<UndoTask2>( .Then<Task3>( .CompensateWith<UndoTask3>( .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5 .Then(context => Console.WriteLine("End";

八、持久化

可以使用RedisMongdbSqlserver等持久化,具体可以看文档,此处使用Redis,先安装nuget

Install-Package WorkflowCore.Providers.Redis

然后注入就可以了

builder.Services.AddWorkflow(cfg => { cfg.UseRedisPersistence("localhost:6379", "app-name"; cfg.UseRedisLocking("localhost:6379"; cfg.UseRedisQueues("localhost:6379", "app-name"; cfg.UseRedisEventHub("localhost:6379", "channel-name"; //cfg.UseMongoDB(@"mongodb://mongo:27017", "workflow"; //cfg.UseElasticsearch(new ConnectionSettings(new Uri("http://elastic:9200", "workflows"; };

运行打开可以看到

 

编程笔记 » .Net Core工作流WorkFlowCore

赞同 (36) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽