Saga Pattern Using Azure Service Bus
Saga Pattern
Using Azure Service Bus
Prerequisites
1. Install
the Azure.Messaging.ServiceBus NuGet package:
dotnet
add package Azure.Messaging.ServiceBus
2. Set
up an Azure Service Bus namespace and create the necessary topics and
subscriptions:
• Topics:
OrderCreated, PaymentProcessed
• Subscriptions
for each service: PaymentService, InventoryService, etc.
1.
Order Service (Publisher)
The
Order Service creates an order and publishes a message to the OrderCreated
topic.
using Azure.Messaging.ServiceBus;
using System;
using System.Text.Json;
using System.Threading.Tasks;
public class OrderService
{
private const string ServiceBusConnectionString = "<Your-Service-Bus-Connection-String>";
private const string TopicName = "OrderCreated";
public async Task CreateOrderAsync(Order order)
{
await using var client = new ServiceBusClient(ServiceBusConnectionString);
var sender = client.CreateSender(TopicName);
string orderMessage = JsonSerializer.Serialize(order);
var message = new ServiceBusMessage(orderMessage);
Console.WriteLine($"Order
Created: {order.OrderId}");
await sender.SendMessageAsync(message);
Console.WriteLine("Message
sent to Service Bus.");
}
}
public class Order
{
public string OrderId
{ get; set;
}
public string UserId
{ get; set;
}
public string ItemId
{ get; set;
}
public decimal Price
{ get; set;
}
}
2.
Payment Service
(Subscriber & Publisher)
The Payment Service listens to the OrderCreated topic and publishes to the PaymentProcessed topic.
using Azure.Messaging.ServiceBus;
using System;
using System.Text.Json;
using System.Threading.Tasks;
public class PaymentService
{
private const string ServiceBusConnectionString = "<Your-Service-Bus-Connection-String>";
private const string OrderCreatedSubscription = "OrderCreated";
private const string PaymentProcessedTopic = "PaymentProcessed";
public async Task StartAsync()
{
await using var client = new ServiceBusClient(ServiceBusConnectionString);
var processor = client.CreateProcessor(OrderCreatedSubscription);
processor.ProcessMessageAsync += async args =>
{
string body = args.Message.Body.ToString();
var order = JsonSerializer.Deserialize<Order>(body);
Console.WriteLine($"Processing
Payment for Order {order.OrderId}");
// Simulate payment success or failure
var paymentSuccess = new Random().Next(0,
2) == 1;
string paymentStatus = paymentSuccess ? "Success" : "Failed";
Console.WriteLine($"Payment
{paymentStatus} for Order {order.OrderId}");
// Publish payment result
await PublishPaymentResultAsync(order,
paymentSuccess);
await args.CompleteMessageAsync(args.Message);
};
processor.ProcessErrorAsync += args =>
{
Console.WriteLine($"Error:
{args.Exception.Message}");
return Task.CompletedTask;
};
await processor.StartProcessingAsync();
}
private async Task PublishPaymentResultAsync(Order order,
bool success)
{
await using var client = new ServiceBusClient(ServiceBusConnectionString);
var sender = client.CreateSender(PaymentProcessedTopic);
var paymentResult = new
{
OrderId = order.OrderId,
Success = success
};
string message = JsonSerializer.Serialize(paymentResult);
await sender.SendMessageAsync(new ServiceBusMessage(message));
Console.WriteLine("Payment
result published.");
}
}
3.
Inventory Service
(Subscriber)
The Inventory Service listens to the PaymentProcessed topic and takes appropriate action based on the payment result.
using Azure.Messaging.ServiceBus;
using System;
using System.Text.Json;
using System.Threading.Tasks;
public class InventoryService
{
private const string ServiceBusConnectionString = "<Your-Service-Bus-Connection-String>";
private const string PaymentProcessedSubscription = "PaymentProcessed";
public async Task StartAsync()
{
await using var client = new ServiceBusClient(ServiceBusConnectionString);
var processor = client.CreateProcessor(PaymentProcessedSubscription);
processor.ProcessMessageAsync += async args =>
{
string body = args.Message.Body.ToString();
var paymentResult = JsonSerializer.Deserialize<PaymentResult>(body);
if
(paymentResult.Success)
{
Console.WriteLine($"Reserving
Inventory for Order {paymentResult.OrderId}");
// Simulate inventory
reservation
Console.WriteLine($"Inventory
Reserved for Order {paymentResult.OrderId}");
}
else
{
Console.WriteLine($"Payment
Failed. No Inventory Reserved for Order {paymentResult.OrderId}");
}
await args.CompleteMessageAsync(args.Message);
};
processor.ProcessErrorAsync += args =>
{
Console.WriteLine($"Error:
{args.Exception.Message}");
return Task.CompletedTask;
};
await processor.StartProcessingAsync();
}
}
public class PaymentResult
{
public string OrderId
{ get; set;
}
public bool Success
{ get; set;
}
}
4.
Main Program
Run the services in parallel.
using System;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[]
args)
{
var orderService = new OrderService();
var paymentService = new PaymentService();
var inventoryService = new InventoryService();
// Start subscribers
Task.Run(()
=> paymentService.StartAsync());
Task.Run(()
=> inventoryService.StartAsync());
// Create an order
var order = new Order
{
OrderId = Guid.NewGuid().ToString(),
UserId = "123",
ItemId = "item_456",
Price = 100.00m
};
await orderService.CreateOrderAsync(order);
Console.WriteLine("Saga
process started. Press any key to exit...");
Console.ReadKey();
}
}
Comments
Post a Comment