MassTransit类库Saga文档翻译

这篇具有很好参考价值的文章主要介绍了MassTransit类库Saga文档翻译。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

翻译自 Saga State Machines

Saga State Machines(状态机)

Saga State Machines(状态机)以前被称为Automatonymous,从v8开始被合并到masstrtransit代码库中。

介绍

Automatonymous是.Net的State Machines(状态机)类库,它提供了一种C#语法来定义State Machines,包括状态、事件和行为。MassTransit包括Automatonymous,并添加了实例存储、事件关联、消息绑定、请求和响应支持以及调度。

Automatonymous不再是一个独立的NuGet包,它已经被MassTransit包含了。在以前的版本中,需要额外的包引用。所以之前如果引用了Automatonymous,则必须删除该引用,因为它不再兼容。

State Machine(状态机)

State Machine(状态机)定义状态、事件和行为。实现一个派生自MassTransitStateMachine<T>的状态机类,该类只创建一次,然后用于将事件触发的行为应用于状态机实例。

public class OrderStateMachine:MassTransitStateMachine<OrderState>
{}

Instance(实例)

Instance包含状态机实例的数据。当没有找到具有相同CorrelationId的现有实例时,将为每个已消费的初始事件创建一个新实例。一个Saga Repository用于持久化实例。实例是类,并且必须实现SagaStateMachineInstance接口。

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
}
public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        InstanceState(x => x.CurrentState);
    }
}

Instance实例必须存储当前状态(CurrentState),它可以是以下三种类型之一:

类型 描述
State 接口状态SagaStateMachineInstance。可能难以序列化,通常仅用于内存实例,但如果repository存储引擎支持将用户类型映射到存储类型,则可以使用。
string State的名称。但是,它占用了大量空间,因为每个实例都重复状态名。
int 小,快,但要求指定每个可能的状态,以便为每个状态分配int值。

如果CurrentState实例状态属性是state,则自动配置它。对于string或int类型,必须使用InstanceState方法。

要指定int状态值,请配置instance实例状态,如下所示。

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public int CurrentState { get; set; }
}
public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        InstanceState(x => x.CurrentState, Submitted, Accepted);
    }
}

结果如下值:

0 - None, 1 - Initial, 2 - Final, 3 - Submitted, 4 - Accepted

State(状态)

States(状态)表示事件(events)消费后实例的当前状态。一个实例在给定时间只能处于一种状态。新实例默认为初始(Initial)状态,这是自动定义的。还为所有状态机定义了最终(Final)状态,并用于表示实例已达到最终状态。

在这个例子中,声明了两个状态(State)。状态由MassTransitStateMachine基类构造函数自动初始化。

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public State Submitted { get; private set; }
    public State Accepted { get; private set; }
}

Event(事件)

事件(Event)是可能导致状态(State)变化的发生的事情。事件(Event)可以添加或更新实例数据,也可以更改实例(instance)的当前状态。Event是泛型的,其中T必须是有效的消息类型。

在下面的示例中,SubmitOrder消息被声明为一个事件,包括如何将该事件与实例关联。

除非事件实现了 CorrelatedBy,否则它们必须用关联表达式声明。

public interface SubmitOrder
{
    Guid OrderId { get; }    
}
public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));
    }
    public Event<SubmitOrder> SubmitOrder { get; private set; }
}

Behavior(行为)

行为是指在状态(state)中发生事件(event)时所发生的情况。

下面,Initial块用于定义在Initial状态期间SubmitOrder事件的行为。当使用SubmitOrder消息并且没有找到具有与OrderId匹配的CorrelationId的实例时,将在Initial状态下创建一个新实例。TransitionTo activity 将实例转换到Submitted状态,之后使用saga repository持久化实例。

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .TransitionTo(Submitted));
    }
}

随后,OrderAccepted事件可以通过下面所示的行为来处理。

public interface OrderAccepted
{
    Guid OrderId { get; }    
}
public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => OrderAccepted, x => x.CorrelateById(context => context.Message.OrderId));
        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));
    }
    public Event<OrderAccepted> OrderAccepted { get; private set; }
}
Message Order(消息顺序)

Message brokers(MQ)通常不保证消息顺序。因此,在状态机(state machine)设计中考虑无序消息是很重要的。

在上面的示例中,在OrderAccepted事件之后接收SubmitOrder消息可能会导致SubmitOrder消息在_error队列中结束。如果OrderAccepted事件首先被接收,它将被丢弃,因为它在初始(Initial)状态下不被接受。下面是处理这两种场景的更新状态机。

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .TransitionTo(Submitted),
            When(OrderAccepted)
                .TransitionTo(Accepted));
        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));
        During(Accepted,
            Ignore(SubmitOrder));
    }
}

在更新后的示例中,在接受(Accepted)状态下接收SubmitOrder消息会忽略该事件。然而,事件中的数据可能是有用的。在这种情况下,可以添加将数据复制到实例的行为。下面,在两个场景中捕获事件的数据。

public interface SubmitOrder
{
    Guid OrderId { get; }

    DateTime OrderDate { get; }
}

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }

    public DateTime? OrderDate { get; set; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .Then(x => x.Saga.OrderDate = x.Message.OrderDate)
                .TransitionTo(Submitted),
            When(OrderAccepted)
                .TransitionTo(Accepted));

        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));

        During(Accepted,
            When(SubmitOrder)
                .Then(x => x.Saga.OrderDate = x.Message.OrderDate));
    }
}

Configuration(配置)

配置saga state machine(状态机)

services.AddMassTransit(x =>
{
    x.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .InMemoryRepository();
});

上面的示例使用内存中的saga repository,但是可以使用任何saga repository。持久性部分包括受支持的saga repository的详细信息。

要测试state machine(状态机),请参阅测试部分。

Event(事件)

如上所示,事件(event)是状态机(state machine)可以使用的消息。事件(event)可以指定任何有效的消息类型,并且可以配置每个事件。有几种事件配置方法可用。

内置的CorrelatedBy<Guid>接口可以在消息约定中使用,以指定事件CorrelationId

public interface OrderCanceled :
    CorrelatedBy<Guid>
{    
}
public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => OrderCanceled); // not required, as it is the default convention
    }
}

虽然上面显式声明了事件(event),但这不是必需的。默认将会自动的配置为CorrelatedBy<Guid>接口的事件(event)。

虽然方便,但有些人认为接口是对消息契约基础设施的入侵。MassTransit还支持一种声明性方法来为事件指定CorrelationId。通过配置全局消息拓扑,可以指定要用于关联的消息属性。

public interface SubmitOrder
{    
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    // this is shown here, but can be anywhere in the application as long as it executes
    // before the state machine instance is created. Startup, etc. is a good place for it.
    // It only needs to be called once per process.
    static OrderStateMachine()
    {
        GlobalTopology.Send.UseCorrelationId<SubmitOrder>(x => x.OrderId);
    }

    public OrderStateMachine()
    {
        Event(() => SubmitOrder);
    }

    public Event<SubmitOrder> SubmitOrder { get; private set; }
}

另一种方法是声明事件相关性,如下所示。当上述两种方法都未使用时,应使用此方法。

public interface SubmitOrder
{    
    Guid OrderId { get; }
}
public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));
    }
    public Event<SubmitOrder> SubmitOrder { get; private set; }
}

因为OrderId是一个Guid,所以它可以用于事件关联。当在初始状态下接受SubmitOrder时,由于OrderId是Guid,因此新实例上的CorrelationId会自动分配OrderId值。
还可以使用查询表达式关联事件,当事件没有与实例的CorrelationId属性关联时,需要使用查询表达式。查询的开销更大,并且可能匹配多个实例,在设计状态机和事件时应该考虑到这一点。

只要可能,尝试使用CorrelationId进行关联。如果需要查询,则可能需要在属性上创建索引,以便优化数据库查询。

要使用另一种类型关联事件,需要额外的配置。

public interface ExternalOrderSubmitted
{    
    string OrderNumber { get; }
}
public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => ExternalOrderSubmitted, e => e
            .CorrelateBy(i => i.OrderNumber, x => x.Message.OrderNumber)
            .SelectId(x => NewId.NextGuid()));
    }
    public Event<ExternalOrderSubmitted> ExternalOrderSubmitted { get; private set; }
}

还可以使用两个参数编写查询,这两个参数直接传递给repository(并且必须得到后台数据库的支持)。

public interface ExternalOrderSubmitted
{    
    string OrderNumber { get; }
}
public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => ExternalOrderSubmitted, e => e
            .CorrelateBy((instance,context) => instance.OrderNumber == context.Message.OrderNumber)
            .SelectId(x => NewId.NextGuid()));
    }
    public Event<ExternalOrderSubmitted> ExternalOrderSubmitted { get; private set; }
}

