Клиентская библиотека служебной шины Azure для JavaScript версии 7.9.5
Служебная шина Azure — это высоконадежная облачная служба обмена сообщениями от майкрософт.
Использование клиентской библиотеки @azure/service-bus
в приложении для
- Отправка сообщений в очередь или раздел служебной шины Azure
- Получение сообщений из очереди или подписки служебной шины Azure
- Создание, получение, удаление, обновление, перечисление очередей, разделов, подписок и правил в пространстве имен служебной шины Azure.
Ресурсы для @azure/service-bus
версии 7:
Основные ссылки:
- Исходный код
- Пакет (npm)
- Справочная документация по API
- Документация по продукту
- Примеры
- Руководство по устранению неполадок
Примечание. Если вы используете версию 1.1.10 или более раннюю и хотите перейти на последнюю версию этого пакета, ознакомьтесь с нашим руководством по переходу со служебной шины версии 1 на служебную шину версии 7.
Начало работы
Установка пакета
Установите последнюю версию для клиентской библиотеки служебной шины Azure с помощью npm.
npm install @azure/service-bus
Поддерживаемые в настоящее время среды
Предварительные требования
Настройка TypeScript
Для пользователей TypeScript необходимо установить определения типов Node:
npm install @types/node
Также необходимо включить compilerOptions.allowSyntheticDefaultImports
в tsconfig.json. Обратите внимание, что если вы включили compilerOptions.esModuleInterop
, allowSyntheticDefaultImports
параметр включен по умолчанию. Дополнительные сведения см. в руководстве по параметрам компилятора TypeScript .
Пакет JavaScript
Чтобы использовать эту клиентную библиотеку в браузере, сначала необходимо использовать средство пакетной установки. Дополнительные сведения о том, как это сделать, см. в документации по объединениям.
В дополнение к описанному в этой библиотеке также требуются дополнительные полизаполнения для следующих встроенных модулей NodeJS для правильной работы в браузерах:
buffer
os
path
process
Объединение с помощью Webpack
Если вы используете Webpack версии 5, можно установить следующие зависимости разработки.
npm install --save-dev os-browserify path-browserify
затем добавьте следующий код в webpack.config.js
const path = require("path");
+const webpack = require("webpack");
module.exports = {
entry: "./src/index.ts",
@@ -12,8 +13,21 @@ module.exports = {
},
],
},
+ plugins: [
+ new webpack.ProvidePlugin({
+ process: "process/browser",
+ }),
+ new webpack.ProvidePlugin({
+ Buffer: ["buffer", "Buffer"],
+ }),
+ ],
resolve: {
extensions: [".ts", ".js"],
+ fallback: {
+ buffer: require.resolve("buffer/"),
+ os: require.resolve("os-browserify"),
+ path: require.resolve("path-browserify"),
+ },
},
Объединение с помощью свертки
Если вы используете накопительный пакет, установите следующие зависимости разработки.
npm install --save-dev @rollup/plugin-commonjs @rollup/plugin-inject @rollup/plugin-node-resolve
Затем включите в rollup.config.js следующее.
+import nodeResolve from "@rollup/plugin-node-resolve";
+import cjs from "@rollup/plugin-commonjs";
+import shim from "rollup-plugin-shim";
+import inject from "@rollup/plugin-inject";
export default {
// other configs
plugins: [
+ shim({
+ fs: `export default {}`,
+ net: `export default {}`,
+ tls: `export default {}`,
+ path: `export default {}`,
+ dns: `export function resolve() { }`,
+ }),
+ nodeResolve({
+ mainFields: ["module", "browser"],
+ preferBuiltins: false,
+ }),
+ cjs(),
+ inject({
+ modules: {
+ Buffer: ["buffer", "Buffer"],
+ process: "process",
+ },
+ exclude: ["./**/package.json"],
+ }),
]
};
Дополнительные сведения об использовании polyfills см. в документации вашего любимого упакователя.
Поддержка React Native
Как и браузеры, React Native не поддерживает некоторые API JavaScript, используемые этой библиотекой SDK, поэтому вам нужно предоставить для них полизаполнения. Дополнительные сведения см. в примере Messaging React Native с expo .
Аутентификация клиента
Взаимодействие со служебной шиной начинается с экземпляра класса ServiceBusClient . Вы можете пройти проверку подлинности в служебной шине с помощью строки подключения или учетных данных Azure Active Directory.
Использование строки подключения
Этот метод принимает строку подключения к экземпляру служебной шины. Вы можете получить строку подключения с портала Azure.
const { ServiceBusClient } = require("@azure/service-bus");
const serviceBusClient = new ServiceBusClient("<connectionString>");
Дополнительные сведения об этом конструкторе см. в документации по API.
Использование учетных данных Azure Active Directory
При проверке подлинности с помощью Azure Active Directory используется библиотека удостоверений Azure.
В приведенном ниже примере используется DefaultAzureCredential, один из многих доступных поставщиков учетных данных из библиотеки @azure/identity
.
const { ServiceBusClient } = require("@azure/service-bus");
const { DefaultAzureCredential } = require("@azure/identity");
const fullyQualifiedNamespace = "<name-of-service-bus-namespace>.servicebus.windows.net";
const credential = new DefaultAzureCredential();
const serviceBusClient = new ServiceBusClient(fullyQualifiedNamespace, credential);
Примечание. Если вы используете собственную реализацию
TokenCredential
интерфейса для AAD, задайте для служебной шины следующие области, чтобы получить соответствующий маркер:
["https://servicebus.azure.net//user_impersonation"];
Дополнительные сведения об этом конструкторе см. в документации по API.
Основные понятия
После инициализации ServiceBusClient
можно взаимодействовать со следующими ресурсами в пространстве имен служебной шины:
- Очереди: позволяет отправлять и получать сообщения. Часто используется для обмена данными типа "точка — точка".
- Темы. В отличие от очередей, темы лучше подходят для сценариев публикации и подписки. Раздел можно отправить, но для использования из требуется подписка, из которой может быть несколько параллельных.
- Подписки: механизм использования из раздела. Каждая подписка является независимой и получает копию каждого сообщения, отправленного в раздел. Правила и фильтры можно использовать для настройки сообщений, получаемых определенной подпиской.
Дополнительные сведения об этих ресурсах см. в статье Что такое служебная шина Azure?
Для взаимодействия с этими ресурсами необходимо ознакомиться со следующими понятиями пакета SDK:
- Отправка сообщений в очередь или раздел с помощью , созданного
ServiceBusSender
с помощьюServiceBusClient.createSender()
. - Получение сообщений из очереди или подписки с помощью , созданного
ServiceBusReceiver
с помощьюServiceBusClient.createReceiver()
. - Получение сообщений из очередей или подписок с поддержкой сеанса с помощью , созданного
ServiceBusSessionReceiver
с помощьюServiceBusClient.acceptSession()
илиServiceBusClient.acceptNextSession()
.
Обратите внимание, что перед использованием этой библиотеки необходимо создать очереди, разделы и подписки.
Примеры
В следующих разделах содержатся фрагменты кода, которые охватывают некоторые распространенные задачи, связанные со служебной шиной Azure.
- Отправка сообщений
- Получение сообщений
- Урегулировать сообщение
- Очереди недоставленных сообщений
- Отправка сообщений с помощью сеансов
- Получение сообщений из сеансов
- Управление ресурсами пространства имен служебной шины
- Дополнительные примеры
Отправка сообщений
После создания экземпляра ServiceBusClient
класса можно получить ServiceBusSender
с помощью метода createSender , который можно использовать для отправки сообщений.
const sender = serviceBusClient.createSender("my-queue");
const messages = [
{ body: "Albert Einstein" },
{ body: "Werner Heisenberg" },
{ body: "Marie Curie" },
{ body: "Steven Hawking" },
{ body: "Isaac Newton" },
{ body: "Niels Bohr" },
{ body: "Michael Faraday" },
{ body: "Galileo Galilei" },
{ body: "Johannes Kepler" },
{ body: "Nikolaus Kopernikus" }
];
// sending a single message
await sender.sendMessages(messages[0]);
// sending multiple messages in a single call
// this will fail if the messages cannot fit in a batch
await sender.sendMessages(messages);
// Sends multiple messages using one or more ServiceBusMessageBatch objects as required
let batch = await sender.createMessageBatch();
for (let i = 0; i < messages.length; i++) {
const message = messages[i];
if (!batch.tryAddMessage(message)) {
// Send the current batch as it is full and create a new one
await sender.sendMessages(batch);
batch = await sender.createMessageBatch();
if (!batch.tryAddMessage(messages[i])) {
throw new Error("Message too big to fit in a batch");
}
}
}
// Send the batch
await sender.sendMessages(batch);
Получение сообщений
После создания экземпляра ServiceBusClient
класса можно получить ServiceBusReceiver
с помощью метода createReceiver .
const receiver = serviceBusClient.createReceiver("my-queue");
Доступно два receiveMode
элемента.
- "peekLock" — в режиме peekLock получатель блокирует сообщение на время, указанное в очереди.
- "receiveAndDelete" — в режиме receiveAndDelete сообщения удаляются из служебной шины по мере их получения.
Если receiveMode не указан в параметрах, по умолчанию используется режим "peekLock". Вы также можете урегулировать полученные сообщения в режиме "peekLock".
Этот получатель можно использовать одним из трех способов получения сообщений:
Получение массива сообщений
Используйте функцию receiveMessages , которая возвращает обещание, которое разрешается в массив сообщений.
const myMessages = await receiver.receiveMessages(10);
Подписка с помощью обработчика сообщений
Используйте метод подписки , чтобы настроить обработчики сообщений и обеспечить их работу столько, сколько вам нужно.
Когда все будет готово, вызовите receiver.close()
, чтобы прекратить получение дополнительных сообщений.
const myMessageHandler = async (message) => {
// your code here
console.log(`message.body: ${message.body}`);
};
const myErrorHandler = async (args) => {
console.log(
`Error occurred with ${args.entityPath} within ${args.fullyQualifiedNamespace}: `,
args.error
);
};
receiver.subscribe({
processMessage: myMessageHandler,
processError: myErrorHandler
});
Использование асинхронного итератора
Использование getMessageIterator для получения асинхронного итератора по сообщениям
for await (let message of receiver.getMessageIterator()) {
// your code here
}
Урегулировать сообщение
Получив сообщение, вы можете вызвать completeMessage()
, abandonMessage()
deferMessage()
или deadLetterMessage()
получателю в зависимости от того, как вы хотите его урегулировать.
Дополнительные сведения см. в статье Об урегулировании полученных сообщений.
Очереди недоставленных сообщений
Очередь недоставленных сообщений — это вложенная очередь. Каждая очередь или подписка имеет собственную очередь недоставленных сообщений. В очередях недоставленных сообщений хранятся сообщения, которые были явно недоставленных сообщений (с помощью receiver.deadLetterMessage()
), или сообщения, которые превысили максимальное число доставок.
Создание получателя для вложенной очереди недоставленных сообщений аналогично созданию получателя для подписки или очереди:
// To receive from a queue's dead letter sub-queue
const deadLetterReceiverForQueue = serviceBusClient.createReceiver("queue", {
subQueueType: "deadLetter"
});
// To receive from a subscription's dead letter sub-queue
const deadLetterReceiverForSubscription = serviceBusClient.createReceiver("topic", "subscription", {
subQueueType: "deadLetter"
});
// Dead letter receivers work like any other receiver connected to a queue
// ex:
const messages = await deadLetterReceiverForQueue.receiveMessages(5);
for (const message of messages) {
console.log(`Dead lettered message: ${message.body}`);
}
Полные примеры, демонстрирующие очереди недоставленных сообщений:
- Использование receiver.deadLetterMessage() для явной отправки сообщений во вложенную очередь недоставленных сообщений
- Получение сообщений из вложенной очереди недоставленных сообщений
Отправка сообщений с помощью сеансов
Для использования сеансов необходимо создать очередь или подписку с поддержкой сеанса. Дополнительные сведения о настройке этой функции см. на портале здесь.
Чтобы отправлять сообщения в сеанс, используйте ServiceBusClient
для создания отправителя с помощью createSender.
При отправке сообщения задайте sessionId
свойство в сообщении, чтобы убедиться, что сообщение находится в правильном сеансе.
const sender = serviceBusClient.createSender("my-session-queue");
await sender.sendMessages({
body: "my-message-body",
sessionId: "my-session"
});
Дополнительные сведения о работе сеансов см. здесь.
Получение сообщений из сеансов
Для использования сеансов необходимо создать очередь или подписку с поддержкой сеанса. Дополнительные сведения о настройке этой функции см. на портале здесь.
В отличие от очередей или подписок, не поддерживающих сеанс, только один получатель может считывать данные из сеанса в любое время. Это реализуется путем блокировки сеанса, который обрабатывается служебной шиной. По сути, это похоже на работу блокировки сообщений при использовании peekLock
режима — когда сообщение (или сеанс) заблокировано, получатель имеет к нему монопольный доступ.
Чтобы открыть и заблокировать сеанс, используйте экземпляр ServiceBusClient
для создания SessionReceiver.
Существует два способа выбора сеанса для открытия.
sessionId
Укажите , который блокирует именованный сеанс.const receiver = await serviceBusClient.acceptSession("my-session-queue", "my-session");
Не указывайте идентификатор сеанса. В этом случае служебная шина найдет следующий доступный сеанс, который еще не заблокирован.
const receiver = await serviceBusClient.acceptNextSession("my-session-queue");
Имя сеанса можно найти с помощью
sessionId
свойства вSessionReceiver
. Если receiveMode не указан в параметрах, по умолчанию используется режим "peekLock". Вы также можете урегулировать полученные сообщения в режиме "peekLock".
После создания получателя вы можете выбрать один из трех способов получения сообщений:
- Получение массива сообщений
- Подписка с помощью обработчика сообщений
- Использование асинхронного итератора
Дополнительные сведения о работе сеансов см. здесь.
Управление ресурсами пространства имен служебной шины
ServiceBusAdministrationClient
позволяет управлять пространством имен с помощью операций CRUD для сущностей (очередей, разделов и подписок) и правил подписки.
- Поддерживает проверку подлинности со строкой подключения служебной шины, а также учетными данными AAD из
@azure/identity
аналогичногоServiceBusClient
.
Примечание. Служебная шина пока не поддерживает настройку правил CORS для пространств имен, поэтому ServiceBusAdministrationClient
не будет работать в браузере без отключения веб-безопасности. Дополнительные сведения см. здесь.
// Get the connection string from the portal
// OR
// use the token credential overload, provide the host name of your Service Bus instance and the AAD credentials from the @azure/identity library
const serviceBusAdministrationClient = new ServiceBusAdministrationClient("<connectionString>");
// Similarly, you can create topics and subscriptions as well.
const createQueueResponse = await serviceBusAdministrationClient.createQueue(queueName);
console.log("Created queue with name - ", createQueueResponse.name);
const queueRuntimeProperties = await serviceBusAdministrationClient.getQueueRuntimeProperties(
queueName
);
console.log("Number of messages in the queue = ", queueRuntimeProperties.totalMessageCount);
await serviceBusAdministrationClient.deleteQueue(queueName);
- Пример для справки — administrationClient.ts
Устранение неполадок
Ниже приведены некоторые начальные шаги, чтобы начать диагностику проблем. Дополнительные сведения см. в руководстве по устранению неполадок служебной шины.
Зависимости AMQP
Библиотека служебной шины зависит от библиотеки rhea-promise для управления подключениями, отправки и получения сообщений по протоколу AMQP .
Ведение журнала
При использовании этой библиотеки с помощью приведенной ниже переменной среды можно получить журналы отладки.
- Получение журналов отладки из пакета SDK служебной шины
export DEBUG=azure*
- Получение журналов отладки из пакета SDK служебной шины и библиотеки уровня протокола.
export DEBUG=azure*,rhea*
- Если вы не заинтересованы в просмотре преобразования сообщений (которое занимает много места на консоли или диске), можно задать
DEBUG
переменную среды следующим образом:
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message,-azure:core-amqp:datatransformer
- Если вас интересуют только ошибки, можно задать
DEBUG
переменную среды следующим образом:
export DEBUG=azure:service-bus:error,azure:core-amqp:error,rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow
Ведение журнала в файле
DEBUG
Задайте переменную среды, как показано выше- Запустите тестовый скрипт следующим образом:
- Инструкции ведения журнала из тестового скрипта — в
out.log
, а инструкции ведения журнала из пакета SDK — вdebug.log
.node your-test-script.js > out.log 2>debug.log
- Инструкции ведения журнала из тестового скрипта и пакета SDK переходят в тот же файл
out.log
, перенаправляя stderr в stdout (&1), а затем перенаправляя stdout в файл:node your-test-script.js >out.log 2>&1
- Инструкции ведения журнала из тестового скрипта и пакета SDK переходят в один и тот же файл
out.log
.node your-test-script.js &> out.log
Дальнейшие действия
Подробные примеры использования этой библиотеки для отправки и получения сообщений в очереди, разделы и подписки служебной шины и из нее см. в каталоге примеров.
Участие
Если вы хотите вносить изменения в эту библиотеку, ознакомьтесь с руководством по внесению изменений, в котором содержатся сведения о создании и тестировании кода.
Azure SDK for JavaScript