.Net Core工作流WorkFlowCore

这篇具有很好参考价值的文章主要介绍了.Net Core工作流WorkFlowCore。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

前言

WorkFlowCore是一个针对.NetCore的轻量级的工作流引擎,提供了FluentAPI、多任务、持久化以及并行处理的功能,适合于小型工作流、责任链的需求开发。支持工作流长期运行,提供了各种持久化方式。

本篇开发环境为.Net7,此处不演示Jsonyaml配置,详细文档请查看官方文档和项目源码地址

 一、安装与基础使用

通过以下命令安装

Install-Package WorkflowCore

然后注入WorkFlowCore

builder.Services.AddWorkflow();

 WorkFlowCore主要分为两部分:步骤工作流

 步骤

 多个步骤组成一个工作流,每个步骤都可以有输入并产生输出,这些输出可以传递回其所在的工作流。通过创建继承抽象类StepBody或StepBodyAsync的类,并且实现Run或RunAsync方法来定义步骤,很明显它们的区别是是否异步

public class FirstStepBody: StepBody
    {
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("Hello world!First");
            return ExecutionResult.Next();
        }
    }

工作流

 通过继承IWorkflow接口定义一个工作流,接口只有IdVersionBuild方法(内部可以执行多个步骤),工作流主机使用这些信息来标识工作流

public class MyWorkflow :IWorkflow
    {
        public string Id => "HelloWorld";
        public int Version => 1;
        public void Build(IWorkflowBuilder<object> builder)
        {
            builder
                .StartWith<FirstStepBody>()
                .Then<FirstStepBody>();
        }
    }

工作流如果想使用必须在工作流主机中通过RegisterWorkflow()方法注册,并且通过Start()方法启动主机,当然也可以通过Stop()方法停止工作流。执行工作流需要使用StartWorkflow()方法,参数为工作流类的Id,如下

 [ApiController]
    [Route("[controller]")]
    public class WeatherForecastController : ControllerBase
    {
        private readonly IWorkflowHost _workflowHost;
        public WeatherForecastController(IWorkflowHost workflowHost)
        {
            _workflowHost = workflowHost;
        }
        [HttpGet(Name = "get")]
        public ContentResult Get()
        {
            if (!_workflowHost.Registry.IsRegistered("HelloWorld",1))
            {
                _workflowHost.RegisterWorkflow<MyWorkflow>();
            }
            _workflowHost.Start();
            _workflowHost.StartWorkflow("HelloWorld");
            //host.Stop();
            return Content("ok");
        }
    }

 当然也可以在构建web服务的时候统一注册,然后就可以直接执行啦

var host = app.Services.GetService<IWorkflowHost>();
host.RegisterWorkflow<MyWorkflow>();
host.Start();

二、在步骤之间传递参数

每个步骤都是一个黑盒,因此它们支持输入和输出。这些输入和输出可以映射到一个数据类,该数据类定义与每个工作流实例相关的自定义数据。

以下示例显示了如何定义步骤的输入和输出,然后显示了如何使用内部数据的类型化类定义工作流,以及如何将输入和输出映射到自定义数据类的属性。

//步骤包含属性,并且计算
    public class FirstStepBody: StepBody
    {
        public int Input1 { get; set; }
        public int Input2 { get; set; }
        public int Output { get; set; }
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Output = Input1 + Input2;
            Console.WriteLine(Output);
            return ExecutionResult.Next();
        }
    }
    //工作流包含输入输出的赋值
    public class MyWorkflow :IWorkflow<MyDataClass>
    {
        public string Id => "HelloWorld";
        public int Version => 1;
        public void Build(IWorkflowBuilder<MyDataClass> builder)
        {
            builder
                .StartWith<FirstStepBody>()
                .Input(step => step.Input1,data => data.Value1)
                .Input(step => step.Input2, data => 100)
                .Output(data => data.Answer, step => step.Output)
                .Then<FirstStepBody>()
                .Input(step => step.Input1, data => data.Value1)
                .Input(step => step.Input2, data => data.Answer)
                .Output(data => data.Answer, step => step.Output);
        }
    }
    //工作流的属性类
    public class MyDataClass
    {
        public int Value1 { get; set; }
        public int Value2 { get; set; }
        public int Answer { get; set; }
    }
    //执行工作流传入参数
    MyDataClass myDataClass = new MyDataClass();
    myDataClass.Value1 = 100;
    myDataClass.Value2 = 200;
    //不传入myDataClass则每次执行都是新的数据对象
    _workflowHost.StartWorkflow("HelloWorld", myDataClass);

