WorkflowCore开源轻量级工作流介绍

科技资讯 投稿 61300 0 评论

WorkflowCore开源轻量级工作流介绍

  • 轻量级,部署和使用都很简单。

  • 有相当数量的用户,往往使用的人越多,产品也就越可靠,遇到问题也容易找到解决办法。

  • 支持使用配置文件定义工作流,而不仅仅是使用代码定义。

符合上述要求的开源项目有几个,这里介绍开源项目WorkflowCore,项目地址:https://github.com/danielgerlag/workflow-core。
本文的示例可以从github下载:https://github.com/zhenl/ZL.WorflowCoreDemo 。

简单的控制台项目

  • WorkflowCore

  • Microsoft.Extensions.DependencyInjection

  • Microsoft.Extensions.Logging

然后,创建两个工作流的步骤:

using WorkflowCore.Interface;
using WorkflowCore.Models;

namespace WorkflowCoreTest
{
    public class HelloWorld : StepBody
    {
        public override ExecutionResult Run(IStepExecutionContext context
        {
            Console.WriteLine("你好";
            return ExecutionResult.Next(;
        }
    }
}
using WorkflowCore.Interface;
using WorkflowCore.Models;

namespace WorkflowCoreTest
{
    public class GoodbyeWorld : StepBody
    {
        public override ExecutionResult Run(IStepExecutionContext context
        {
            Console.WriteLine("再见";
            return ExecutionResult.Next(;
        }
    }
}

接下来使用这两个步骤定义一个工作流:

using WorkflowCore.Interface;

namespace WorkflowCoreTest
{
    public class HelloWorldWorkflow : IWorkflow
    {
        public string Id => "HelloWorld";
        public int Version => 1;

        public void Build(IWorkflowBuilder<object> builder
        {
            builder
                .StartWith<HelloWorld>(
                .Then<GoodbyeWorld>(;
        }
    }
}

最后,在主程序中,创建WorkflowHost,注册并运行工作流,代码如下:

using Microsoft.Extensions.DependencyInjection;
using System;
using System.Threading;
using WorkflowCore.Interface;

namespace WorkflowCoreTest
{
    class Program
    {
        static void Main(string[] args
        {
            IServiceProvider serviceProvider = ConfigureServices(;
            var host = serviceProvider.GetService<IWorkflowHost>(;
            host.RegisterWorkflow<HelloWorldWorkflow>(;
            host.Start(;

            host.StartWorkflow("HelloWorld", 1, null;
            Console.ReadLine(;
            host.Stop(;
        }

        private static IServiceProvider ConfigureServices(
        {
            //setup dependency injection
            IServiceCollection services = new ServiceCollection(;
            services.AddLogging(;
            services.AddWorkflow(;
                        
            var serviceProvider = services.BuildServiceProvider(;

            return serviceProvider;
        }
    }
}

简单的工作流就完成了。

WorkflowHost

static void Main(string[] args
        {
            IServiceProvider serviceProvider = ConfigureServices(;
            var host = serviceProvider.GetService<IWorkflowHost>(;
                       
            host.RegisterWorkflow<HelloWorldWorkflow>(;
            host.Start(;
            host.StartWorkflow("HelloWorld", 1, null;
            
            Console.ReadLine(;
            host.Stop(;
        }

WorkflowHost的工作过程是这样的,首先需要获取WorkflowHost的实例,然后注册工作流,这里可以注册多个工作流,接下来,启动host,然后可以启动工作流,这里可以启动多个工作流实例,最后,关闭host。

(获得的host是否是同一对象?为了回答这个问题,我们增加一些代码:

            var host = serviceProvider.GetService<IWorkflowHost>(;
            var host1 = serviceProvider.GetService<IWorkflowHost>(;

            Console.WriteLine(host == host1;

我们获取两个host变量比较一下看是否指向相同的对象,结果是True,也就是使用serviceProvider.GetService<IWorkflowHost(获得的是相同的对象。

我们修改一下代码,启动流程实例后,马上执行host.Stop(:

            host.RegisterWorkflow<HelloWorldWorkflow>(;
            host.Start(;
            host.StartWorkflow("HelloWorld", 1, null;
            host.Stop(;
            Console.ReadLine(;

我们发现,没有输出结果,也就是host.Stop(终止了所有流程。
第三个问题,host中启动的流程是否在同一线程运行?
我们启动多个流程,看一下输出结果:

            host.RegisterWorkflow<HelloWorldWorkflow>(;
            host.Start(;
            host.StartWorkflow("HelloWorld", 1, null;
            host.StartWorkflow("HelloWorld", 1, null;
            host.StartWorkflow("HelloWorld", 1, null;
            host.Stop(;
            Console.ReadLine(;

下一步我们需要了解流程的参数传递。

流程的数据对象和数据传递

  • 定义一个数据结构用来保存输入的名字

  • 将这个数据结构与流程关联起来

  • 修改流程,让流程等待用户输入

  • 将用户输入的变量传递给流程
    首先我们定义一个简单的类,用来保存输入的名字:

namespace ZL.WorflowCoreDemo.InputDataToStep
{
    public class MyNameClass
    {
        public string MyName { get; set; }
    }
}

然后,修改流程的定义:

using System;

using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.InputDataToStep
{
    public class HelloWithNameWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "HelloWithNameWorkflow";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder
        {
            builder
                .StartWith(context => ExecutionResult.Next(
                .WaitFor("MyEvent", (data, context => context.Workflow.Id, data => DateTime.Now
                    .Output(data => data.MyName, step => step.EventData
                .Then<HelloWithName>(
                    .Input(step => step.Name, data => data.MyName
                .Then<GoodbyeWithName>(
                    .Input(step => step.Name, data => data.MyName;
        }
    }
}

这里,流程声明为 IWorkflow,说明流程使用这个类存储数据,在流程定义中,可以使用data操作相关的数据对象,比如: .Input(step => step.Name, data => data.MyName 就是将流程数据中的MyName传递给步骤中的Name(step.Name。

还需要修改两个步骤,增加名称字段:

using System;
using System.Collections.Generic;
using WorkflowCore.Interface;
using WorkflowCore.Models;


namespace ZL.WorflowCoreDemo.InputDataToStep.Steps
{
    public class HelloWithName : StepBody
    {
        public string Name { get; set; }
        public override ExecutionResult Run(IStepExecutionContext context
        {
            Console.WriteLine("你好," + Name;
            return ExecutionResult.Next(;
        }
    }
}
using System;
using WorkflowCore.Interface;
using WorkflowCore.Models;


namespace ZL.WorflowCoreDemo.InputDataToStep.Steps
{
    public class GoodbyeWithName : StepBody
    {
        public string Name { get; set; }
        public override ExecutionResult Run(IStepExecutionContext context
        {
            Console.WriteLine(Name + ",再见";
            return ExecutionResult.Next(;
        }
    }
}

下面是流程注册和运行的代码:

using System;

using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.InputDataToStep
{
    public class HelloWithNameWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "HelloWithNameWorkflow";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder
        {
            builder
                .StartWith(context => ExecutionResult.Next(
                .WaitFor("MyEvent", (data, context => context.Workflow.Id, data => DateTime.Now
                    .Output(data => data.MyName, step => step.EventData
                .Then<HelloWithName>(
                    .Input(step => step.Name, data => data.MyName
                .Then<GoodbyeWithName>(
                    .Input(step => step.Name, data => data.MyName;
        }
    }
}
using System;
using System.Collections.Generic;

using System.Threading;
using Microsoft.Extensions.DependencyInjection;
using WorkflowCore.Interface;

namespace ZL.WorflowCoreDemo.InputDataToStep
{
    public class FlowRun
    {
        public static void Run(
        {
            IServiceProvider serviceProvider = ConfigureServices(;
            var host = serviceProvider.GetService<IWorkflowHost>(;
            
            host.RegisterWorkflow<HelloWithNameWorkflow, MyNameClass>(;
            host.Start(;

            var initialData = new MyNameClass(;
            var workflowId = host.StartWorkflow("HelloWithNameWorkflow", 1, initialData.Result;
            
            Console.WriteLine("输入名字";
            string value = Console.ReadLine(;
            host.PublishEvent("MyEvent", workflowId, value;

            Console.ReadLine(;
            host.Stop(;
        }

        private static IServiceProvider ConfigureServices(
        {
            //setup dependency injection
            IServiceCollection services = new ServiceCollection(;
            services.AddLogging(;
            services.AddWorkflow(;

            var serviceProvider = services.BuildServiceProvider(;

            return serviceProvider;
        }
    }
}

我们也可以使用字典作为数据对象,流程的定义如下:

using System;
using System.Collections.Generic;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.InputDataToStep
{
    public class HelloWithNameWorkflowDynamic : IWorkflow<Dictionary<string,string>>
    {
        public string Id => "HelloWithNameWorkflowDynamic";
        public int Version => 1;

        public void Build(IWorkflowBuilder<Dictionary<string, string>> builder
        {
            builder
                .StartWith(context => ExecutionResult.Next(
                .WaitFor("MyEvent", (data, context => context.Workflow.Id, data => DateTime.Now
                    .Output((step,data=>data.Add("Name",(stringstep.EventData
                .Then<HelloWithName>(
                    .Input(step => step.Name, data => data["Name"]
                .Then<GoodbyeWithName>(
                    .Input(step => step.Name, data => data["Name"];
        }
    }
}

这里没有使用自定义的类,而是使用了字典Dictionary<string, string>,流程的运行代码如下:

IServiceProvider serviceProvider = ConfigureServices(;
            var host = serviceProvider.GetService<IWorkflowHost>(;
            
            host.RegisterWorkflow<HelloWithNameWorkflowDynamic, Dictionary<string,string>>(;
            host.Start(;

            var initialData = new Dictionary<string,string>(;
            var workflowId = host.StartWorkflow("HelloWithNameWorkflowDynamic", 1, initialData.Result;
            
            Console.WriteLine("输入名字";
            string value = Console.ReadLine(;
            host.PublishEvent("MyEvent", workflowId, value;

            
            Console.ReadLine(;
            foreach (var key in initialData.Keys
            {
                Console.WriteLine(key + ":" + initialData[key];
            }
            Console.ReadLine(;
            host.Stop(;

采用JSON格式定义流程

WorkflowCore 支持采用JSON或者YAML格式定义流程,使用时通过使用IDefintionLoader加载流程来替代RegisterWorkflow。我们仍然通过简单的例子来说明。在我们现有的工程中已经定义了几个简单的流程步骤,我们用JSON格式将这几个步骤组成简单的工作流。

{
  "Id": "HelloWorld",
  "Version": 1,
  "Steps": [
    {
      "Id": "Hello",
      "StepType": "ZL.WorflowCoreDemo.Basic.Steps.HelloWorld,ZL.WorflowCoreDemo",
      "NextStepId": "Bye"
    },
    {
      "Id": "Bye",
      "StepType": "ZL.WorflowCoreDemo.Basic.Steps.GoodbyeWorld,ZL.WorflowCoreDemo"
      
    }
  ]
}

Json定义格式符合WorkflowCore的DSL,这里不进行DSL的详细介绍,我们重点关注流程如何定义,加载和运行。
我们可以将前面项目中的代码拷贝过来进行修改,首先修改下面的函数:

private static IServiceProvider ConfigureServices(
        {
            //setup dependency injection
            IServiceCollection services = new ServiceCollection(;
            services.AddLogging(;
            services.AddWorkflow(;
            //这是新增加的服务
            services.AddWorkflowDSL(;

            var serviceProvider = services.BuildServiceProvider(;

            return serviceProvider;
        }

ConfigureServices新增加了services.AddWorkflowDSL(;
在主函数中,使用IDefintionLoader加载JSON格式的流程定义:

static void Main(string[] args
        {
            IServiceProvider serviceProvider = ConfigureServices(;

            var loader = serviceProvider.GetService<IDefinitionLoader>(;

            var json = System.IO.File.ReadAllText("myflow.json";
            loader.LoadDefinition(json, Deserializers.Json;
            var host = serviceProvider.GetService<IWorkflowHost>(;
            host.Start(;
            host.StartWorkflow("HelloWorld", 1, null;
            
            Console.ReadLine(;
            host.Stop(;
        }

现在,流程可以运行了。

JSON格式(DSL定义流程与使用Fluent API定义流程的比较

前面我们分别讨论了使用Fluent API定义流程和使用JSON格式定义流程,按照以前的使用经验,感觉这两种定义方式应该可以互相转换,互相代替,但在实际应用中发现并不是如此,两种方式都有不能被替代的功能。

使用Fluent API可以使用Lambda 表达式定义步骤

public class HelloWorldWorkflow : IWorkflow
{
    public string Id => "HelloWorld";
    public int Version => 1;

    public void Build(IWorkflowBuilder<object> builder
    {
        builder
            .StartWith(context =>
            {
                Console.WriteLine("你好";
                return ExecutionResult.Next(;
            }
            .Then(context =>
            {
                Console.WriteLine("再见";
                return ExecutionResult.Next(;
            };
    }
}

这种方式无法使用JSON等格式实现。

采用JSON等DSL格式可以方便地定义步骤间的跳转

{
  "Id": "HelloWorld",
  "Version": 1,
  "Steps": [
    {
      "Id": "Hello",
      "StepType": "ZL.WorflowCoreDemo.Basic.Steps.HelloWorld,ZL.WorflowCoreDemo",
      "NextStepId": "Bye"
    },
    {
      "Id": "Bye",
      "StepType": "ZL.WorflowCoreDemo.Basic.Steps.GoodbyeWorld,ZL.WorflowCoreDemo"
      "NextStepId": "Hello"
    }
  ]
}

步骤“Hello”执行完成后,执行"Bye",“Bye”执行完又回到“Hello”,如此循环。但在Fluent API中就没有这么方便,必须使用循环或者其它的方式。而这种跳转方式在实际应用中非常常见,最常见的场景就是审批流程中的提交/驳回,提交-驳回过程可以形成多次循环,这种流程模式,采用带有步骤标记的跳转很容易实现。

流程数据类的局限性

下面的代码展示通过Lamdba表达式实现两个Dictionary<string,string>之间的数据传递,但在DSL中没有对应的方式:

                    .Output((step, data=> {
                        var dic = step.EventData as Dictionary<string, object>;
                        foreach (var key in dic.Keys
                        {
                            if (data.MyDic.ContainsKey(key data.MyDic[key] = dic[key];
                            else data.MyDic.Add(key, dic[key];
                        }

而在实际应用中,我们需要使用流程定义文件而不是写死的代码来定义流程,这样在流程修改时,就不需要修改代码和重新编译部署。这个限制是WorkflowCore在实际项目中落地的一个主要障碍。

工作流持久化与恢复

首先,我们需要使用NuGet引入SqlServer持久化Provider:WorkflowCore.Persistence.SqlServer,当然也可以使用其它类型的数据存储。

services.AddWorkflow(x => x.UseSqlServer(@"Server=.;Database=WorkflowCore;Trusted_Connection=True;", true, true;

最后修改一下执行代码,增加流程Id输入和恢复代码:

IServiceProvider serviceProvider = ConfigureServices(;
            var host = serviceProvider.GetService<IWorkflowHost>(;
            
            host.RegisterWorkflow<HelloWithNameWorkflowDynamic, Dictionary<string,string>>(;
            host.Start(;

            var initialData = new Dictionary<string,string>(;

            Console.WriteLine("请输入需要恢复的流程编号,如执行新流程直接回车:";
            string workflowId = Console.ReadLine(;
            
            if (string.IsNullOrEmpty(workflowId
            {
                workflowId = host.StartWorkflow("HelloWithNameWorkflowDynamic", 1, initialData.Result;
                Console.WriteLine(workflowId;
            }
            else
            {
                host.ResumeWorkflow(workflowId;
            }
              

            
            Console.WriteLine("输入名字";
            string value = Console.ReadLine(;
            host.PublishEvent("MyEvent", workflowId, value;

下面,我们模拟中断-恢复过程。首先,运行程序,不输入流程id,直接按回车,会生成新的流程,并输出流程Id,拷贝这个流程ID,并退出程序:

单元测试

在ZL.WorkflowCoreDemo解决方案中增加一个xUnit测试项目,命名为ZL.WorkflowCoreDemo.Test,创建好的项目中已经包含xunit和xunit.runner.visualstudio。我们还需要使用NuGet引入其它的框架,首先要引入FluentAssertions,这个框架结合xUnit,可以让 我们在测试中使用Should断言。还需要引入WorkflowCore和WorkflowCore.Testing以及我们需要测试的项目。这里我们测试最简单的HelloWorldWorkflow。

using System;
using Xunit;
using WorkflowCore.Testing;
using ZL.WorflowCoreDemo.Basic;
using WorkflowCore.Models;
using System.Threading;
using FluentAssertions;

namespace ZL.WorkflowCoreDemo.Test
{
    public class DemoUnitTest:WorkflowTest<HelloWorldWorkflow,dynamic>
    {
        public DemoUnitTest(
        {
            Setup(;
        }

        [Fact]
        public void Test1(
        {
            dynamic data = new { };
            var workflowId = StartWorkflow(data;
            WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30;

            WorkflowStatus status = GetStatus(workflowId;
            status.Should(.Be(WorkflowStatus.Complete;
            UnhandledStepErrors.Count.Should(.Be(0;
           
        }

    }
}

需要注意的是在测试类的构造函数中调用Setup(,用来初始化流程引擎。

using System;
using Xunit;
using WorkflowCore.Testing;
using ZL.WorflowCoreDemo.Basic;
using WorkflowCore.Models;
using System.Threading;
using FluentAssertions;

namespace ZL.WorkflowCoreDemo.Test
{
    public class DemoUnitTest:WorkflowTest<HelloWorldWorkflow,dynamic>
    {
        public DemoUnitTest(
        {
            Setup(;
        }

        [Fact]
        public void Test1(
        {
            dynamic data = new { };
            var workflowId = StartWorkflow(data;
            WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30;

            WorkflowStatus status = GetStatus(workflowId;
            status.Should(.Be(WorkflowStatus.Complete;
            UnhandledStepErrors.Count.Should(.Be(0;
           
        }

        protected new WorkflowStatus GetStatus(string workflowId
        {
            var instance = PersistenceProvider.GetWorkflowInstance(workflowId.Result;
            return instance.Status;
        }

        protected new void WaitForWorkflowToComplete(string workflowId, TimeSpan timeOut
        {
            var status = GetStatus(workflowId;
            var counter = 0;
            while ((status == WorkflowStatus.Runnable && (counter < (timeOut.TotalMilliseconds / 100
            {
                Thread.Sleep(100;
                counter++;
                status = GetStatus(workflowId;
            }
        }
    }
}

再次运行,测试通过了。

Activity Workers

using WorkflowCore.Interface;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.ActivityWorker
{
    public class MyActivityWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "MyActivityWorkflow";

        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder
        {

            builder
                .StartWith<HelloWithName>(.Input(data => data.Name, step => step.MyName
                    .Activity("activity-1", (data => data.MyName
                        .Output(data => data.MyName, step => step.Result
                    .Then<GoodbyeWithName>(
                        .Input(step => step.Name, data => data.MyName
                    .Activity("activity-2", (data => data.MyName
                        .Output(data => data.MyName, step => step.Result
                     .Then<HelloWithName>(.Input(step => step.Name, data => data.MyName
                    .Then<GoodbyeWithName>(
                        .Input(step => step.Name, data => data.MyName;
        }
    }
}

这个例子很简单,使用了我们前面定义的两个步骤,HelloWithName和GoodbyeWithName,Activity在这里就是接收外部输入的Name。流程的运行代码如下:

IServiceProvider serviceProvider = ConfigureServices(;
            var host = serviceProvider.GetService<IWorkflowHost>(;
            host.RegisterWorkflow<MyActivityWorkflow, MyNameClass>(;

            host.Start(;

            var myClass = new MyNameClass { MyName = "张三" };

            host.StartWorkflow("MyActivityWorkflow", 1, myClass;

            var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1.Result;

            if (activity != null
            {
                Console.WriteLine("输入名字";
                string value = Console.ReadLine(;
                host.SubmitActivitySuccess(activity.Token, value;
            }

            activity = host.GetPendingActivity("activity-2", "worker2", TimeSpan.FromMinutes(1.Result;

            if (activity != null
            {
                Console.WriteLine("输入名字";
                string value = Console.ReadLine(;
                host.SubmitActivitySuccess(activity.Token, value;
            }

            Console.ReadLine(;
            host.Stop(;

工作流启动后,需要通过host.GetPendingActivity获取Activity,获取成功,就从外部获取数据,然后使用host.SubmitActivitySuccess提交数据。

WaitFor vs Activity

            var id1=host.StartWorkflow("MyActivityWorkflow", 1, myClass.Result;
            var id2 = host.StartWorkflow("MyActivityWorkflow", 1, myClass.Result;

             //上面两个实例中有相同的activity-1,无法知道这里获取的是哪一个实例的活动,         
            var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1.Result;

WairFor事件发布时有工作流实例ID传入:

host.PublishEvent("MyEvent", workflowId, value;

没有上面的缺陷。

使用ForEach并行执行多个流程

using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.Paralle
{
    public class ParalleWorkflow : IWorkflow
    {
        public string Id => "ParalleWorkflow";

        public int Version => 1;

        public void Build(IWorkflowBuilder<object> builder
        {
            builder
            .StartWith(context => { Console.WriteLine("开始"; ExecutionResult.Next(; }
            .ForEach(data => new List<string>( { "张三", "李四", "王五", "赵六" }
                .Do(x => x
                    .StartWith<HelloWithName>(
                        .Input(step => step.Name, (data, context => context.Item as string
                    .Then<GoodbyeWithName>(
                        .Input(step => step.Name, (data, context => context.Item as string
                    
            .Then(context => { Console.WriteLine("结束"; ExecutionResult.Next(; };
        }
    }
}

在这个例子里,我们没有定义相关的数据类,需要输入的人名作为ForEach中的循环变量,这些变量保存在context中,输入到相应的环节中。执行代码如下:

            IServiceProvider serviceProvider = ConfigureServices(;
            var host = serviceProvider.GetService<IWorkflowHost>(;
            host.RegisterWorkflow<ParalleWorkflow>(;

            host.Start(;
            host.StartWorkflow("ParalleWorkflow", 1, null;


            Console.ReadLine(;
            host.Stop(;

Parallel并行执行多个流程

前面我们提到了使用ForEach执行并行流程,这些流程的执行过程相同,不同的只是输入的参数。如果需要并行执行多个不同的流程,需要使用Parallel,示例代码如下:

using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;

namespace ZL.WorflowCoreDemo.Paralle
{
    public class ParallePathWorkflow : IWorkflow
    {
        public string Id => "ParallePathWorkflow";

        public int Version => 1;

        public void Build(IWorkflowBuilder<object> builder
        {
            builder
            .StartWith(context => { Console.WriteLine("开始"; ExecutionResult.Next(; }
            .Parallel(
                .Do(then =>
                    then.StartWith(context=>{ Console.WriteLine("分支一开始"; ExecutionResult.Next(; }
                        .Then(context => { Console.WriteLine("分支一结束"; ExecutionResult.Next(; }
                .Do(then =>
                    then.StartWith(context => { Console.WriteLine("分支二开始"; ExecutionResult.Next(; }
                        .Then(context => { Console.WriteLine("分支二结束"; ExecutionResult.Next(; }
                .Do(then =>
                    then.StartWith(context => { Console.WriteLine("分支二开始"; ExecutionResult.Next(; }
                        .Then(context => { Console.WriteLine("分支二结束"; ExecutionResult.Next(; }
            .Join(
            .Then(context => { Console.WriteLine("结束"; ExecutionResult.Next(; };
        }
    }
}

为了说明分支语句的构成,这个流程没有使用关联的数据类,也没有使用类定义步骤,全部使用Lambda表达式。Parallel的结构是分支的开始是Parallel(,结束是Join(,每个分支在Do语句中表示。流程的运行代码如下:

IServiceProvider serviceProvider = ConfigureServices(;
            var host = serviceProvider.GetService<IWorkflowHost>(;
            host.RegisterWorkflow<ParallePathWorkflow>(;
            host.Start(;
            host.StartWorkflow("ParallePathWorkflow", 1, null;
            Console.ReadLine(;
            host.Stop(;

While循环

While循环会重复执行某些步骤,直到条件得到满足再继续执行下面的流程。使用While循环可以实现审批流程中的“提交/驳回”,如果审批没有通过,驳回重新输入,直到审批通过或者驳回次数到达上限。这里举一个简单的例子说明使用方法,结合前面提到的Activity,可以实现对输入进行判断,如果输入不满足要求,就重新输入。流程定义如下:

using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.ControlStructures
{
    public class WhileWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "WhileWorkflow";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder
        {
            builder
                .StartWith<HelloWithName>(
                    .Input(step => step.Name, data => data.MyName
                .While(data => data.MyName.Length < 3
                    .Do(x => x
                        .StartWith(context=> { Console.WriteLine("输入小于3个字符"; ExecutionResult.Next(; }
                        .Activity("activity-1", (data => data.MyName
                        .Output(data => data.MyName, step => step.Result
                .Then<GoodbyeWithName>(
                   .Input(step => step.Name, data => data.MyName;
        }
    }
}

流程运行的代码如下:

            IServiceProvider serviceProvider = ConfigureServices(;
            var host = serviceProvider.GetService<IWorkflowHost>(;
            host.RegisterWorkflow<WhileWorkflow, MyNameClass>(;

            host.Start(;

            var myClass = new MyNameClass { MyName = "张三" };

            host.StartWorkflow("WhileWorkflow", 1, myClass;

            var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1.Result;

            
            while (activity != null
            {
                Console.WriteLine("输入大于3个字符的名字结束,小于3个字符的名字继续";
                string value = Console.ReadLine(;
                host.SubmitActivitySuccess(activity.Token, value;
                activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1.Result;
            }
                        
            Console.ReadLine(;
            host.Stop(;

If判断

If判断比较简单,根据流程关联的数据对象中的值进行判断,如果条件满足执行相应的分支。需要注意的是没有else相关语句,如果需要实现相关逻辑,需要再次进行一次条件相反的判断。下面是简单的例子,仍然使用前面定义的数据类和步骤,输入采用Activity:

using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.ControlStructures
{
    public class IfWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "IfWorkflow";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder
        {
            builder
                .StartWith(context=> ExecutionResult.Next(
                .Activity("activity-1", (data => data.MyName
                        .Output(data => data.MyName, step => step.Result    
                .If(data => data.MyName.Length < 3
                    .Do(then=>then
                        .StartWith(context => { Console.WriteLine("输入小于3个字符"; ExecutionResult.Next(; }
                .If(data => data.MyName.Length >= 3
                    .Do(then => then
                        .StartWith(context => { Console.WriteLine("输入大于等于3个字符"; ExecutionResult.Next(; }
                .Then<GoodbyeWithName>(
                   .Input(step => step.Name, data => data.MyName;
        }
    }
}

流程的运行代码如下:

            IServiceProvider serviceProvider = ConfigureServices(;
            var host = serviceProvider.GetService<IWorkflowHost>(;
            host.RegisterWorkflow<IfWorkflow, MyNameClass>(;

            host.Start(;

            var myClass = new MyNameClass { MyName = "张三" };

            host.StartWorkflow("IfWorkflow", 1, myClass;

            var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1.Result;


            if (activity != null
            {
                Console.WriteLine("输入名字";
                string value = Console.ReadLine(;
                host.SubmitActivitySuccess(activity.Token, value;
                
            }

            Console.ReadLine(;
            host.Stop(;

条件分支Decision Branches

Decision Branches有点类似于switch语句,可以为每个条件创建一个分支,这些分支相对独立,根据不同的条件选择执行。如果使用Fluent API,可以使用CreateBranch方法创建分支,然后在流程中使用分支。为了说明问题,我们改造前面的If流程,使用Decision Branches实现相同的功能,流程定义的代码如下:

using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.ControlStructures
{
    public class DecisionWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "DecisionWorkflow";
        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder
        {
            var branch1 = builder.CreateBranch(
                .StartWith(context => { Console.WriteLine("输入小于3个字符"; ExecutionResult.Next(; };
            var branch2 = builder.CreateBranch(
                .StartWith(context => { Console.WriteLine("输入大于等于3个字符"; ExecutionResult.Next(; };

            builder
                .StartWith(context => ExecutionResult.Next(
                .Activity("activity-1", (data => data.MyName
                        .Output(data => data.MyName, step => step.Result
                .Decide(data => data.MyName.Length
                     .Branch((data, outcome => data.MyName.Length<3, branch1
                     .Branch((data, outcome => data.MyName.Length >= 3, branch2
                .Then<GoodbyeWithName>(
                   .Input(step => step.Name, data => data.MyName;
        }
    }
}

流程执行定义的代码如下:

            IServiceProvider serviceProvider = ConfigureServices(;
            var host = serviceProvider.GetService<IWorkflowHost>(;
            host.RegisterWorkflow<DecisionWorkflow, MyNameClass>(;

            host.Start(;

            var myClass = new MyNameClass { MyName = "张三" };

            host.StartWorkflow("DecisionWorkflow", 1, myClass;

            var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1.Result;


            if (activity != null
            {
                Console.WriteLine("输入名字";
                string value = Console.ReadLine(;
                host.SubmitActivitySuccess(activity.Token, value;
                
            }

            Console.ReadLine(;
            host.Stop(;

使用Schedule执行定时任务

WorkflowCore 提供了定时执行后台任务的功能,使用Schedule可以定义异步执行的任务,在工作流的后台执行。示例代码如下:

using System;
using WorkflowCore.Interface;


namespace ZL.WorflowCoreDemo.ControlStructures
{
    public class ScheduleWorkflow : IWorkflow
    {
        public string Id => "ScheduleWorkflow";

        public int Version => 1;

        public void Build(IWorkflowBuilder<object> builder
        {
            builder
                .StartWith(context => Console.WriteLine("开始"
                    .Schedule(data => TimeSpan.FromSeconds(5.Do(schedule => schedule
                    .StartWith(context => Console.WriteLine("后台工作"
                .Then(context => Console.WriteLine("前台工作";
        }
    }
}

在上面的代码中,工作流开始后,定义了一个Schedule,这个任务在延时5秒后,启动一个后台流程。流程的执行代码如下:

           IServiceProvider serviceProvider = ConfigureServices(;
            var host = serviceProvider.GetService<IWorkflowHost>(;

            host.RegisterWorkflow<ScheduleWorkflow>(;
            host.Start(;

            
            var workflowId = host.StartWorkflow("ScheduleWorkflow", 1, null.Result;

            Console.ReadLine(;
            host.Stop(;

流程的执行代码与前面的例子基本类似,执行结果如下:

使用Recur执行重复的后台任务

前面介绍的Schedule可以启动一个后台的定时任务,这个任务只执行一次。如果需要执行多次固定间隔的任务,可以使用Recur,当条件满足时任务不再执行。Recur的定义与Schedule类似,只是多了条件判断输入,流程定义的代码如下:

using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using ZL.WorflowCoreDemo.InputDataToStep;
using ZL.WorflowCoreDemo.InputDataToStep.Steps;

namespace ZL.WorflowCoreDemo.ControlStructures
{
    public class RecurWorkflow : IWorkflow<MyNameClass>
    {
        public string Id => "RecurWorkflow";

        public int Version => 1;

        public void Build(IWorkflowBuilder<MyNameClass> builder
        {
            builder
                .StartWith(context => Console.WriteLine("开始"
                    .Recur(data => TimeSpan.FromSeconds(5,data=>data.MyName.Length>5.Do(recur => recur
                    .StartWith<HelloWithName>(
                    .Input(step => step.Name, data => data.MyName
                .Then(context => Console.WriteLine("前台工作"
                .Activity("activity-1", (data => data.MyName
                        .Output(data => data.MyName, step => step.Result;
        }
    }
}

这流程稍微复杂一点,我们增加了使用Activity的输入,目的是看一下前台的输入等待是否会影响后台的进程运行,还有就是前台输入的数据,能否正确传递到后台,流程的运行代码如下:

            IServiceProvider serviceProvider = ConfigureServices(;
            var host = serviceProvider.GetService<IWorkflowHost>(;

            host.RegisterWorkflow<RecurWorkflow,MyNameClass>(;
            host.Start(;

            var myClass = new MyNameClass { MyName = "张三" };

            var workflowId = host.StartWorkflow("RecurWorkflow", 1, myClass.Result;

            var activity = host.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1.Result;


            if (activity != null
            {
                Console.WriteLine("输入名字";
                string value = Console.ReadLine(;
                host.SubmitActivitySuccess(activity.Token, value;

            }

            Console.ReadLine(;
            host.Stop(;

运行效果如下:

集成Elasticsearch

如果希望使用Elasticsearch索引工作流,需要在项目中安装WorkflowCore.Providers.Elasticsearch,使用NuGet安装这个插件,然后在services中进行设置:

using Nest;
...
services.AddWorkflow(cfg =>
{
    ...
    cfg.UseElasticsearch(new ConnectionSettings(new Uri("http://localhost:9200", "index_name";
};

在代码中,通过依赖注入引入ISearchIndex,使用Search方法进行搜索:

Search(string terms, int skip, int take, params SearchFilter[] filters

检索的范围包括流程的定义、描述、状态等。如果流程相关的自定义数据类需要检索,数据类需要实现ISearchable接口。

异常处理

 var host = serviceProvider.GetService<IWorkflowHost>(;
 host.OnStepError += Host_OnStepError;

异常处理代码可以写在Host_OnStepError中:

private static void Host_OnStepError(WorkflowCore.Models.WorkflowInstance workflow, WorkflowCore.Models.WorkflowStep step, Exception exception
        {
            
        }

实际使用中的问题

到这里,我们介绍了WorkflowCore的使用,下面谈一下这个项目在实际使用时遇到一些问题。

  • 轻量级,部署和使用都很简单。项目本身满足这个条件,但对流程相关的查询功能很弱,如果需要增强,需要Elasticsearch的支持。部署和使用Elasticsearch带来了额外的工作量。

  • WorkflowCore支持使用JSON格式定义工作流,然而从功能上要弱于使用Fluent API定义的工作流,因为不具备解析Lambda表达式的能力

  • 参数传递功能相对较弱,无法传递复杂对象。
    上述问题是我们在实际中遇到的,希望对大家有所帮助。

编程笔记 » WorkflowCore开源轻量级工作流介绍

赞同 (45) or 分享 (0)
游客 发表我的评论   换个身份
取消评论

表情
(0)个小伙伴在吐槽