Saga Pattern Using Azure Service Bus

 



The Saga pattern manages distributed transactions in a microservices architecture. It keeps data consistent across services by splitting one large transaction into a sequence of smaller local transactions, each owned by a single service.

A saga is a sequence of steps (local transactions) that run in a defined order across multiple services. If one step fails, compensating actions are triggered to undo the changes made by the steps that already completed.
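The compensation rule can be sketched in a few lines of C#. This is a minimal in-memory illustration, not tied to Azure Service Bus; `SagaStep`, `SagaRunner`, `SagaDemo`, and the step names are hypothetical and exist only for this example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration only.
public record SagaStep(string Name, Func<bool> Execute, Action Compensate);

public static class SagaRunner
{
    // Runs the steps in order. On the first failure, compensates the steps
    // that already completed, in reverse order, and returns false.
    public static bool Run(IEnumerable<SagaStep> steps, List<string> log)
    {
        var completed = new Stack<SagaStep>();
        foreach (var step in steps)
        {
            if (step.Execute())
            {
                log.Add($"done:{step.Name}");
                completed.Push(step);
                continue;
            }

            log.Add($"failed:{step.Name}");
            while (completed.Count > 0)
            {
                var undo = completed.Pop();
                undo.Compensate();
                log.Add($"undone:{undo.Name}");
            }
            return false;
        }
        return true;
    }
}

public static class SagaDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var steps = new[]
        {
            new SagaStep("CreateOrder", () => true, () => { }),
            new SagaStep("ProcessPayment", () => true, () => { }),
            new SagaStep("ReserveInventory", () => false, () => { }), // simulated failure
        };
        SagaRunner.Run(steps, log);
        return log;
    }
}
```

Calling `SagaDemo.Run()` yields the log `done:CreateOrder`, `done:ProcessPayment`, `failed:ReserveInventory`, `undone:ProcessPayment`, `undone:CreateOrder`: the failed step is never compensated, and the completed steps are undone newest first.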

Types of Saga Patterns

Choreography-Based Saga:

  • Each service publishes events to inform other services about the progress or failure of a transaction.
  • There is no central orchestrator; services communicate through a message broker such as Kafka, RabbitMQ, or Azure Service Bus.
  • Pros: Decentralized, scalable, and loosely coupled.
  • Cons: The workflow is implicit, so dependencies between services and failure recovery are harder to trace and manage.
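To make the choreography idea concrete, here is a small sketch in which an in-memory bus stands in for the real broker. `InMemoryEventBus` and `ChoreographyDemo` are hypothetical names; the topic names are borrowed from the sample later in this post.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a message broker.
public class InMemoryEventBus
{
    private readonly Dictionary<string, List<Action<string>>> _handlers = new();

    public void Subscribe(string topic, Action<string> handler)
    {
        if (!_handlers.TryGetValue(topic, out var list))
        {
            list = new List<Action<string>>();
            _handlers[topic] = list;
        }
        list.Add(handler);
    }

    public void Publish(string topic, string payload)
    {
        if (_handlers.TryGetValue(topic, out var list))
        {
            foreach (var handler in list)
            {
                handler(payload);
            }
        }
    }
}

public static class ChoreographyDemo
{
    public static List<string> Run(string orderId)
    {
        var bus = new InMemoryEventBus();
        var trace = new List<string>();

        // Payment reacts to OrderCreated and announces its own result.
        bus.Subscribe("OrderCreated", id =>
        {
            trace.Add($"payment:{id}");
            bus.Publish("PaymentProcessed", id);
        });

        // Inventory reacts to PaymentProcessed; it never talks to the order side directly.
        bus.Subscribe("PaymentProcessed", id => trace.Add($"inventory:{id}"));

        bus.Publish("OrderCreated", orderId);
        return trace;
    }
}
```

No component in this sketch knows the whole workflow. The order of steps emerges from who subscribes to which event, which is both the strength and the debugging cost of choreography.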

Orchestration-Based Saga:

  • A central orchestrator (Saga Manager) coordinates the workflow of the Saga.
  • The orchestrator sends commands to services and listens for their responses.
  • Pros: The workflow is defined in one place, which makes it easier to manage and debug.
  • Cons: Introduces a single point of failure (the orchestrator).
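For contrast, an orchestrator can be sketched as a single class that owns the workflow. In this hedged example, delegates stand in for the commands an orchestrator would send to each service and the replies it would receive; `OrderSagaOrchestrator`, `SagaState`, and the delegate names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public enum SagaState { Started, PaymentCompleted, Completed, Compensated }

// Hypothetical orchestrator: it decides which participant runs next
// and which compensation to trigger when a step fails.
public class OrderSagaOrchestrator
{
    private readonly Func<string, bool> _chargePayment;
    private readonly Func<string, bool> _reserveInventory;
    private readonly Action<string> _refundPayment;

    public List<SagaState> History { get; } = new();

    public OrderSagaOrchestrator(
        Func<string, bool> chargePayment,
        Func<string, bool> reserveInventory,
        Action<string> refundPayment)
    {
        _chargePayment = chargePayment;
        _reserveInventory = reserveInventory;
        _refundPayment = refundPayment;
    }

    public SagaState Run(string orderId)
    {
        Record(SagaState.Started);

        if (!_chargePayment(orderId))
        {
            // Nothing has been committed yet, so there is nothing to undo.
            return Record(SagaState.Compensated);
        }
        Record(SagaState.PaymentCompleted);

        if (!_reserveInventory(orderId))
        {
            // Undo the payment that already succeeded.
            _refundPayment(orderId);
            return Record(SagaState.Compensated);
        }

        return Record(SagaState.Completed);
    }

    private SagaState Record(SagaState state)
    {
        History.Add(state);
        return state;
    }
}
```

Because every transition passes through `Run`, the `History` list gives a complete picture of one saga instance, which is the debugging advantage listed above.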

 

Prerequisites

  1. Install the Azure.Messaging.ServiceBus NuGet package:

dotnet add package Azure.Messaging.ServiceBus

  2. Set up an Azure Service Bus namespace (topics require the Standard or Premium tier) and create the topics and subscriptions the sample uses:

  • Topics: OrderCreated, PaymentProcessed
  • Subscriptions: PaymentService on the OrderCreated topic, InventoryService on the PaymentProcessed topic
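The topics and subscriptions can be created in the Azure portal, or from code. The sketch below uses `ServiceBusAdministrationClient` from the same NuGet package. It assumes a connection string with Manage rights, and it assumes the PaymentService subscription belongs to the OrderCreated topic and the InventoryService subscription to the PaymentProcessed topic; `ServiceBusSetup` is a hypothetical helper name.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

public static class ServiceBusSetup
{
    // Topic/subscription pairs assumed for this sample.
    public static readonly (string Topic, string Subscription)[] Entities =
    {
        ("OrderCreated", "PaymentService"),
        ("PaymentProcessed", "InventoryService"),
    };

    // Creates any topic or subscription that does not exist yet.
    public static async Task EnsureEntitiesAsync(string connectionString)
    {
        var admin = new ServiceBusAdministrationClient(connectionString);

        foreach (var (topic, subscription) in Entities)
        {
            if (!(await admin.TopicExistsAsync(topic)).Value)
            {
                await admin.CreateTopicAsync(topic);
            }

            if (!(await admin.SubscriptionExistsAsync(topic, subscription)).Value)
            {
                await admin.CreateSubscriptionAsync(topic, subscription);
            }
        }
    }
}
```

Calling `await ServiceBusSetup.EnsureEntitiesAsync(connectionString)` once before starting the services is enough; running it again is harmless because existing entities are skipped.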

1. Order Service (Publisher)

The Order Service creates an order and publishes a message to the OrderCreated topic.

using Azure.Messaging.ServiceBus;
using System;
using System.Text.Json;
using System.Threading.Tasks;

public class OrderService
{
    private const string ServiceBusConnectionString = "<Your-Service-Bus-Connection-String>";
    private const string TopicName = "OrderCreated";

    public async Task CreateOrderAsync(Order order)
    {
        // For brevity the sample opens a client per call; a long-running
        // service should create one ServiceBusClient and reuse it.
        await using var client = new ServiceBusClient(ServiceBusConnectionString);
        await using var sender = client.CreateSender(TopicName);

        // The order travels as the JSON body of the message.
        string orderMessage = JsonSerializer.Serialize(order);
        var message = new ServiceBusMessage(orderMessage);

        Console.WriteLine($"Order Created: {order.OrderId}");
        await sender.SendMessageAsync(message);
        Console.WriteLine("Message sent to Service Bus.");
    }
}

public class Order
{
    public string OrderId { get; set; }
    public string UserId { get; set; }
    public string ItemId { get; set; }
    public decimal Price { get; set; }
}
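A Service Bus message can carry metadata alongside its JSON body. The following sketch is one possible way to stamp each message with the order ID so that later steps can be traced back to the same saga; using the order ID for both `MessageId` and `CorrelationId` is an assumption of this example, and `OrderMessageFactory` is a hypothetical helper.

```csharp
using System.Text.Json;
using Azure.Messaging.ServiceBus;

public static class OrderMessageFactory
{
    // Builds a message whose body is the serialized payload and whose
    // metadata identifies the saga instance by order ID.
    public static ServiceBusMessage Create(string orderId, object payload)
    {
        return new ServiceBusMessage(JsonSerializer.Serialize(payload))
        {
            MessageId = orderId,          // used by duplicate detection if it is enabled on the topic
            CorrelationId = orderId,      // lets consumers relate messages of one saga
            Subject = "OrderCreated",
            ContentType = "application/json",
        };
    }
}
```

A sender would then call `await sender.SendMessageAsync(OrderMessageFactory.Create(order.OrderId, order))` instead of building the message inline.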

 

2. Payment Service (Subscriber & Publisher)

The Payment Service receives messages from its subscription on the OrderCreated topic and publishes the payment result to the PaymentProcessed topic.

 

using Azure.Messaging.ServiceBus;
using System;
using System.Text.Json;
using System.Threading.Tasks;

public class PaymentService : IAsyncDisposable
{
    private const string ServiceBusConnectionString = "<Your-Service-Bus-Connection-String>";
    private const string OrderCreatedTopic = "OrderCreated";
    private const string SubscriptionName = "PaymentService";
    private const string PaymentProcessedTopic = "PaymentProcessed";

    // The client, processor, and sender are fields so they outlive StartAsync.
    // Disposing the client when StartAsync returns would stop processing immediately.
    private readonly ServiceBusClient _client;
    private readonly ServiceBusProcessor _processor;
    private readonly ServiceBusSender _sender;

    public PaymentService()
    {
        _client = new ServiceBusClient(ServiceBusConnectionString);
        // A topic subscription is addressed by topic name plus subscription name.
        _processor = _client.CreateProcessor(OrderCreatedTopic, SubscriptionName);
        _sender = _client.CreateSender(PaymentProcessedTopic);
    }

    public async Task StartAsync()
    {
        _processor.ProcessMessageAsync += async args =>
        {
            string body = args.Message.Body.ToString();
            var order = JsonSerializer.Deserialize<Order>(body);
            Console.WriteLine($"Processing Payment for Order {order.OrderId}");

            // Simulate payment success or failure
            var paymentSuccess = new Random().Next(0, 2) == 1;
            string paymentStatus = paymentSuccess ? "Success" : "Failed";
            Console.WriteLine($"Payment {paymentStatus} for Order {order.OrderId}");

            // Publish payment result, then settle the incoming message
            await PublishPaymentResultAsync(order, paymentSuccess);
            await args.CompleteMessageAsync(args.Message);
        };

        processor_error:
        _processor.ProcessErrorAsync += args =>
        {
            Console.WriteLine($"Error: {args.Exception.Message}");
            return Task.CompletedTask;
        };

        await _processor.StartProcessingAsync();
    }

    private async Task PublishPaymentResultAsync(Order order, bool success)
    {
        var paymentResult = new
        {
            OrderId = order.OrderId,
            Success = success
        };
        string message = JsonSerializer.Serialize(paymentResult);
        await _sender.SendMessageAsync(new ServiceBusMessage(message));
        Console.WriteLine("Payment result published.");
    }

    // Stops processing and releases the connection.
    public async ValueTask DisposeAsync()
    {
        await _processor.DisposeAsync();
        await _sender.DisposeAsync();
        await _client.DisposeAsync();
    }
}
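A subscriber also has to decide what to do with a message it cannot read. One common option, sketched here under assumptions, is to move malformed messages to the dead-letter queue rather than retrying them. `IncomingOrder`, `OrderMessageParser`, and the reason string "InvalidPayload" are hypothetical; `DeadLetterMessageAsync` and `CompleteMessageAsync` are methods of `ProcessMessageEventArgs`.

```csharp
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical DTO for the incoming OrderCreated body.
public class IncomingOrder
{
    public string OrderId { get; set; }
    public decimal Price { get; set; }
}

public static class OrderMessageParser
{
    // Returns false for invalid JSON or a missing OrderId instead of throwing.
    public static bool TryParse(string body, out IncomingOrder order)
    {
        order = null;
        try
        {
            order = JsonSerializer.Deserialize<IncomingOrder>(body);
        }
        catch (JsonException)
        {
            return false;
        }
        return order != null && !string.IsNullOrEmpty(order.OrderId);
    }

    // Handler with the shape expected by ServiceBusProcessor.ProcessMessageAsync.
    public static async Task HandleAsync(ProcessMessageEventArgs args)
    {
        if (!TryParse(args.Message.Body.ToString(), out var order))
        {
            // Retrying cannot fix a malformed body, so park the message.
            await args.DeadLetterMessageAsync(args.Message, "InvalidPayload", "Body is not a valid order.");
            return;
        }

        // Payment processing for order.OrderId would go here.
        await args.CompleteMessageAsync(args.Message);
    }
}
```

The handler can be attached with `processor.ProcessMessageAsync += OrderMessageParser.HandleAsync;`. Keeping the parsing in `TryParse` means the validation logic can be exercised without a Service Bus connection.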

 

 

3. Inventory Service (Subscriber)

The Inventory Service receives messages from its subscription on the PaymentProcessed topic and reserves inventory only when the payment succeeded.

 

 

using Azure.Messaging.ServiceBus;
using System;
using System.Text.Json;
using System.Threading.Tasks;

public class InventoryService : IAsyncDisposable
{
    private const string ServiceBusConnectionString = "<Your-Service-Bus-Connection-String>";
    private const string PaymentProcessedTopic = "PaymentProcessed";
    private const string SubscriptionName = "InventoryService";

    // Kept as fields so the processor keeps running after StartAsync returns.
    private readonly ServiceBusClient _client;
    private readonly ServiceBusProcessor _processor;

    public InventoryService()
    {
        _client = new ServiceBusClient(ServiceBusConnectionString);
        // A topic subscription is addressed by topic name plus subscription name.
        _processor = _client.CreateProcessor(PaymentProcessedTopic, SubscriptionName);
    }

    public async Task StartAsync()
    {
        _processor.ProcessMessageAsync += async args =>
        {
            string body = args.Message.Body.ToString();
            var paymentResult = JsonSerializer.Deserialize<PaymentResult>(body);

            if (paymentResult.Success)
            {
                Console.WriteLine($"Reserving Inventory for Order {paymentResult.OrderId}");
                // Simulate inventory reservation
                Console.WriteLine($"Inventory Reserved for Order {paymentResult.OrderId}");
            }
            else
            {
                Console.WriteLine($"Payment Failed. No Inventory Reserved for Order {paymentResult.OrderId}");
            }

            await args.CompleteMessageAsync(args.Message);
        };

        _processor.ProcessErrorAsync += args =>
        {
            Console.WriteLine($"Error: {args.Exception.Message}");
            return Task.CompletedTask;
        };

        await _processor.StartProcessingAsync();
    }

    // Stops processing and releases the connection.
    public async ValueTask DisposeAsync()
    {
        await _processor.DisposeAsync();
        await _client.DisposeAsync();
    }
}

public class PaymentResult
{
    public string OrderId { get; set; }
    public bool Success { get; set; }
}
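The services above only log a failed payment. A compensating action on the order side could look roughly like the sketch below, which reacts to the same PaymentProcessed body and cancels the pending order when `Success` is false. `OrderCompensationHandler`, `OrderStatus`, `PaymentOutcome`, and the in-memory dictionary are hypothetical; a real service would need its own subscription on the PaymentProcessed topic and a durable store.

```csharp
using System.Collections.Generic;
using System.Text.Json;

public enum OrderStatus { Pending, Paid, Cancelled }

// Mirrors the JSON published to PaymentProcessed: {"OrderId":"...","Success":true}
public class PaymentOutcome
{
    public string OrderId { get; set; }
    public bool Success { get; set; }
}

// Hypothetical order-side handler that applies the compensating action.
public class OrderCompensationHandler
{
    private readonly Dictionary<string, OrderStatus> _orders = new();

    public void Track(string orderId) => _orders[orderId] = OrderStatus.Pending;

    public OrderStatus StatusOf(string orderId) => _orders[orderId];

    public void Handle(string paymentProcessedBody)
    {
        var outcome = JsonSerializer.Deserialize<PaymentOutcome>(paymentProcessedBody);

        // Ignore results for orders this handler does not know about.
        if (outcome?.OrderId is null || !_orders.ContainsKey(outcome.OrderId))
        {
            return;
        }

        // A failed payment undoes the order creation by cancelling it.
        _orders[outcome.OrderId] = outcome.Success ? OrderStatus.Paid : OrderStatus.Cancelled;
    }
}
```

Inside a processor callback this would be invoked as `handler.Handle(args.Message.Body.ToString())` before completing the message.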

4. Main Program

Start both subscribers, then publish an order to kick off the saga.

using System;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        var orderService = new OrderService();
        var paymentService = new PaymentService();
        var inventoryService = new InventoryService();

        // Start subscribers. Awaiting StartAsync surfaces startup errors
        // instead of losing them in an unobserved background task.
        await paymentService.StartAsync();
        await inventoryService.StartAsync();

        // Create an order
        var order = new Order
        {
            OrderId = Guid.NewGuid().ToString(),
            UserId = "123",
            ItemId = "item_456",
            Price = 100.00m
        };

        await orderService.CreateOrderAsync(order);

        Console.WriteLine("Saga process started. Press any key to exit...");
        Console.ReadKey();
    }
}

 