当事件没有与实例唯一相关的Guid时,必须配置.selectid表达式。在上面的示例中,NewId用于生成一个顺序标识符,该标识符将分配给实例CorrelationId。事件上的任何属性都可以用来初始化CorrelationId。

Ignore Event(忽略事件)

可能有必要忽略给定状态下的事件,以避免错误生成,或者防止消息被移动到_skip队列。要忽略某个状态中的事件,请使用ignore方法。

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .TransitionTo(Submitted),
            When(OrderAccepted)
                .TransitionTo(Accepted));

        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));

        During(Accepted,
            Ignore(SubmitOrder));
    }
}

Composite Event(组合事件)

通过指定一个或多个必须使用的事件来配置组合事件,之后将引发组合事件。组合事件使用实例属性来跟踪所需的事件,这是在配置期间指定的。

要定义组合事件,必须首先配置所需的事件以及任何事件行为,然后才能配置组合事件。

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }

    public int ReadyEventStatus { get; set; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .TransitionTo(Submitted),
            When(OrderAccepted)
                .TransitionTo(Accepted));

        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));

        CompositeEvent(() => OrderReady, x => x.ReadyEventStatus, SubmitOrder, OrderAccepted);

        DuringAny(
            When(OrderReady)
                .Then(context => Console.WriteLine("Order Ready: {0}", context.Saga.CorrelationId)));
    }

    public Event OrderReady { get; private set; }
}

一旦使用了SubmitOrderOrderAccepted事件,就会触发OrderReady事件。

Missing Instance

如果事件与实例不匹配,则可以配置缺失的实例行为

public interface RequestOrderCancellation
{    
    Guid OrderId { get; }
}

public interface OrderNotFound
{
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => OrderCancellationRequested, e =>
        {
            e.CorrelateById(context => context.Message.OrderId);

            e.OnMissingInstance(m =>
            {
                return m.ExecuteAsync(x => x.RespondAsync<OrderNotFound>(new { x.OrderId }));
            });
        });
    }

    public Event<RequestOrderCancellation> OrderCancellationRequested { get; private set; }
}

在本例中,当在没有匹配实例的情况下使用取消订单请求时,将发送未找到订单的响应。响应更显式,而不是生成Fault。其他缺少的实例选项包括DiscardFaultExecute (ExecuteAsync的同步版本)。

Initial Insert(初始化插入)

为了提高新实例的性能,将事件配置为直接插入到saga repository中可以减少锁争用。要配置要插入的事件,它应该位于initial块中,并指定一个saga工厂。

public interface SubmitOrder
{    
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => SubmitOrder, e => 
        {
            e.CorrelateById(context => context.Message.OrderId));

            e.InsertOnInitial = true;
            e.SetSagaFactory(context => new OrderState
            {
                CorrelationId = context.Message.OrderId
            })
        });

        Initially(
            When(SubmitOrder)
                .TransitionTo(Submitted));
    }

    public Event<SubmitOrder> SubmitOrder { get; private set; }
}

在使用InsertOnInitial时,至关重要的是,saga repository能够检测重复的键(在本例中,是使用OrderId初始化的CorrelationId)。在这种情况下,在CorrelationId上使用集群主键可以防止插入重复的实例。如果使用不同的属性关联事件,请确保数据库对实例属性实施唯一约束,并且saga工厂使用事件属性值初始化实例属性。

public interface ExternalOrderSubmitted
{    
    string OrderNumber { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => ExternalOrderSubmitted, e => 
        {
            e.CorrelateBy(i => i.OrderNumber, x => x.Message.OrderNumber)
            e.SelectId(x => NewId.NextGuid());

            e.InsertOnInitial = true;
            e.SetSagaFactory(context => new OrderState
            {
                CorrelationId = context.CorrelationId ?? NewId.NextGuid(),
                OrderNumber = context.Message.OrderNumber,
            })
        });

        Initially(
            When(SubmitOrder)
                .TransitionTo(Submitted));
    }

    public Event<ExternalOrderSubmitted> ExternalOrderSubmitted { get; private set; }
}

数据库将对OrderNumber使用唯一约束来防止重复,saga repository将将其检测为现有实例,然后加载该实例以使用事件。

Completed Instance

默认情况下,实例不会从saga repository中删除。若要配置已完成的实例删除,请指定用于确定实例是否已完成的方法。

public interface OrderCompleted
{    
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => OrderCompleted, x => x.CorrelateById(context => context.Message.OrderId));

        DuringAny(
            When(OrderCompleted)
                .Finalize());

        ();
    }

    public Event<OrderCompleted> OrderCompleted { get; private set; }
}

当实例使用OrderCompleted事件时,实例将被完成(它将实例转换为Final状态)。SetCompletedWhenFinalized方法将一个处于Final状态的实例定义为已完成——然后由saga repository使用它来删除该实例。

要使用不同的完成表达式,例如检查实例是否处于完成状态的表达式,请使用SetCompleted方法,如下所示。

public interface OrderCompleted
{    
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => OrderCompleted, x => x.CorrelateById(context => context.Message.OrderId));

        DuringAny(
            When(OrderCompleted)
                .TransitionTo(Completed));

        SetCompleted(async instance => 
        {
            State<TInstance> currentState = await this.GetState(instance);

            return Completed.Equals(currentState);
        });
    }

    public State Completed { get; private set; }
    public Event<OrderCompleted> OrderCompleted { get; private set; }
}

Activities

状态机行为被定义为响应事件而执行的一系列活动。除了automautonomous中包含的活动之外,MassTransit还包括用于发送、发布和调度消息以及发起和响应请求的活动。

Publish

要发布事件,请添加publish活动。

public interface OrderSubmitted
{
    Guid OrderId { get; }    
}

public class OrderSubmittedEvent :
    OrderSubmitted
{
    public OrderSubmittedEvent(Guid orderId)
    {
        OrderId = orderId;
    }

    public Guid OrderId { get; }    
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .Publish(context => (OrderSubmitted)new OrderSubmittedEvent(context.Saga.CorrelationId))
                .TransitionTo(Submitted));
    }
}

或者,可以使用消息初始化器来去除Event类。

public interface OrderSubmitted
{
    Guid OrderId { get; }    
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .PublishAsync(context => context.Init<OrderSubmitted>(new { OrderId = context.Saga.CorrelationId }))
                .TransitionTo(Submitted));
    }
}

Send

要发送消息,请添加send活动。

public interface UpdateAccountHistory
{
    Guid OrderId { get; }    
}

public class UpdateAccountHistoryCommand :
    UpdateAccountHistory
{
    public UpdateAccountHistoryCommand(Guid orderId)
    {
        OrderId = orderId;
    }

    public Guid OrderId { get; }    
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine(OrderStateMachineSettings settings)
    {
        Initially(
            When(SubmitOrder)
                .Send(settings.AccountServiceAddress, context => new UpdateAccountHistoryCommand(context.Saga.CorrelationId))
                .TransitionTo(Submitted));
    }
}

或者,可以使用消息初始化器来去除Command类。

public interface UpdateAccountHistory
{
    Guid OrderId { get; }    
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine(OrderStateMachineSettings settings)
    {
        Initially(
            When(SubmitOrder)
                .SendAsync(settings.AccountServiceAddress, context => context.Init<UpdateAccountHistory>(new { OrderId = context.Saga.CorrelationId }))
                .TransitionTo(Submitted));
    }
}

Respond

状态机可以通过将请求消息类型配置为事件,并使用response方法来响应请求。在配置请求事件时,建议配置缺失的实例方法,以提供更好的响应体验(通过不同的响应类型,或者通过指示未找到实例的响应)。

public interface RequestOrderCancellation
{    
    Guid OrderId { get; }
}

public interface OrderCanceled
{
    Guid OrderId { get; }
}

public interface OrderNotFound
{
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => OrderCancellationRequested, e =>
        {
            e.CorrelateById(context => context.Message.OrderId);

            e.OnMissingInstance(m =>
            {
                return m.ExecuteAsync(x => x.RespondAsync<OrderNotFound>(new { x.OrderId }));
            });
        });

        DuringAny(
            When(OrderCancellationRequested)
                .RespondAsync(context => context.Init<OrderCanceled>(new { OrderId = context.Saga.CorrelationId }))
                .TransitionTo(Canceled));
    }

    public State Canceled { get; private set; }
    public Event<RequestOrderCancellation> OrderCancellationRequested { get; private set; }
}

有些场景需要等待状态机的响应。在这些场景中,应该存储响应原始请求所需的信息。

public record CreateOrder(Guid CorrelationId) : CorrelatedBy<Guid>;

public record ProcessOrder(Guid OrderId, Guid ProcessingId);

public record OrderProcessed(Guid OrderId, Guid ProcessingId);

public record OrderCancelled(Guid OrderId, string Reason);

public class ProcessOrderConsumer : IConsumer<ProcessOrder>
{
    public async Task Consume(ConsumeContext<ProcessOrder> context)
    {
        await context.RespondAsync(new OrderProcessed(context.Message.OrderId, context.Message.ProcessingId));
    }
}

public class OrderState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public Guid? ProcessingId { get; set; }
    public Guid? RequestId { get; set; }
    public Uri ResponseAddress { get; set; }
    public Guid OrderId { get; set; }
}

public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    public State Created { get; set; }
    
    public State Cancelled { get; set; }
    
    public Event<CreateOrder> OrderSubmitted { get; set; }
    
    public Request<OrderState, ProcessOrder, OrderProcessed> ProcessOrder { get; set; }
    
    public OrderStateMachine()
    {
        InstanceState(m => m.CurrentState);
        Event(() => OrderSubmitted);
        Request(() => ProcessOrder, order => order.ProcessingId, config => { config.Timeout = TimeSpan.Zero; });

        Initially(
            When(OrderSubmitted)
                .Then(context =>
                {
                    context.Saga.CorrelationId = context.Message.CorrelationId;
                    context.Saga.ProcessingId = Guid.NewGuid();

                    context.Saga.OrderId = Guid.NewGuid();

                    context.Saga.RequestId = context.RequestId;
                    context.Saga.ResponseAddress = context.ResponseAddress;
                })
                .Request(ProcessOrder, context => new ProcessOrder(context.Saga.OrderId, context.Saga.ProcessingId!.Value))
                .TransitionTo(ProcessOrder.Pending));
        
        During(ProcessOrder.Pending,
            When(ProcessOrder.Completed)
                .TransitionTo(Created)
                .ThenAsync(async context =>
                {
                    var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
                    await endpoint.Send(context.Saga, r => r.RequestId = context.Saga.RequestId);
                }),
            When(ProcessOrder.Faulted)
                .TransitionTo(Cancelled)
                .ThenAsync(async context =>
                {
                    var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
                    await endpoint.Send(new OrderCancelled(context.Saga.OrderId, "Faulted"), r => r.RequestId = context.Saga.RequestId);
                }),
            When(ProcessOrder.TimeoutExpired)
                .TransitionTo(Cancelled)
                .ThenAsync(async context =>
                {
                    var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
                    await endpoint.Send(new OrderCancelled(context.Saga.OrderId, "Time-out"), r => r.RequestId = context.Saga.RequestId);
                }));
    }
}

Schedule

状态机可以调度事件,它使用消息调度器来调度要传递给实例的消息。首先,必须声明Schedule。

public interface OrderCompletionTimeoutExpired
{
    Guid OrderId { get; }
}

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }

    public Guid? OrderCompletionTimeoutTokenId { get; set; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Schedule(() => OrderCompletionTimeout, instance => instance.OrderCompletionTimeoutTokenId, s =>
        {
            s.Delay = TimeSpan.FromDays(30);

            s.Received = r => r.CorrelateById(context => context.Message.OrderId);
        });
    }

    public Schedule<OrderState, OrderCompletionTimeoutExpired> OrderCompletionTimeout { get; private set; }
}

配置指定了可以被调度活动覆盖的Delay,以及Received事件的相关表达式。状态机可以使用Received事件,如下所示。

OrderCompletionTimeoutTokenId是一个Guid?用于跟踪计划消息tokenId的实例属性,稍后可使用该属性取消对事件的计划。

public interface OrderCompleted
{
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        During(Accepted,
            When(OrderCompletionTimeout.Received)
                .PublishAsync(context => context.Init<OrderCompleted>(new { OrderId = context.Saga.CorrelationId }))
                .Finalize());
    }

    public Schedule<OrderState, OrderCompletionTimeoutExpired> OrderCompletionTimeout { get; private set; }
}

可以使用Schedule活动安排事件。

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        During(Submitted,
            When(OrderAccepted)
                .Schedule(OrderCompletionTimeout, context => context.Init<OrderCompletionTimeoutExpired>(new { OrderId = context.Saga.CorrelationId }))
                .TransitionTo(Accepted));
    }
}

如上所述,可以通过Schedule活动覆盖延迟。实例和消息(context.Data)内容都可以用来计算延迟。

public interface OrderAccepted
{
    Guid OrderId { get; }    
    TimeSpan CompletionTime { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        During(Submitted,
            When(OrderAccepted)
                .Schedule(OrderCompletionTimeout, context => context.Init<OrderCompletionTimeoutExpired>(new { OrderId = context.Saga.CorrelationId }),
                    context => context.Message.CompletionTime)
                .TransitionTo(Accepted));
    }
}

一旦收到预定的事件,就会清除OrderCompletionTimeoutTokenId属性。

如果不再需要计划的事件,则可以使用Unschedule活动。

public interface OrderAccepted
{
    Guid OrderId { get; }    
    TimeSpan CompletionTime { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        DuringAny(
            When(OrderCancellationRequested)
                .RespondAsync(context => context.Init<OrderCanceled>(new { OrderId = context.Saga.CorrelationId }))
                .Unschedule(OrderCompletionTimeout)
                .TransitionTo(Canceled));
    }
}

Request

状态机可以使用request方法发送请求,该方法指定了请求类型和响应类型。可以指定其他请求设置,包括ServiceAddress和Timeout。

如果指定了ServiceAddress,它应该是将响应请求的服务的端点地址。如果没有指定,请求将被发布。

默认超时时间为30秒,但任何大于或等于TimeSpan.Zero的值都可以。当发送的请求超时大于零时,将调度TimeoutExpired消息。指定TimeSpan.Zero 不会调度超时消息,并且请求永远不会超时。

在定义请求时,应该指定一个实例属性来存储用于将响应与状态机实例相关联的RequestId。当请求挂起时,RequestId存储在属性中。当请求完成后,该属性被清除。如果请求超时或出现错误,则保留requesttid,以便在请求最终完成后进行关联(例如将请求从_error队列移回服务队列)。

最近的增强使此属性成为可选属性,而不是使用实例的CorrelationId作为请求消息RequestId。这可以简化响应相关性,并且还避免了在saga repository上添加索引的需要。但是,在高度复杂的系统中,为请求重用CorrelationId可能会导致问题。所以在选择使用哪种方法时要考虑到这一点。

Configuration

要声明请求,请添加request属性并使用request方法对其进行配置。

public interface ProcessOrder
{
    Guid OrderId { get; }    
}

public interface OrderProcessed
{
    Guid OrderId { get; }
    Guid ProcessingId { get; }
}

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }

    public Guid? ProcessOrderRequestId { get; set; }
    public Guid? ProcessingId { get; set; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine(OrderStateMachineSettings settings)
    {
        Request(
            () => ProcessOrder,
            x => x.ProcessOrderRequestId, // Optional
            r => {
                r.ServiceAddress = settings.ProcessOrderServiceAddress;
                r.Timeout = settings.RequestTimeout;
            });
    }

    public Request<OrderState, ProcessOrder, OrderProcessed> ProcessOrder { get; private set; }
}

一旦定义, request活动就可以添加到行为中。

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        During(Submitted,
            When(OrderAccepted)
                .Request(ProcessOrder, x => x.Init<ProcessOrder>(new { OrderId = x.Saga.CorrelationId}))
                .TransitionTo(ProcessOrder.Pending));

        During(ProcessOrder.Pending,
            When(ProcessOrder.Completed)
                .Then(context => context.Saga.ProcessingId = context.Message.ProcessingId)
                .TransitionTo(Processed),
            When(ProcessOrder.Faulted)
                .TransitionTo(ProcessFaulted),
            When(ProcessOrder.TimeoutExpired)
                .TransitionTo(ProcessTimeoutExpired));
    }

    public State Processed { get; private set; }
    public State ProcessFaulted { get; private set; }
    public State ProcessTimeoutExpired { get; private set; }
}

Request包括三个事件:Completed、Faulted和TimeoutExpired。这些事件可以在任何状态中使用,但是,请求包含一个Pending状态,可以使用它来避免声明单独的Pending状态。

Missing Instance

如果在收到响应、错误或超时之前完成了saga实例,则可能会配置一个缺失的实例处理程序,类似于常规事件。

Request(() => ProcessOrder, x => x.ProcessOrderRequestId, r =>
{
    r.Completed = m => m.OnMissingInstance(i => i.Discard());
    r.Faulted = m => m.OnMissingInstance(i => i.Discard());
    r.TimeoutExpired = m => m.OnMissingInstance(i => i.Discard());
});

Custom

在某些情况下,事件行为可能具有需要在作用域级别管理的依赖关系,例如数据库连接,或者复杂性最好封装在单独的类中,而不是作为状态机本身的一部分。开发人员可以创建自己的活动以供状态机使用,也可以选择创建自己的扩展方法以将其添加到行为中。

要创建一个activity,需要创建一个类来实现IStateMachineActivity<TInstance, TData> 如图所示。

public class PublishOrderSubmittedActivity :
    IStateMachineActivity<OrderState, SubmitOrder>
{
    readonly ISomeService _service;

    public PublishOrderSubmittedActivity(ISomeService service)
    {
        _service = service;
    }

    public void Probe(ProbeContext context)
    {
        context.CreateScope("publish-order-submitted");
    }

    public void Accept(StateMachineVisitor visitor)
    {
        visitor.Visit(this);
    }

    public async Task Execute(BehaviorContext<OrderState, SubmitOrder> context, IBehavior<OrderState, SubmitOrder> next)
    {
        await _service.OnOrderSubmitted(context.Saga.CorrelationId);
        
        // always call the next activity in the behavior
        await next.Execute(context).ConfigureAwait(false);
    }

    public Task Faulted<TException>(BehaviorExceptionContext<OrderState, SubmitOrder, TException> context, 
        IBehavior<OrderState, SubmitOrder> next)
        where TException : Exception
    {
        return next.Faulted(context);
    }
}

对于ISomeService,在使用IPublishEndpoint发布事件的类中实现接口,如下所示。

public class SomeService :
    ISomeService
{
    IPublishEndpoint _publishEndpoint;
    
    public SomeService(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }
    
    public async Task OnOrderSubmitted(Guid orderId)
    {
        await _publishEndpoint.Publish<OrderSubmitted>(new { OrderId = orderId });
    }
}

创建后,在状态机中配置活动,如图所示。

public interface OrderSubmitted
{
    Guid OrderId { get; }    
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .Activity(x => x.OfType<PublishOrderSubmittedActivity>())
                .TransitionTo(Submitted));
    }
}

当使用SubmitOrder事件时,状态机将从容器中解析活动,并调用Execute方法。活动将被限定范围,因此任何依赖都将在消息ConsumeContext中解析。

在上面的例子中,事件类型是事先已知的。如果需要任何事件类型的活动,则可以在不指定事件类型的情况下创建该活动。

public class PublishOrderSubmittedActivity :
    IStateMachineActivity<OrderState>
{
    readonly ISomeService _service;

    public PublishOrderSubmittedActivity(ISomeService service)
    {
        _service = service;
    }

    public void Probe(ProbeContext context)
    {
        context.CreateScope("publish-order-submitted");
    }

    public void Accept(StateMachineVisitor visitor)
    {
        visitor.Visit(this);
    }

    public async Task Execute(BehaviorContext<OrderState> context, IBehavior<OrderState> next)
    {
        await _service.OnOrderSubmitted(context.Saga.CorrelationId);

        await next.Execute(context).ConfigureAwait(false);
    }

    public async Task Execute<T>(BehaviorContext<OrderState, T> context, IBehavior<OrderState, T> next)
    {
        await _service.OnOrderSubmitted(context.Saga.CorrelationId);

        await next.Execute(context).ConfigureAwait(false);
    }

    public Task Faulted<TException>(BehaviorExceptionContext<OrderState, TException> context, IBehavior<OrderState> next) 
        where TException : Exception
    {
        return next.Faulted(context);
    }

    public Task Faulted<T, TException>(BehaviorExceptionContext<OrderState, T, TException> context, IBehavior<OrderState, T> next)
        where TException : Exception
    {
        return next.Faulted(context);
    }
}

要注册实例活动,请使用以下语法。文章来源地址https://www.toymoban.com/news/detail-462278.html

public interface OrderSubmitted
{
    Guid OrderId { get; }    
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Initially(
            When(SubmitOrder)
                .Activity(x => x.OfInstanceType<PublishOrderSubmittedActivity>())
                .TransitionTo(Submitted));
    }
}