从上述例子可以看到工作流可以定义一个初始的类作为参数传入,每个步骤可以有自己的属性字段去接收参数(可以是工作流类的字段,也可以是固定值),可以用Input方法传入,Output方法输出赋值。如果在工作流执行时不传入参数每次执行都是新的对象的默认值,比如在StartWorkflow方法中不传myDataClass,运行结果是100100,否则是200300

三、外部事件

工作流可以使用WaitFor方法进行等待,通过外部触发此事件,将事件产生的数据传递给工作流,并且让工作流继续执行下面的步骤。示例如下:

public class MyWorkflow :IWorkflow<MyDataClass>
    {
        //省略。。。。
        public void Build(IWorkflowBuilder<MyDataClass> builder)
        {
            builder
                .StartWith<FirstStepBody>()
                .Input(step => step.Input1,data => data.Value1)
                .Input(step => step.Input2, data => 100)
                .Output(data => data.Answer, step => step.Output)
                .WaitFor("MyEvent",key => "EventKey")
                .Output(data => data.Answer,step => step.EventData)
                .Then<FirstStepBody>()
                .Input(step => step.Input1, data => data.Value1)
                .Input(step => step.Input2, data => data.Answer)
                .Output(data => data.Answer, step => step.Output);
        }
    }
    //。。。
    [HttpGet(Name = "get")]
    public ContentResult Get()
    {
        MyDataClass myDataClass = new MyDataClass();
        myDataClass.Value1 = 100;
        myDataClass.Value2 = 200;
        _workflowHost.StartWorkflow("HelloWorld", myDataClass);
            return Content("ok");
        }
  [HttpPost(Name = "event")]
  public ContentResult PublishEvent()
  {
    _workflowHost.PublishEvent("MyEvent", "EventKey", 200);
    return Content("ok");
  }

 使用WaitFor方法可以使工作流等待监听指定事件的执行,有两个入参事件名称事件关键字。通过工作流主机去触发PublishEvent执行指定的事件,有三个入参触发事件名称触发事件关键字和事件参数

 需要执行事件,工作流才会继续下一步,如下动图演示:

 .Net Core工作流WorkFlowCore

 文章来源地址https://www.toymoban.com/news/detail-412104.html

  可以为等待事件设置有效时间,在有效时间之前执行事件是不会继续下一步流程的,只有当大于有效时间之后执行事件才会继续下一步步骤。如下代码设置,为工作流执行时间一天后执行事件才会继续执行,否则就等待不动。

WaitFor("MyEvent",key => "EventKey", data => DateTime.Now.AddDays(1))

四、活动

活动被定义为在工作流中可以被等待的外部工作队列中的步骤。

在本例中,工作流将等待活动activity-1,直到活动完成才继续工作流。它还将data.Value1的值传递给活动,然后将活动的结果映射到data.Value2

然后我们创建一个worker来处理活动项的队列。它使用GetPendingActivity方法来获取工作流正在等待的活动和数据。

    //.....
    builder
    .StartWith<FirstStepBody>()
    .Input(step => step.Input1,data => data.Value1)
    .Input(step => step.Input2, data => 100)
    .Output(data => data.Answer, step => step.Output)
    .Activity("activity-1", (data) => data.Value1)
    .Output(data => data.Value2, step => step.Result)
    .Then<FirstStepBody>()
    .Input(step => step.Input1, data => data.Value1)
    .Input(step => step.Input2, data => data.Answer)
    .Output(data => data.Answer, step => step.Output);
    //....
    [HttpPost(Name = "active")]
   public ContentResult PublishEvent()
   {
    var activity = _workflowHost.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
    if (activity != null)
    {
      Console.WriteLine(activity.Parameters);
      _workflowHost.SubmitActivitySuccess(activity.Token, 100);
    }
    return Content("ok");
   }

活动可以看作一个等待的步骤可以传入参数和输出参数,和事件的区别是事件不能输入参数而是单纯的等待。

五、错误处理

每个步骤都可以配置自己的错误处理行为,可以在以后重试、挂起工作流或终止工作流。

    public void Build(IWorkflowBuilder<object> builder)
    {
        builder                
            .StartWith<HelloWorld>()
                .OnError(WorkflowErrorHandling.Retry,TimeSpan.FromMinutes(10))
            .Then<GoodbyeWorld>();
    }

六、流程控制

工作流的流程控制包括分支、循环等各种操作

决策分支

在工作流中定义多个独立分支,并根据表达式值选择满足条件的分支执行。

使用IWorkflowBuilderCreateBranch方法定义分支。然后我们可以使用branch方法选择一个分支。

选择表达式将与通过branch方法列出的分支相匹配,匹配的分支将安排执行。匹配多个分支将导致并行分支运行。

如果data.Value1的值为1,则此工作流将选择branch1,如果为2,则选择branch2

  var branch1 = builder.CreateBranch()
    .StartWith<PrintMessage>()
        .Input(step => step.Message, data => "hi from 1")
    .Then<PrintMessage>()
        .Input(step => step.Message, data => "bye from 1");

  var branch2 = builder.CreateBranch()
    .StartWith<PrintMessage>()
        .Input(step => step.Message, data => "hi from 2")
    .Then<PrintMessage>()
        .Input(step => step.Message, data => "bye from 2");
  builder
    .StartWith<HelloWorld>()
    .Decide(data => data.Value1)
        .Branch((data, outcome) => data.Value1 == "one", branch1)
        .Branch((data, outcome) => data.Value1 == "two", branch2);

并行ForEach

使用ForEach方法启动并行for循环

  public class ForEachWorkflow : IWorkflow
  {
      public string Id => "Foreach";
      public int Version => 1;
      public void Build(IWorkflowBuilder<object> builder)
      {
          builder
              .StartWith<SayHello>()
              .ForEach(data => new List<int>() { 1, 2, 3, 4 })
                  .Do(x => x
                      .StartWith<DisplayContext>()
                          .Input(step => step.Message, (data, context) => context.Item)
                      .Then<DoSomething>())
              .Then<SayGoodbye>();
      }        
  }

While循环

使用While方法启动while循环

  public class WhileWorkflow : IWorkflow<MyData>
  {
      public string Id => "While";
      public int Version => 1;
      public void Build(IWorkflowBuilder<MyData> builder)
      {
          builder
              .StartWith<SayHello>()
              .While(data => data.Counter < 3)
                  .Do(x => x
                      .StartWith<DoSomething>()
                      .Then<IncrementStep>()
                          .Input(step => step.Value1, data => data.Counter)
                          .Output(data => data.Counter, step => step.Value2))
              .Then<SayGoodbye>();
      }        
  }

If判断

使用If方法执行if判断

  public class IfWorkflow : IWorkflow<MyData>
  { 
      public void Build(IWorkflowBuilder<MyData> builder)
      {
          builder
              .StartWith<SayHello>()
              .If(data => data.Counter < 3).Do(then => then
                  .StartWith<PrintMessage>()
                      .Input(step => step.Message, data => "Value is less than 3")
              )
              .If(data => data.Counter < 5).Do(then => then
                  .StartWith<PrintMessage>()
                      .Input(step => step.Message, data => "Value is less than 5")
              )
              .Then<SayGoodbye>();
      }        
  }

并行

使用Parallel方法并行执行任务

  public class ParallelWorkflow : IWorkflow<MyData>
  {
      public string Id => "parallel-sample";
      public int Version => 1;
      public void Build(IWorkflowBuilder<MyData> builder)
      {
          builder
              .StartWith<SayHello>()
              .Parallel()
                  .Do(then => 
                      then.StartWith<Task1dot1>()
                          .Then<Task1dot2>()
                  .Do(then =>
                      then.StartWith<Task2dot1>()
                          .Then<Task2dot2>()
              .Join()
              .Then<SayGoodbye>();
    }        
}

Schedule

使用Schedule方法在工作流中注册在指定时间后执行的异步方法

builder
    .StartWith(context => Console.WriteLine("Hello"))
    .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
        .StartWith(context => Console.WriteLine("Doing scheduled tasks"))
    )
    .Then(context => Console.WriteLine("Doing normal tasks"));

Recur

使用Recure方法在工作流中设置一组重复的后台步骤,直到满足特定条件为止

builder
    .StartWith(context => Console.WriteLine("Hello"))
    .Recur(data => TimeSpan.FromSeconds(5), data => data.Counter > 5).Do(recur => recur
        .StartWith(context => Console.WriteLine("Doing recurring task"))
    )
    .Then(context => Console.WriteLine("Carry on"));

七、Saga transaction 

saga允许在saga transaction中封装一系列步骤,并为每一个步骤提供补偿步骤,使用CompensateWith方法在对应的步骤后面添加补偿步骤,补偿步骤将会在步骤抛出异常的时候触发。

如下示例,步骤Task2如果抛出一个异常,那么补偿步骤UndoTask2UndoTask1将被触发。

builder
    .StartWith(context => Console.WriteLine("Begin"))
    .Saga(saga => saga
        .StartWith<Task1>()
            .CompensateWith<UndoTask1>()
        .Then<Task2>()
            .CompensateWith<UndoTask2>()
        .Then<Task3>()
            .CompensateWith<UndoTask3>()
    )
        .CompensateWith<CleanUp>()
    .Then(context => Console.WriteLine("End"));

也可以指定重试策略,在指定时间间隔后重试。

builder
    .StartWith(context => Console.WriteLine("Begin"))
    .Saga(saga => saga
        .StartWith<Task1>()
            .CompensateWith<UndoTask1>()
        .Then<Task2>()
            .CompensateWith<UndoTask2>()
        .Then<Task3>()
            .CompensateWith<UndoTask3>()
    )
    .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5))
    .Then(context => Console.WriteLine("End"));

八、持久化

可以使用RedisMongdbSqlserver等持久化,具体可以看文档,此处使用Redis,先安装nuget

Install-Package WorkflowCore.Providers.Redis

然后注入就可以了

builder.Services.AddWorkflow(cfg =>
{
    cfg.UseRedisPersistence("localhost:6379", "app-name");
    cfg.UseRedisLocking("localhost:6379");
    cfg.UseRedisQueues("localhost:6379", "app-name");
    cfg.UseRedisEventHub("localhost:6379", "channel-name");
    //cfg.UseMongoDB(@"mongodb://mongo:27017", "workflow");
    //cfg.UseElasticsearch(new ConnectionSettings(new Uri("http://elastic:9200")), "workflows");
});

运行打开可以看到

.Net Core工作流WorkFlowCore

 

到了这里,关于.Net Core工作流WorkFlowCore的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 云原生离线工作流编排利器 -- 分布式工作流 Argo 集群

    作者:庄宇 在现代的软件开发和数据处理领域,批处理作业(Batch)扮演着重要的角色。它们通常用于数据处理,仿真计算,科学计算等领域,往往需要大规模的计算资源。随着云计算的兴起,阿里云批量计算和 AWS Batch 等云服务提供了管理和运行这些批处理作业的平台。 随

    2024年01月24日
    浏览(84)
  • 半天就行!教你用ChatGPT开发小程序;谁能做出中国的Discord?LangChain中文入门教程;一个周末搞定电影预告片的AI工作流 | ShowMeAI日报

    👀 日报周刊合集 | 🎡 生产力工具与行业应用大全 | 🧡 点赞关注评论拜托啦! ShowMeAI知识星球资料分类「下资料」,编号「R080」 Discord 不仅是口碑最好的游戏通信产品,还是世界上规模最大、发展最快的社交网络。在最新AI浪潮中 Discord 又迎来了新的增量。 Discord 也是非常

    2024年02月11日
    浏览(37)
  • Camunda 7工作流引擎 API 以及与Springboot集成实现工作流配置全纪录

    项目中需要用到工作流引擎来设计部分业务流程,框架选型最终选择了 Camunda7,关于 Camunda以及 Activity 等其他工作流 引擎的介绍及对比不再介绍,这里只介绍与现有Springboot项目的集成以及具体使用及配置 流程(PROCESS): 通过工具建模最终生成的BPMN文件,里面有整个流程的定

    2024年02月10日
    浏览(58)
  • 云计算工作流调度

    阅读笔记 首先,我们提出了一个更实用的混合云服务流程成本驱动调度模型,该模型在不降低VM部署弹性的情况下更精确地定义资源约束,并考虑了基于间隔的综合收费,包括计费周期和持续使用折扣。 其次,提出了一种改进的基于FWA(烟花算法)的方法来解决这一问题。在

    2024年02月02日
    浏览(71)
  • Activiti 工作流简介

    1、什么是工作流         工作流(Workflow),就是通过计算机对业务流程自动化执行管理。它主要解决的是“使在多个参与者之间按照某种预定义的规则自动进行传递文档、信息或任务的过程,从而实现某个预期的业务目标,或者促使此目标的实现”。 1.2、工作流系统   

    2024年02月04日
    浏览(51)
  • Docker工作流

    开发应用 编写Dockerfile 构建Docker镜像 运行Docker容器 测试应用 发布镜像到Hub 迭代更新镜像 首先你需要创建一个应用,这个应用可以是后端应用或者前端应用,任何语言都可以。 比如:我使用IDEA 创建一个Java后端应用,基于Maven构建,工程结构如下: 基于自己的工程来编写

    2024年04月29日
    浏览(36)
  • Git 工作流设计

    前言 常用的工作流有四种 集中式工作流 功能分支流 git flow 工作流 forking 工作流 集中式工作流 集中式工作流,多个功能(feat),bug修复(fix) 在一个分支上开发,极容易出现代码从冲突 功能分支流 新的功能或者bug fork出一个新的分支,在该分支上开发 功能在分支开发完后再合

    2024年02月05日
    浏览(66)
  • GitFlow工作流

    基于 Git 这一版本控制系统,通过定义不同的分支,探索合适的工作流程来完成开发、测试、修改等方面的需求。 例如:在开发阶段,创建 feature 分支,完成需求后,将此分支合并到 develop 分支上;在发布阶段,创建 release 分支,完成阶段开发任务后,将分支合并到 develop 和

    2024年02月22日
    浏览(41)
  • Git工作流

    main:生产环境,也就是你们在网上可以下载到的版本,是经过了很多轮测试得到的稳定版本。 release: 开发内部发版,也就是测试环境。 dev:所有的feature都要从dev上checkout。 feature:每个需求新创建的分支。 下面介绍一下一个新需求过来的git操作流程: 1.从dev分支上checkou

    2024年02月10日
    浏览(44)
  • 工作流引擎Flowable

    官方手册 一、依赖 二、demo 三、日志文件 在resources中添加日志文件log4j.properties Flowable流程图 Eclipse Designer, 一款Eclipse插件, 用于图形化建模, 测试与部署BPMN2.0流程 FlowableUI Flowable BPMN visualizer, 一款idea插件 从官网下载flowable-6.7.2.zip解压后, 可以看到如下两个文件 将这两个文件

    2024年02月09日
    浏览(56)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包