Создание бизнес-приложений на основе сообщений с помощью NServiceBus и Служебной шины Azure
NServiceBus — это коммерческая платформа обмена сообщениями, предоставляемая Particular Software. Созданное на основе Служебной шины Azure, это решение помогает разработчикам сосредоточиться на бизнес-логике, абстрагируя вопросы, связанные с инфраструктурой. В этом руководстве показано, как создать решение, которое выполняет обмен сообщениями между двумя службами. Вы также узнаете, как автоматически повторять отправку недоставленных сообщений и рассмотрите варианты размещения этих служб в Azure.
Примечание.
Код для этого руководства доступен на веб-сайте Particular Software Docs.
Необходимые компоненты
В примере предполагается, что вы создали пространство имен Служебной шины Azure.
Внимание
Для NServiceBus требуется по меньшей мере уровень "Стандартный". Уровень "Базовый" не подойдет.
Скачивание и подготовка решения
Скачайте код с веб-сайта Particular Software Docs. Решение
SendReceiveWithNservicebus.sln
состоит из трех проектов:- Sender — консольное приложение, которое отправляет сообщения.
- Receiver — консольное приложение, которое получает сообщения от отправителя и отвечает на них.
- Shared — библиотека классов, содержащая контракты сообщений, совместно используемые отправителем и получателем.
На следующей схеме, созданной ServiceInsight, средством визуализации и отладки от Particular Software, показан поток сообщений:
Откройте
SendReceiveWithNservicebus.sln
в избранном редакторе кода (например, Visual Studio 2022).Откройте
appsettings.json
в проектах Receiver и Sender и задайте дляAzureServiceBusConnectionString
строку подключения для пространства имен Служебной шины Azure.- Это можно найти в разделе портал Azure в разделе служебная шина "Параметры>пространства имен>" политик>общего доступа RootManageSharedAccessKey>Primary Connection String.
- Также
AzureServiceBusTransport
есть конструктор, принимающий пространство имен и учетные данные маркера, которые в рабочей среде будут более безопасными, однако в целях этого руководства будет использоваться общий ключ доступа строка подключения.
Определение общих контрактов сообщений
Библиотека классов Shared позволяет определить контракты, используемые для отправки сообщений. Она включает ссылку на пакет NuGet NServiceBus
с интерфейсами, которые можно использовать для определения сообщений. Интерфейсы не являются обязательными, но они предоставляют дополнительную проверку от NServiceBus и обеспечивают самодокументируемость кода.
Сначала рассмотрим класс Ping.cs
.
public class Ping : NServiceBus.ICommand
{
public int Round { get; set; }
}
Класс Ping
определяет сообщение, которое Sender отправляет Receiver. Это простой класс C#, реализующий интерфейс NServiceBus.ICommand
из пакета NServiceBus. Это сообщение информирует читателя и NServiceBus, что это команда, хотя существуют и другие способы определения сообщений без использования интерфейсов.
Другой класс сообщений в проектах Shared — Pong.cs
:
public class Pong : NServiceBus.IMessage
{
public string Acknowledgement { get; set; }
}
Pong
также является простым объектом C#, хотя реализует NServiceBus.IMessage
. Интерфейс IMessage
представляет общее сообщение, которое не является ни командой, ни событием, и часто используется для ответов. В нашем примере это ответ, который Receiver отправляет обратно Sender, чтобы указать, что сообщение было получено.
Вы будете использовать типы сообщения Ping
и Pong
. Следующим шагом будет настройка Sender для использования Служебной шины Azure и отправки сообщения Ping
.
Настройка отправителя
Sender — это конечная точка, которая отправляет сообщение Ping
. Вам нужно настроить Sender для использования Служебной шины Azure в качестве механизма транспортировки, а затем создать и отправить экземпляр Ping
.
В методе Main
файла Program.cs
нужно настроить конечную точку Sender:
var host = Host.CreateDefaultBuilder(args)
// Configure a host for the endpoint
.ConfigureLogging((context, logging) =>
{
logging.AddConfiguration(context.Configuration.GetSection("Logging"));
logging.AddConsole();
})
.UseConsoleLifetime()
.UseNServiceBus(context =>
{
// Configure the NServiceBus endpoint
var endpointConfiguration = new EndpointConfiguration("Sender");
var connectionString = context.Configuration.GetConnectionString("AzureServiceBusConnectionString");
// If token credentials are to be used, the overload constructor for AzureServiceBusTransport would be used here
var routing = endpointConfiguration.UseTransport(new AzureServiceBusTransport(connectionString));
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
endpointConfiguration.AuditProcessedMessagesTo("audit");
routing.RouteToEndpoint(typeof(Ping), "Receiver");
endpointConfiguration.EnableInstallers();
return endpointConfiguration;
})
.ConfigureServices(services => services.AddHostedService<SenderWorker>())
.Build();
await host.RunAsync();
Этот код достаточно объемный, поэтому мы рассмотрим его по частям.
Настройка узла для конечной точки
Размещение и ведение журнала настраиваются с помощью стандартных параметров универсального узла Майкрософт. Сейчас конечная точка настроена для запуска в качестве консольного приложения, но в нее можно внести минимальные изменения для запуска в Функциях Azure, что будет обсуждаться далее в этой статье.
Настройка конечной точки NServiceBus
Далее нужно указать узлу использовать NServiceBus с помощью метода расширения .UseNServiceBus(…)
. Метод принимает функцию обратного вызова для получения конечной точки, которая будет запускаться при запуске узла.
В конфигурации конечной точки определите AzureServiceBus
для транспортировки, предоставив строку подключения из appsettings.json
. Затем настройте маршрутизацию так, чтобы сообщения типа Ping
отправлялись в конечную точку с именем Receiver. Это позволяет NServiceBus автоматизировать процесс отправки сообщения в назначение, не требуя адреса получателя.
При вызове функции EnableInstallers
будет настроена топология в пространстве имен Служебной шины Azure при запуске конечной точки. При необходимости будут созданы нужные очереди. В рабочих средах еще одним способом создания топологии является операционное написание скриптов.
Настройка фоновой службы для отправки сообщений
Оставшаяся часть отправителя — это SenderWorker
, фоновая служба, которая настроена для отправки сообщения Ping
каждую секунду.
public class SenderWorker : BackgroundService
{
private readonly IMessageSession messageSession;
private readonly ILogger<SenderWorker> logger;
public SenderWorker(IMessageSession messageSession, ILogger<SenderWorker> logger)
{
this.messageSession = messageSession;
this.logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
var round = 0;
while (!stoppingToken.IsCancellationRequested)
{
await messageSession.Send(new Ping { Round = round++ });;
logger.LogInformation($"Message #{round}");
await Task.Delay(1_000, stoppingToken);
}
}
catch (OperationCanceledException)
{
// graceful shutdown
}
}
}
IMessageSession
в ExecuteAsync
вставляется в SenderWorker
и позволяет отправлять сообщения с помощью NServiceBus за пределами обработчика сообщений. Маршрутизация, настроенная в Sender
, определяет назначение сообщений Ping
. При этом топология системы (какие сообщения направляются на какие адреса) хранится отдельно от бизнес-кода.
Приложение Sender также содержит PongHandler
. Мы вернемся к нему после того, как рассмотрим Receiver, что мы сделаем дальше.
Настройка получателя
Receiver — это конечная точка, которая ожидает сообщения Ping
, регистрирует время получения сообщения и отправляет ответ обратно отправителю. В этом разделе мы быстро рассмотрим конфигурацию конечной точки, аналогичную Sender, а затем перейдем к обработчику сообщений.
Как и в случае с отправителем, настройте получателя в качестве консольного приложения с использованием универсального узла Майкрософт. Получатель использует ту же конфигурацию ведения журнала и конечных точек (служебная шина Azure используется для транспортировки сообщений), но с другим именем, чтобы отличать его от отправителя:
var endpointConfiguration = new EndpointConfiguration("Receiver");
Так как эта конечная точка отвечает только отправителю и не запускает новые беседы, настройка маршрутизации не требуется. Кроме того, в отличие от отправителя, фоновый рабочий процесс не требуется, так как получатель отвечает только при получении сообщения.
Обработчик сообщений Ping
Проект Receiver содержит обработчик сообщений с именем PingHandler
:
public class PingHandler : NServiceBus.IHandleMessages<Ping>
{
private readonly ILogger<PingHandler> logger;
public PingHandler(ILogger<PingHandler> logger)
{
this.logger = logger;
}
public async Task Handle(Ping message, IMessageHandlerContext context)
{
logger.LogInformation($"Processing Ping message #{message.Round}");
// throw new Exception("BOOM");
var reply = new Pong { Acknowledgement = $"Ping #{message.Round} processed at {DateTimeOffset.UtcNow:s}" };
await context.Reply(reply);
}
}
Не будем сейчас рассматривать закомментированный код. Мы вернемся к нему позже, когда будем обсуждать вопрос восстановления после сбоя.
Класс реализует IHandleMessages<Ping>
с определением одного метода: Handle
. Этот интерфейс сообщает NServiceBus, что, когда конечная точка получает сообщение типа Ping
, она должна обрабатываться методом Handle
в этом обработчике. Метод Handle
принимает само сообщение в качестве параметра и объект IMessageHandlerContext
, который позволяет выполнять дальнейшие операции обмена сообщениями, такие как ответ, отправка команд и публикация событий.
Мы PingHandler
просто: когда Ping
сообщение получено, зайдите в журнал сведения о сообщении и ответите отправителю с новым Pong
сообщением, которое впоследствии обрабатывается в отправителе PongHandler
.
Примечание.
В конфигурации Sender указано, что сообщения Ping
должны направляться Receiver. NServiceBus добавляет метаданные в сообщения, указывающие, помимо прочего, происхождение сообщения. Именно поэтому вам не нужно указывать данные маршрутизации для ответного сообщения Pong
. Оно автоматически перенаправляется обратно к своему источнику — Sender.
Теперь, когда Sender и Receiver настроены правильно, можно запустить решение.
Запуск решения
Чтобы запустить решение, необходимо запустить Sender и Receiver. Если вы используете Visual Studio Code, запустите конфигурацию "Отладить все". Если вы используете Visual Studio, настройте решение для запуска проектов Sender и Receiver:
- В Обозревателе решений щелкните решение правой кнопкой мыши.
- Выберите "Назначить запускаемые проекты".
- Выберите Несколько запускаемых проектов.
- В раскрывающемся списке выберите "Запуск" для Sender и Receiver.
Запустите решение. Отобразятся два консольных приложения: для Sender и Receiver.
В Sender обратите внимание, что сообщение Ping
отправляется каждую секунду благодаря фоновому заданию SenderWorker
. Receiver отображает сведения о каждом полученном сообщении Ping
, а Sender регистрирует сведения о каждом сообщении Pong
, которое приходят в ответ.
Теперь, когда все работает, давайте попробуем что-то сломать.
Устойчивость в работе
Ошибки в программных системах — это неизбежно. Код может завершиться ошибкой по различным причинам, например из-за сбоя сети, блокировки базы данных, изменения в сторонних API и просто из-за ошибки в коде.
NServiceBus обладает надежными функциями восстановления для обработки сбоев. При сбое обработчика сообщений сообщения автоматически извлекаются на основе предопределенной политики. Есть два типа политики повторов: немедленные и отложенные попытки повтора. Лучший способ понять их работу — увидеть их в действии. Добавим политику повтора в конечную точку Receiver:
- Откройте
Program.cs
в проекте Sender. - После строки
.EnableInstallers
добавьте следующие код:
endpointConfiguration.SendFailedMessagesTo("error");
var recoverability = endpointConfiguration.Recoverability();
recoverability.Immediate(
immediate =>
{
immediate.NumberOfRetries(3);
});
recoverability.Delayed(
delayed =>
{
delayed.NumberOfRetries(2);
delayed.TimeIncrease(TimeSpan.FromSeconds(5));
});
Прежде чем обсуждать, как работает эта политика, давайте посмотрим на нее в действии. Перед тестированием политики восстановления необходимо симитировать ошибку. Откройте код PingHandler
в проекте Receiver и раскомментируйте следующую строку:
throw new Exception("BOOM");
Теперь при обработке сообщения Ping
в работе Receiver будет происходить сбой. Запустите решение еще раз и давайте посмотрим, что происходит в Receiver.
При менее надежном обработчике PingHandler
при получении любых сообщений будет происходить сбой. Вы можете увидеть, как для этих сообщений запускается политика повтора. При первом сбое сообщения немедленно выполняются максимум три повторные попытки:
Разумеется, повторы будут завершаться сбоем, поэтому после выполнения трех немедленных попыток повтора включается политика отложенных повторов, и обработка сообщения задерживается на пять секунд:
По истечении пяти секунд выполняется еще три попытки обработать сообщение (т. е. еще одна итерация политики незамедлительного повтора). Это также приведет к сбою, и NServiceBus снова задержит сообщение, на этот раз на 10 секунд, прежде чем повторить попытку.
Если PingHandler
не сможет обработать сообщение после выполнения полной политики повтора, оно будет помещено в централизованную очередь ошибок с именем error
, согласно определению вызова к SendFailedMessagesTo
.
Концепция централизованной очереди ошибок отличается от механизма очереди недоставленных сообщений в Служебной шине Azure с очередью недоставленных сообщений для каждой очереди обработки. С помощью NServiceBus очереди недоставленных сообщений в Служебной шине Azure выступают как истинные очереди сообщений о сбое, тогда как сообщения, которые попадают в централизованную очередь ошибок, могут быть повторно обработаны позже, если это необходимо.
Политика повтора помогает устранить несколько типов ошибок, которые часто бывают временными по своей сути. Такие ошибки часто исчезают, если сообщение просто повторно обрабатывается после небольшой задержки. К примерам относятся сбои сети, блокировки баз данных и сбои сторонних API.
Как только сообщение появится в очереди ошибок, можно просмотреть сведения о нем в выбранном инструменте, а затем решить, что с ним делать. Например, с помощью ServicePulse, средства мониторинга от Particular Software, можно просмотреть сведения о сообщении и причину сбоя:
После изучения сведений можно отправить сообщение обратно в исходную очередь для обработки. Кроме того, перед этим сообщение можно изменить. Если в очереди ошибок есть несколько сообщений, которые по одной причине завершились сбоем, их все можно отправить обратно в исходные назначения как пакет.
Теперь пора выяснить, где развернуть наше решение в Azure.
Размещение служб в Azure
В этом примере конечные точки Sender и Receiver настраиваются для запуска в качестве консольных приложений. Они также могут размещаться в различных службах Azure, включая Функции Azure, Службы приложений Azure, Экземпляры контейнеров Azure, Службы Azure Kubernetes и виртуальные машины Azure. Например, вот как можно настроить конечную точку Sender для запуска в качестве функции Azure:
[assembly: NServiceBusTriggerFunction("Sender")]
public class Program
{
public static async Task Main()
{
var host = new HostBuilder()
.ConfigureFunctionsWorkerDefaults()
.UseNServiceBus(configuration =>
{
configuration.Routing().RouteToEndpoint(typeof(Ping), "Receiver");
})
.Build();
await host.RunAsync();
}
}
Дополнительные сведения об использовании NServiceBus с Функциями см. в статье Использование Функций Azure со Служебной шиной Azure в документации по NServiceBus.
Следующие шаги
Дополнительные сведения об использовании NServiceBus в службах Azure см. в следующих статьях: