saga implementation using masstransit

in microservices development, we often deal with complex code for handling transactions between services, typically involving verbose try-catch blocks. to ensure consistency in distributed systems, the saga pattern offers a straightforward approach. today, i'll walk you through implementing this pattern using masstransit, a reliable framework for orchestrating distributed systems.

there are two types of saga implementations in masstransit; in this post we will cover saga state machines. you can also look at routing slips if you need to route a message through a series of components.

here is the roadmap to implement the saga pattern using masstransit:

  • define events: write or draw the basic structure of the events you have to implement
  • create the state machine instance
  • define states for events
  • define behaviors between states
  • implement the state machine
  • register the handlers and the saga state machine

to define a state machine, we derive a class from MassTransitStateMachine<T>, where T is the instance type, like this:

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
}

instance

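an instance contains the data for a state machine instance.

a new instance is created whenever an initial event is consumed and no existing instance with the same correlation id is found. later state changes update that same instance rather than creating a new one.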

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
}
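
in practice the instance usually carries business data as well, so that later behaviors can read it. here is a minimal sketch of an extended instance; the OrderId and SubmittedAt properties are assumptions added for illustration and are not part of the example above.

```csharp
using System;
using MassTransit;

// sketch only: OrderId and SubmittedAt are hypothetical additions
public class OrderState :
    SagaStateMachineInstance
{
    // required by SagaStateMachineInstance; identifies this saga instance
    public Guid CorrelationId { get; set; }

    // name of the state the instance is currently in
    public string CurrentState { get; set; }

    // business data copied from incoming events
    public Guid OrderId { get; set; }
    public DateTime? SubmittedAt { get; set; }
}
```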

states

An instance can only be in one state at a given time. A new instance starts in the _Initial_ state, which is defined automatically. The _Final_ state is also defined for all state machines and is used to signify the instance has reached the final state.

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public State Submitted { get; private set; }
    public State Accepted { get; private set; }
}
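
the state machine also needs to know which instance property stores the current state. a minimal sketch, reusing the OrderState class from the instance section and its string CurrentState property:

```csharp
using MassTransit;

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        // store the name of the current state in OrderState.CurrentState
        InstanceState(x => x.CurrentState);
    }

    public State Submitted { get; private set; }
    public State Accepted { get; private set; }
}
```

with a string property, the state name itself is persisted, which keeps the stored data readable at the cost of a little space.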

event

an event is something that happened which may result in a state change. events can change the instance data and the current state.

public interface SubmitOrder
{
    Guid OrderId { get; }
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));
    }

    public Event<SubmitOrder> SubmitOrder { get; private set; }
}
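
to show how an event changes both the instance data and the current state, here is a small sketch. it assumes masstransit v8 naming (context.Saga and context.Message) and a hypothetical OrderId property on the instance; treat it as an illustration rather than the definitive setup.

```csharp
using System;
using MassTransit;

public interface SubmitOrder
{
    Guid OrderId { get; }
}

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public Guid OrderId { get; set; } // hypothetical, for illustration
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        InstanceState(x => x.CurrentState);

        Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));

        // a SubmitOrder event with an unknown id creates a new instance,
        // copies data onto it and moves it to the Submitted state
        Initially(
            When(SubmitOrder)
                .Then(context => context.Saga.OrderId = context.Message.OrderId)
                .TransitionTo(Submitted));
    }

    public State Submitted { get; private set; }

    public Event<SubmitOrder> SubmitOrder { get; private set; }
}
```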

configuration

services.AddMassTransit(x =>
{
    x.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .InMemoryRepository();
});
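
the registration above only adds the saga and its repository; a transport is also needed before the saga can receive messages. a minimal sketch using the in-memory transport, which is meant for local experiments only (a real deployment would configure a broker such as rabbitmq instead):

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddMassTransit(x =>
{
    // saga state kept in memory; it is lost when the process stops
    x.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .InMemoryRepository();

    x.UsingInMemory((context, cfg) =>
    {
        // create receive endpoints for everything registered above,
        // including the saga state machine
        cfg.ConfigureEndpoints(context);
    });
});
```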


let's say we have a simple e-commerce scenario where a customer orders an item. we should implement the happy path, but for any errors we should also implement the rollback scenarios. for example, if a payment rejected event occurs, we should not keep the stock reserved and should also send an order failed event.

events:

OrderSubmit,
StockReserved,
StockNotReserved,
PaymentConfirmed,
PaymentRejected
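
these events could be declared as message contracts in the same interface style as SubmitOrder above. this is only a sketch: the property names (OrderId, Reason) are assumptions, and your contracts will likely carry more data.

```csharp
using System;

// hypothetical contracts for the events listed above

public interface OrderSubmit
{
    Guid OrderId { get; }
}

public interface StockReserved
{
    Guid OrderId { get; }
}

public interface StockNotReserved
{
    Guid OrderId { get; }
    string Reason { get; } // why the stock could not be reserved
}

public interface PaymentConfirmed
{
    Guid OrderId { get; }
}

public interface PaymentRejected
{
    Guid OrderId { get; }
    string Reason { get; } // why the payment was rejected
}
```

giving every event the same OrderId property lets all of them correlate to the same saga instance.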

here is a basic overview of the state behaviors that we want to implement.

saga

here is an example behavior for handling the happy path as well as errors.


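// note: context.Instance and context.Data are the pre-v8 names;
// in masstransit v8 they are context.Saga and context.Message
During(StockReserved,
    // happy path: payment confirmed, so complete the order and finish the saga
    When(PaymentConfirmedEvent)
        .TransitionTo(PaymentConfirmed)
        .Send(OrderCompletedCommandQueue, context => new OrderCompletedCommand
        {
            OrderId = context.Instance.OrderId
        })
        .Finalize(),
    // failure path: payment rejected, so report the failure and release the stock
    When(PaymentRejectedEvent)
        .TransitionTo(PaymentRejected)
        .Send(OrderFailedCommandQueue, context => new OrderFailedCommand
        {
            OrderId = context.Instance.OrderId,
            Reason = context.Data.Reason
        })
        .Send(CompensateStockCommandQueue, context => new CompensateStockCommand
        {
            Items = context.Data.Items
        }));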

as you can see, if a PaymentRejectedEvent arrives while the saga is in the StockReserved state, we send the rollback commands OrderFailedCommand and CompensateStockCommand to their queues. if the payment is confirmed instead, we send OrderCompletedCommand and finalize the saga.

this gives us a clean way of handling distributed transactions and recovering when an error occurs. masstransit also handles persistence for sagas; this matters because without persisted state a saga would treat every event as a new event. you can choose from several storage options such as mongodb, redis, or sql.

here is an example of using sql server (through entity framework core) for persistence:

services.AddMassTransit(cfg =>
{
    cfg.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .EntityFrameworkRepository(r =>
        {
            r.ConcurrencyMode = ConcurrencyMode.Pessimistic; // or use Optimistic, which requires RowVersion

            r.AddDbContext<DbContext, OrderStateDbContext>((provider,builder) =>
            {
                builder.UseSqlServer(connectionString, m =>
                {
                    m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                    m.MigrationsHistoryTable($"__{nameof(OrderStateDbContext)}");
                });
            });
        });
});
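
the registration above refers to an OrderStateDbContext that is not shown. here is a sketch of what it could look like, based on the SagaDbContext and SagaClassMap types from the masstransit entity framework core integration. the namespaces assume masstransit v8, and the 64 character limit is an arbitrary illustrative choice.

```csharp
using System.Collections.Generic;
using MassTransit;
using MassTransit.EntityFrameworkCoreIntegration;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;

// maps the saga instance to a table
public class OrderStateMap :
    SagaClassMap<OrderState>
{
    protected override void Configure(EntityTypeBuilder<OrderState> entity, ModelBuilder model)
    {
        // illustrative length limit for the state name column
        entity.Property(x => x.CurrentState).HasMaxLength(64);
    }
}

// db context used by the saga repository
public class OrderStateDbContext :
    SagaDbContext
{
    public OrderStateDbContext(DbContextOptions options)
        : base(options)
    {
    }

    protected override IEnumerable<ISagaClassMap> Configurations
    {
        get { yield return new OrderStateMap(); }
    }
}
```

if you switch to optimistic concurrency, the instance would also need a row version property mapped in the class map.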


here is an example of a complete state machine implementation:

namespace Library.Components.StateMachines
{
    using Contracts;
    using MassTransit;


    public sealed class BookStateMachine :
        MassTransitStateMachine<Book>
    {
        static BookStateMachine()
        {
            MessageContracts.Initialize();
        }

        public BookStateMachine()
        {
            InstanceState(x => x.CurrentState, Available, Reserved);

            Event(() => ReservationRequested, x => x.CorrelateById(m => m.Message.BookId));

            Initially(
                When(Added)
                    .CopyDataToInstance()
                    .TransitionTo(Available));

            During(Available,
                When(ReservationRequested)
                    .Then(context => context.Saga.ReservationId = context.Message.ReservationId)
                    .PublishBookReserved()
                    .TransitionTo(Reserved)
            );

            During(Reserved,
                When(ReservationRequested)
                    .If(context => context.Saga.ReservationId.HasValue && context.Saga.ReservationId.Value == context.Message.ReservationId,
                        x => x.PublishBookReserved())
            );

            During(Reserved,
                When(BookReservationCanceled)
                    .TransitionTo(Available));

            During(Available, Reserved,
                When(BookCheckedOut)
                    .TransitionTo(CheckedOut)
            );
        }

        public Event<BookAdded> Added { get; }
        public Event<BookCheckedOut> BookCheckedOut { get; }
        public Event<BookReservationCanceled> BookReservationCanceled { get; }
        public Event<ReservationRequested> ReservationRequested { get; }

        public State Available { get; }
        public State Reserved { get; }
        public State CheckedOut { get; }
    }


    public static class BookStateMachineExtensions
    {
        public static EventActivityBinder<Book, BookAdded> CopyDataToInstance(this EventActivityBinder<Book, BookAdded> binder)
        {
            return binder.Then(x =>
            {
                x.Saga.DateAdded = x.Message.Timestamp.Date;
                x.Saga.Title = x.Message.Title;
                x.Saga.Isbn = x.Message.Isbn;
            });
        }

        public static EventActivityBinder<Book, ReservationRequested> PublishBookReserved(this EventActivityBinder<Book, ReservationRequested> binder)
        {
            return binder.PublishAsync(context => context.Init<BookReserved>(new
            {
                context.Message.ReservationId,
                context.Message.MemberId,
                context.Message.Duration,
                context.Message.BookId,
                InVar.Timestamp
            }));
        }
    }
}
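
the Book instance used by this state machine is not shown above. a sketch of what it might look like, with the properties inferred from how the state machine uses them (for example, ReservationId must be nullable because the code checks HasValue, and CurrentState is an int because InstanceState is given a list of states):

```csharp
using System;
using MassTransit;

// sketch: property types are inferred from the state machine above
public class Book :
    SagaStateMachineInstance
{
    // required by SagaStateMachineInstance; correlated to BookId
    public Guid CorrelationId { get; set; }

    // current state stored as an int rather than a state name
    public int CurrentState { get; set; }

    // copied from the BookAdded event
    public DateTime DateAdded { get; set; }
    public string Title { get; set; }
    public string Isbn { get; set; }

    // set when a reservation is requested while the book is available
    public Guid? ReservationId { get; set; }
}
```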

TLDR

in this post we covered how to implement the saga pattern using masstransit in dotnet. when dealing with distributed transactions we should consider solutions like the saga pattern or routing slips, which handle transactions that span multiple processes rather than a single one. under heavy workload we can't rely on a single process, so we need this kind of solution to implement distributed transactions in a meaningful way.

i chose masstransit but we can also use other frameworks like nservicebus, brighter, etc.


Resources