到了这里,关于MassTransit类库Saga文档翻译的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Opentsdb官方优化文档 - 翻译

    Tuning — OpenTSDB 2.4 documentation As with any database there are many tuning parameters for OpenTSDB that can be used to improve write and read performance. Some of these options are specific to certain backends, others are global. 翻译: 与任何数据库一样,OpenTSDB有许多调优参数可用于提高写和读性能。其中一些选项是特定于

    2024年01月16日
    浏览(89)
  • 翻译docker官方文档(残缺版)

    The order of Dockerfile instructions matters. A Docker build consists of a series of ordered build instructions. Each instruction in a Dockerfile roughly translates to an image layer. The following diagram illustrates how a Dockerfile translates into a stack of layers in a container image. Dockerfile指令的顺序很重要。Docker构建由一系列有序的构

    2024年02月07日
    浏览(32)
  • 本地HTML文档批量翻译软件

    免费文档翻译软件将我们的本地文档进行批量翻译,支持多语言的翻译和批量导出。对于我们保存的excel、word、txt或是html文档都可以批量翻译,并自动发布到网站或者自媒体。免费文档翻译软件还支持对我们的英语网站进行内容采集,自动翻译保存到本地。   网站自媒体通

    2024年02月12日
    浏览(25)
  • slint1.32 官方文档翻译00

    来源于 Slint 1.3.2 Reference 主要用 有道翻译,个人参考用。翻译不妥的,请指正。 目录: Slint 1.3.2 Reference Slint 1.3.2参考 INTRODUCTION 介绍 Getting Started 开始 Supported Platforms 支持的平台 LANGUAGE REFERENCE 语言参考 Introduction 介绍 Concepts 概念 .slint File 文件 Positioning and Layout of Elements 元

    2024年01月16日
    浏览(37)
  • 一款基于AIGC的文档翻译网站

    一款文字/文件翻译的网站,支持多个领域的翻译,支持常见的语言翻译(韩/日/法/英/俄/德…),最大百分比的保持原文排版(及个别除外基本100%还原)。 新用户注册就有100页的免费额度,每月系统还会随机赠送翻译额度,说实话这比好多的企业要好的多了,低至8毛钱一页,而且最贵

    2024年02月05日
    浏览(31)
  • slint 1.3.2 官方文档翻译06

    SlintPad 基于官方文档的个人翻译,主要使用 有道翻译。 Debugging Techniques - Slint 1.3.2 Reference ADVANCED TOPICS 高级的主题-- Debugging Techniques 调试技术   On this page we share different techniques and tools we’ve built into Slint that help you track down different issues you may be running into, during the design and dev

    2024年01月18日
    浏览(47)
  • ChatGPT将批量文档翻译成中文的方法

    文档翻译成中文软件是指在处理文档时,自动将文档中的内容翻译成中文的软件。这些软件通常采用自然语言处理技术,通过对待翻译文本的分词、词义分析、语法分析等多种技术处理,实现对文本中的单词、短语、句子等级别的翻译。 文档翻译成中文软件有传统的谷歌翻译

    2023年04月14日
    浏览(24)
  • OpenAI文档翻译——核心概念(提示词、令牌、模型)

    OpenAI API几乎可以应用于任何涉及理解或生成自然语言、代码或图像的任务。它提供了一系列适合不同任务的不同算力的模型来适应各种任务,这个能力也支持你去调节自己定义的模型。而不同类型的模型可以用于从内容生成到语义搜索和分类的所有事情。 关键概念 在了解关

    2023年04月16日
    浏览(24)
  • OpenLayers7官方文档翻译,OpenLayers7中文文档,OpenLayers快速入门

    这个入门文档向您展示如何放一张地图在web网页上。 开发设置使用 NodeJS (至少需要Nodejs 14 或更高版本),并要求安装 git 。 开始使用OpenLayers构建项目的最简单方法是运行: npm create ol-app 第一个命令将创建一个名为 my-app 的目录(如果您愿意,可以使用不同的名称),安装

    2024年02月10日
    浏览(39)
  • c编译器学习02:chibicc文档翻译

    先粗略地看一遍作者的书籍。 https://www.sigbus.info/compilerbook# “低レイヤを知りたい人のためのCコンパイラ作成入門” 为想了解底层的人准备的C编译器制作入门 Rui Ueyama ruiu@cs.stanford.edu 2020-03-16 https://www.sigbus.info/ 植山瑠偉 谷歌软件工程师 我的专业知识涵盖从 HTML/JavaScript 到硬

    2024年02月21日
    浏览(64)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包