Руководство по использованию протокола AMQP 1.0 в служебной шине и Центрах событий Azure
Улучшенный протокол очередей сообщений (AMQP) 1.0 — это стандартный протокол кадрирования и передачи, который обеспечивает асинхронную безопасную и надежную передачу сообщений между двумя сторонами. Это основной протокол, применяемый для обмена сообщениями служебной шины Azure и Центров событий Azure.
Протокол AMQP 1.0 — это результат совместной работы в разных отраслях, которая объединила таких производителей ПО промежуточного слоя, как корпорация Майкрософт и Red Hat, с большим количеством пользователей ПО промежуточного слоя для обмена сообщениями, например компанией JP Morgan Chase, представляющей собой индустрию финансовых услуг. На форуме OASIS по технической стандартизации официально утверждено, что спецификации протокола и расширения AMQP соответствуют международному стандарту ISO/IEC 19494:2014.
Цели
В этой статье кратко представлены основные понятия спецификации обмена сообщениями по протоколу AMQP 1.0, а также спецификации расширений, разработанные техническим комитетом OASIS по AMQP. Здесь также объясняется, как Служебная шина Azure реализует и использует эти спецификации.
Цель заключается в том, чтобы все разработчики, использующие любой имеющийся стек клиента AMQP 1.0 на любой платформе, могли взаимодействовать со служебной шиной Azure через протокол AMQP 1.0.
Распространенные стеки общего назначения AMQP 1.0, такие как Apache Qpid Proton или AMQP.NET Lite, реализуют все основные элементы протокола AMQP 1.0, такие как сеансы или ссылки. Для этих базовых элементов иногда в качестве оболочки используется API более высокого уровня. В Apache Proton предусмотрены два API: императивный API Messenger и реактивный API Reactor.
Далее предполагается, что управление подключениями, сеансами и связями AMQP, а также обработка передачи кадров и управление потоком осуществляются с помощью соответствующего стека (например, Apache Proton-C) и для этого не требуется особое внимание со стороны разработчиков приложений. Кроме того, абстрактно предполагается, что доступны несколько примитивных возможностей API, например возможность подключения и создания абстрактных объектов отправителя (sender) и получателя (receiver), которые затем приобретают форму операций send()
и receive()
соответственно.
Расширенные возможности служебной шины Azure, например просмотр сообщений или управление сеансами, описываются в контексте AMQP, а также как многоуровневая псевдореализация на базе этой предполагаемой абстракции API.
Что такое AMQP?
AMQP — это протокол передачи и кадрирования. Под кадрированием подразумевается предоставление структуры для потоков двоичных данных, которые передаются в любом направлении в рамках сетевого подключения. Структура разграничивает отдельные блоки данных, именуемые кадрами, которые передаются между сторонами подключения. Возможности передачи предусматривают, что обе взаимодействующие стороны могут узнать время передачи и завершения передачи кадров.
В отличие от более ранних версий черновых версий из рабочей группы AMQP, которые по-прежнему используются несколькими брокерами сообщений, окончательным протоколом рабочей группы и стандартизированным протоколом AMQP 1.0 не предписывает наличие брокера сообщений или какой-либо конкретной топологии для сущностей внутри брокера сообщений.
Протокол можно использовать для симметричного однорангового обмена данными и взаимодействия с брокерами сообщений, поддерживающими очереди и объекты публикаций и подписок, например со служебной шиной Azure. Его можно также использовать для взаимодействия с инфраструктурой обмена сообщениями, в которой шаблоны взаимодействия отличаются от обычных очередей, как в случае с Центрами событий Azure. При получении событий концентратор событий выполняет роль очереди, но когда события считываются из него, он действует скорее как служба последовательного хранения. Это отчасти напоминает ленточный накопитель. Клиент выбирает смещение в доступном потоке данных, а затем обрабатывает все события, начиная с этого смещения до последнего доступного.
Протокол AMQP 1.0 предусматривает возможность расширения, то есть его возможности можно дополнить с помощью новых спецификаций. Три спецификации расширения, описанные в этом документе, иллюстрируют его. Для обмена данными по существующей инфраструктуре HTTPS/WebSockets настройка собственных TCP-портов AMQP может оказаться сложной. Спецификация привязки определяет способ наложения протокола AMQP поверх инфраструктуры WebSockets. Чтобы обеспечить взаимодействие с инфраструктурой обмена сообщениями в режиме "запрос — ответ" для управления или предоставления дополнительных функций, спецификация управления AMQP определяет основные примитивные возможности взаимодействия. Для интеграции модели федеративной авторизации спецификация AMQP с безопасностью на основе утверждений определяет способ связывания и обновления маркеров авторизации, сопоставленных со связями.
Основные сценарии использования протокола AMQP
В этом разделе описываются основные сценарии использования протокола AMQP 1.0 со служебной шиной Azure, в частности создание подключений, сеансов и связей, а также передача сообщений в сущности служебной шины (очереди, разделы, подписки) и из них.
Самый авторитетный источник сведений о принципах работы протокола AMQP — спецификация AMQP 1.0. Но эта спецификация представляет собой руководство по реализации, а не по использованию протокола. В этом разделе рассматриваются все термины, необходимые для описания использования протокола AMQP 1.0 в служебной шине. Дополнительные сведения об AMQP и более широком обсуждении AMQP 1.0 см . в этом видеокурсе.
Подключения и сеансы
AMQP вызывает контейнеры взаимодействующих программ. В них содержатся узлы, которые являются взаимодействующими объектами внутри этих контейнеров. Таким узлом может выступать очередь. Протокол AMQP предусматривает мультиплексирование, что позволяет использовать одно подключение для многих путей передачи данных между узлами. Например, клиент приложения может одновременно получать данные из одной очереди и отправлять их в другую очередь через одно и то же сетевое подключение.
Таким образом, сетевое подключение привязывается к контейнеру. Его инициирует контейнер в роли клиента. При этом устанавливается исходящее подключение TCP через сокет к контейнеру в роли получателя, который ожидает передачи данных и принимает входящие подключения TCP. Подтверждение подключения включает согласование версии протокола, объявление или согласование использования протокола безопасности уровня транспорта (TLS)/SSL- и подтверждения проверки подлинности и авторизации в области подключения, основанной на SASL.
Для служебной шины Azure или Центров событий Azure обязательно использование TLS. Он поддерживает подключения через TCP-порт 5671, когда перед вводом подтверждения протокола AMQP на подключение TCP сначала накладывается TLS, а также подключения через TCP-порт 5672, когда сервер незамедлительно предлагает обязательные обновления подключения к TLS с использованием модели, предписанной AMQP. Привязка AMQP WebSockets создает туннель через TCP-порт 443, что эквивалентно подключениям по протоколу AMQP 5671.
После настройки подключения и TLS в служебной шине применяется один из двух вариантов механизма SASL.
- SASL PLAIN, как правило, используется для передачи имени пользователя и пароля на сервер. В служебной шине нет учетных записей, но есть связанные с ключом именованные правила безопасности общего доступа, которые присваивают права. Имя правила используется в качестве имени пользователя, а ключ (текст в кодировке Base64) — в качестве пароля. Права, связанные с выбранным правилом, определяют операции, разрешенные для подключения.
- SASL ANONYMOUS используется для обхода авторизации SASL, когда клиент хочет использовать модель безопасности на основе утверждений (CBS), которая описана далее в этой статье. Используя этот вариант, подключение клиента можно установить анонимно на короткий период времени, в течение которого он может взаимодействовать только с конечной точкой CBS. При этом необходимо выполнить подтверждение CBS.
После установки транспортного подключения каждый контейнер объявляет максимальный размер кадра, который он готов обработать, и время ожидания простоя, по истечении которого он в одностороннем порядке отключится, если подключение будет неактивно.
Они также объявляют количество поддерживаемых параллельных каналов. Канал — это однонаправленный исходящий виртуальный путь передачи на основе подключения. Сеанс принимает канал из каждого из взаимосвязанных контейнеров для формирования двустороннего пути обмена данными.
В сеансах предусмотрена модель управления потоком на основе окна. После создания сеанса каждая сторона объявляет количество кадров, которое она готова принять в окне получения. Пока стороны обмениваются кадрами, передаваемые кадры заполняют окно. После заполнения окна передача прекращается и возобновляется только после сброса или развертывания окна с помощью перформатива потока (перформатив — это термин AMQP, обозначающий жесты на уровне протокола, передаваемые между двумя сторонами).
Эта модель на основе окна является приблизительным аналогом концепции TCP управления потоком на основе окна, но на уровне сеанса в сокете. Благодаря поддержке нескольких параллельных сеансов в рамках протокола регулируемый обычный трафик может сменяться трафиком с высоким приоритетом, как на полосе дорожного движения.
Сейчас в служебной шине Azure для каждого подключения используется только один сеанс. Максимальный размер кадра Служебной шины для ценовой категории "Стандартный" — 262 144 байта (256 КБ). Максимальный размер кадра Служебной шины для ценовой категории “Премиум” и Центров событий — 1 048 576 (100 МБ). В служебной шине не установлены определенные окна регулирования на уровне сеанса, но предусмотрен регулярный сброс окна в рамках управления потоком на уровне связи (см. следующий раздел).
Подключения, каналы и сеансы являются временными. Если базовое подключение прерывается, подключения, туннель TLS, контекст авторизации SASL и сеансы необходимо установить заново.
Обязательные порты для исходящего трафика AMQP
Клиенты, использующие подключения AMQP по протоколу TCP, должны открыть порты 5671 и 5672 в локальном брандмауэре. Вместе с этими портами может потребоваться открыть дополнительные порты, если включен компонент EnableLinkRedirect. EnableLinkRedirect
— это новая функция обмена сообщениями, которая позволяет пропустить один прыжок при получении сообщений, что способствует повышению пропускной способности. Клиент начинает взаимодействовать напрямую с внутренней службой через диапазон портов 104XX, как показано на следующем рисунке.
Клиент .NET не сможет работать с SocketException ("Сделана попытка доступа к сокету методом, запрещенным правами доступа"), если эти порты заблокированы брандмауэром. Эту функцию можно отключить, указав EnableAmqpLinkRedirect=false
в строке подключения, в результате чего клиенты будут взаимодействовать с удаленной службой через порт 5671.
Привязка AMQP WebSocket предоставляет механизм туннелирования подключения AMQP через транспорт WebSocket. Эта привязка создает туннель через TCP-порт 443, который эквивалентен подключениям AMQP 5671. Используйте AMQP WebSockets, если вы находитесь за брандмауэром, который блокирует TCP-подключения через порты 5671, 5672, но разрешает TCP-подключения через порт 443 (https).
Ссылки.
AMQP обеспечивает передачу сообщений через связи. Связь — это созданный в рамках сеанса путь передачи, который позволяет передавать сообщения в одном направлении. Согласование состояния передачи выполняется через связь и является двусторонним между сторонами подключения.
Связи могут создаваться в контейнерах в любое время в рамках активного сеанса, что отличает протокол AMQP от многих других протоколов, включая HTTP и MQTT, где инициирование передачи и путь передачи является привилегией стороны, которая создает подключение через сокет.
Контейнер, инициирующий связь, отправляет в противоположный контейнер запрос на принятие связи и выбирает роль отправителя или получателя. Таким образом, контейнер может инициировать создание односторонних или двусторонних путей передачи, которые моделируются в виде пары связей.
У связей есть имена, и они сопоставлены с узлами. Как уже говорилось в начале, узлы являются взаимодействующими объектами внутри контейнера.
В служебной шине понятие узла напрямую соответствует очереди, разделу, подписке или подочереди недоставленных сообщений в очереди или подписке. Имя узла, используемое в AMQP, — это относительное имя объекта в пространстве имен служебной шины. Имя очереди (например, myqueue
) является также именем узла AMQP этой очереди. Подписка на раздел соответствует условиям соглашения HTTP API, так как она относится к коллекции ресурсов "подписки". Следовательно, для подписки sub из раздела mytopic имя узла AMQP будет иметь значение mytopic/subscriptions/sub.
Для создания связей подключающийся клиент должен использовать имя локального узла. Служебная шина не предписывает использование этих имен узлов и не интерпретирует их. В стеках клиента AMQP 1.0, как правило, используется схема, в соответствии с которой эти временные имена узлов являются уникальными в области клиента.
Перемещения
Как только связь будет установлена, через нее можно передавать сообщения. В AMQP передача выполняется с помощью жеста явного протокола (перформатив передачи), который перемещает сообщение отправителя к получателю через связь. Передача завершается после "сопоставления", то есть когда обе стороны узнали результат этой передачи.
В самом простом случае отправитель может отправлять сообщения "предварительно сопоставленными". Это означает, что клиент не заинтересован в результатах и получатель не предоставляет отчет о результатах операции. Этот режим поддерживается служебной шиной на уровне протокола AMQP, но не представлен ни в одном из клиентских API-интерфейсов.
В обычном случае сообщения отправляются несопоставленными, а затем получатель принимает или отклоняет их с помощью перформатива обработки. Сообщения отклоняются, когда получатель не может принять их по какой-либо причине. В этом случае в сообщении об отклонении содержатся сведения о причине, которой является структура ошибки, определенная AMQP. Если сообщения отклоняются из-за внутренних ошибок в служебной шине, служба возвращает дополнительные сведения в этой структуре. Эти сведения можно предоставлять в качестве подсказок для диагностики в службу технической поддержки при заполнении запросов на поддержку. Дополнительные сведения об ошибках приведены далее.
Состояние released (освобождено) — это особая форма отклонения, которая означает, что у получателя не возникает технических проблем в связи с передачей, но он не заинтересован в ее сопоставлении. Такое бывает, например, когда сообщение доставляется клиенту служебной шины, который отклоняет это сообщение по причине того, что не может выполнить работу, возникающую в результате обработки сообщения, то есть сама по себе доставка сообщения выполняется без ошибок. Вариант этого состояния — состояние modified (изменено), которое позволяет вносить изменения в сообщение, когда оно освобождается. Это состояние сейчас не используется в служебной шине.
Спецификация AMQP 1.0 определяет дополнительное состояние обработки — received (получено). Это состояние особенно удобно для обработки восстановления связей. Таким образом можно восстановить состояние связи и ожидающие доставки на основе нового подключения и сеанса, если предыдущее подключение и сеанс потеряны.
служебная шина не поддерживает восстановление ссылок; если клиент теряет подключение к служебная шина с незапланированных передачи сообщений, то передача сообщений теряется, а клиент должен повторно подключиться, повторно установить ссылку и повторить передачу.
Таким образом, служебная шина и Центры событий поддерживают передачи "как минимум один раз", при которых отправитель может быть уверен, что сообщения приняты и сохранены. Но на уровне AMQP не поддерживаются передачи "ровно один раз", при которых система пытается восстановить связь и согласовать состояние доставки, чтобы не дублировать передачу сообщений.
Чтобы компенсировать возможные повторные отправки, служебная шина может выявлять повторы в очередях и разделах. Эту функцию нужно активировать дополнительно. Этот механизм фиксирует идентификаторы всех входящих сообщений за определенный пользователем период времени и автоматически удаляет все сообщения, идентификаторы которых уже встречались в течение этого периода.
Управление потоком
В дополнение к модели управления потоком на уровне сеанса, которая уже обсуждалась ранее, каждая связь предусматривает свою собственную модель управления потоком. Благодаря управлению потоком на уровне сеанса контейнеру не приходится обрабатывать слишком много кадров за раз. Благодаря управлению потоком на уровне связей приложение определяет количество поступающих через связь сообщений, которые оно может обработать, а также время обработки.
Передачи через связь могут произойти, только если у отправителя достаточно разрешений на передачу. Разрешения на передачу — это счетчик, устанавливаемый получателем с помощью перформатива потока, предусмотренного для связи. Если у отправителя есть разрешения на передачу, он будет стараться использовать все эти разрешения на отправку сообщений. Каждая доставка сообщения уменьшает доступное количество разрешений на передачу на 1. Если используется кредит связи, поставки останавливаются.
Когда служебная шина выполняет роль получателя, она незамедлительно предоставляет отправителю достаточное количество разрешений на передачу. Таким образом, сообщения можно отправлять сразу. По мере использования разрешений на передачу служебная шина периодически отправляет отправителю перформатив потока, чтобы обновить остаток разрешений на передачу.
Выполняя роль отправителя, служебная шина отправляет сообщения, чтобы использовать все оставшиеся разрешения на передачу.
Вызов получения на уровне API преобразуется в перформатив потока, который клиент отправляет в служебную шину. Служебная шина расходует эти разрешения, забирая из очереди первое доступное и не заблокированное сообщение, блокируя его и передавая получателю. Если сообщений, доступных для доставки, нет, все неиспользованные разрешения для всех связей с конкретными объектами сохраняются в порядке поступления. Пока остаются доступные разрешения, все поступающие сообщения будут сразу же блокироваться и передаваться.
Блокировка сообщения снимается, когда передача переходит в одно из конечных состояний: accepted (принято), rejected (отклонено) или released (освобождено). Сообщение удаляется из служебной шины, если передача переходит в конечное состояние accepted (принято). Оно остается в служебной шине и будет доставлено следующему получателю, когда передача перейдет в любое другое состояние. Служебная шина автоматически переместит сообщение в очередь недоставленных сообщений объекта, когда будет достигнуто максимальное число доставок, разрешенное для объекта из-за повторяющихся отклонений или освобождений.
В интерфейсах API служебной шины такая возможность сейчас не предоставляется напрямую. Несмотря на это клиент протокола AMQP нижнего уровня может использовать модель разрешений на передачу, чтобы сменить взаимодействие с извлечением, при котором выдается одно разрешение для каждого запроса на получение, на модель с отправлением, при которой выдается большое количество разрешений на передачу. Это позволяет получать сообщения сразу, как только они станут доступными, без дополнительных согласований. Push-отправка поддерживается с помощью параметров свойства ServiceBusProcessor.PrefetchCount или ServiceBusReceiver.PrefetchCount. Если их значения отличны от нуля, клиент AMQP использует их в качестве разрешений на передачу.
В этом контексте важно понимать, что срок действия блокировки сообщения внутри объекта начинается с того момента, когда сообщение извлекается из объекта, а не когда сообщение поступает в сеть. Каждый раз, когда клиент указывает о готовности к приему сообщений путем выдачи разрешений на передачу, ожидается, что он может активно получать сообщения через сеть и готов их обрабатывать. В противном случае блокировка сообщения может истекла до того, как сообщение будет доставлено. Использование управления потоком разрешений на передачу должно непосредственно отображать немедленную готовность к работе с доступными сообщениями, отправляемыми получателю.
В следующих разделах предоставляется схематический обзор потока перформатива во время взаимодействия различных API. В каждом разделе описывается отдельная логическая операция. Некоторые из этих взаимодействий могут быть "ленивыми", что означает, что они могут выполняться только при необходимости. Создание отправителя сообщения может не привести к сетевому взаимодействию до тех пор, пока первое сообщение не будет отправлено или запрошено.
Стрелки в следующей таблице указывают направление потока перформатива.
Создание получателя сообщения
Клиент | Cлужебная шина |
---|---|
--> attach(<br/>name={link name},<br/>handle={numeric handle},<br/>role=**receiver**,<br/>source={entity name},<br/>target={client link ID}<br/>) |
Клиент подключается к объекту в качестве получателя |
Служебная шина подключает конец его связи | <-- attach(<br/>name={link name},<br/>handle={numeric handle},<br/>role=**sender**,<br/>source={entity name},<br/>target={client link ID}<br/>) |
Создание отправителя сообщения
Клиент | Cлужебная шина |
---|---|
--> attach(<br/>name={link name},<br/>handle={numeric handle},<br/>role=**sender**,<br/>source={client link ID},<br/>target={entity name}<br/>) |
Никаких действий |
Никаких действий | <-- attach(<br/>name={link name},<br/>handle={numeric handle},<br/>role=**receiver**,<br/>source={client link ID},<br/>target={entity name}<br/>) |
Создание отправителя сообщения (ошибка)
Клиент | Cлужебная шина |
---|---|
--> attach(<br/>name={link name},<br/>handle={numeric handle},<br/>role=**sender**,<br/>source={client link ID},<br/>target={entity name}<br/>) |
Никаких действий |
Никаких действий | <-- attach(<br/>name={link name},<br/>handle={numeric handle},<br/>role=**receiver**,<br/>source=null,<br/>target=null<br/>)<br/><br/><-- detach(<br/>handle={numeric handle},<br/>closed=**true**,<br/>error={error info}<br/>) |
Закрытие получателя или отправителя сообщений
Клиент | Cлужебная шина |
---|---|
--> detach(<br/>handle={numeric handle},<br/>closed=**true**<br/>) |
Никаких действий |
Никаких действий | <-- detach(<br/>handle={numeric handle},<br/>closed=**true**<br/>) |
Отправка (успешно)
Клиент | Cлужебная шина |
---|---|
--> transfer(<br/>delivery-id={numeric handle},<br/>delivery-tag={binary handle},<br/>settled=**false**,,more=**false**,<br/>state=**null**,<br/>resume=**false**<br/>) |
Никаких действий |
Никаких действий | <-- disposition(<br/>role=receiver,<br/>first={delivery ID},<br/>last={delivery ID},<br/>settled=**true**,<br/>state=**accepted**<br/>) |
Отправка (ошибка)
Клиент | Cлужебная шина |
---|---|
--> transfer(<br/>delivery-id={numeric handle},<br/>delivery-tag={binary handle},<br/>settled=**false**,,more=**false**,<br/>state=**null**,<br/>resume=**false**<br/>) |
Никаких действий |
Никаких действий | <-- disposition(<br/>role=receiver,<br/>first={delivery ID},<br/>last={delivery ID},<br/>settled=**true**,<br/>state=**rejected**(<br/>error={error info}<br/>)<br/>) |
Получение
Клиент | Cлужебная шина |
---|---|
--> flow(<br/>link-credit=1<br/>) |
Никаких действий |
Никаких действий | < transfer(<br/>delivery-id={numeric handle},<br/>delivery-tag={binary handle},<br/>settled=**false**,<br/>more=**false**,<br/>state=**null**,<br/>resume=**false**<br/>) |
--> disposition(<br/>role=**receiver**,<br/>first={delivery ID},<br/>last={delivery ID},<br/>settled=**true**,<br/>state=**accepted**<br/>) |
Никаких действий |
Получение нескольких сообщений
Клиент | Cлужебная шина |
---|---|
--> flow(<br/>link-credit=3<br/>) |
Никаких действий |
Никаких действий | < transfer(<br/>delivery-id={numeric handle},<br/>delivery-tag={binary handle},<br/>settled=**false**,<br/>more=**false**,<br/>state=**null**,<br/>resume=**false**<br/>) |
Никаких действий | < transfer(<br/>delivery-id={numeric handle+1},<br/>delivery-tag={binary handle},<br/>settled=**false**,<br/>more=**false**,<br/>state=**null**,<br/>resume=**false**<br/>) |
Никаких действий | < transfer(<br/>delivery-id={numeric handle+2},<br/>delivery-tag={binary handle},<br/>settled=**false**,<br/>more=**false**,<br/>state=**null**,<br/>resume=**false**<br/>) |
--> disposition(<br/>role=receiver,<br/>first={delivery ID},<br/>last={delivery ID+2},<br/>settled=**true**,<br/>state=**accepted**<br/>) |
Никаких действий |
Сообщения
В следующих разделах объясняется, какие свойства из стандартных разделов сообщений AMQP использует служебная шина, и как они соотносятся с набором API служебной шины.
Любое свойство, которое требуется определить приложению, следует сопоставить с сопоставлением application-properties
AMQP.
Верхний колонтитул
Имя поля | Использование | Имя API |
---|---|---|
durable | - | - |
priority | - | - |
ttl | Срок жизни сообщения | TimeToLive |
first-acquirer | - | - |
delivery-count | - | DeliveryCount |
Свойства
Имя поля | Использование | Имя API |
---|---|---|
message-id |
Определяемый приложением идентификатор свободной формы для этого сообщения. Используется для обнаружения дубликатов. | MessageId |
user-id |
Определяемый приложением идентификатор пользователя, который не интерпретируется служебной шиной. | Недоступно через API служебной шины. |
to |
Определяемый приложением идентификатор назначения, который не интерпретируется служебной шиной. | Кому |
subject |
Определяемый приложением идентификатор назначения сообщения, который не интерпретируется служебной шиной. | Тема |
reply-to |
Определяемый приложением индикатор пути ответа, который не интерпретируется служебной шиной. | ReplyTo |
correlation-id |
Определяемый приложением идентификатор корреляции, который не интерпретируется служебной шиной. | CorrelationId |
content-type |
Определяемый приложением индикатор типа содержимого, который не интерпретируется служебной шиной. | ContentType |
content-encoding |
Определяемый приложением индикатор кодирования содержимого, который не интерпретируется служебной шиной. | Недоступно через API служебной шины. |
absolute-expiry-time |
Объявляет абсолютное время истечения срока действия сообщения. Игнорируется во входных данных (отмечается значение TTL заголовка), учитывается в выходных данных. | Недоступно через API служебной шины. |
creation-time |
Объявляет время создания сообщения. Не используется служебной шиной. | Недоступно через API служебной шины. |
group-id |
Определяемый приложением идентификатор связанного набора сообщений. Используется для сеансов служебной шины. | SessionId |
group-sequence |
Счетчик, определяющий относительный порядковый номер сообщения в сеансе. Игнорируется служебной шиной. | Недоступно через API служебной шины. |
reply-to-group-id |
- | ReplyToSessionId |
Заметки к сообщениям
Существует несколько других свойств сообщений служебной шины, которые не считаются свойствами сообщений AMQP и передаются как MessageAnnotations
в сообщении.
Ключ сопоставления заметки | Использование | Имя API |
---|---|---|
x-opt-scheduled-enqueue-time |
Объявляет, в какое время сообщение должно появиться в сущности. | ScheduledEnqueueTime |
x-opt-partition-key |
Определяемый приложением ключ, который указывает, в какой раздел должно попасть сообщение. | PartitionKey |
x-opt-via-partition-key |
Определяемое приложением значение ключа раздела в случае, когда используется транзакция для отправки сообщений через очередь передачи. | TransactionPartitionKey |
x-opt-enqueued-time |
Определяемое службой время в формате UTC, представляющее собой фактическое время постановки сообщения в очередь. Игнорируется во входных данных. | EnqueuedTime |
x-opt-sequence-number |
Определяемый службой уникальный номер, назначенный сообщению. | SequenceNumber |
x-opt-offset |
Определяемый службой порядковый номер постановки сообщения в очередь. | EnqueuedSequenceNumber |
x-opt-locked-until |
Определяется службой. Дата и время, до которых сообщение будет заблокировано в очереди или подписке. | LockedUntil |
x-opt-deadletter-source |
Определяется службой. Источник исходного сообщения, если сообщение получено из очереди недоставленных сообщений. | DeadLetterSource |
Возможность транзакций
Транзакция объединяет две или несколько операций в область выполнения. По своей природе такая транзакция должна обеспечивать либо успешное, либо неудачное выполнение всех относящихся к данной группе операций.
Операции группируются по идентификатору txn-id
.
Для транзакционного взаимодействия клиент действует как transaction controller
, контролируя операции, которые должны быть сгруппированы вместе. Служба "Служебная шина" выступает в качестве transactional resource
и выполняет задачи, которые запрашивает transaction controller
.
Клиент и служба взаимодействуют через control link
, что устанавливается клиентом. Сообщения declare
и discharge
отправляются контроллером по каналу управления для выделения и завершения транзакций соответственно (они не определяют границ для работы с транзакциями). Фактические отправки и получения не выполняются по этой ссылке. Каждая запрошенная транзакционная операция явно идентифицируется с требуемым txn-id
значением и, следовательно, может возникать на любой ссылке в соединении. Если канал управления закроется, несмотря на созданные им неразгруженные транзакции, все такие транзакции немедленно откатываются, и попытки выполнить дальнейшую работу с ними приведут к ошибке. Сообщения по каналу управления не должны предварительно согласовываться.
Каждое подключение должно инициировать собственный канал управления, чтобы получить возможность запускать и завершать транзакции. Служба определяет специальный целевой объект, который функционирует как coordinator
. Клиент или контроллер устанавливает канал управления для этого целевого объекта. Канал управления находится за границами сущности. Это значит, что один и тот же канал управления можно использовать для запуска и разгрузки транзакций для нескольких сущностей.
Запуск транзакции
Чтобы начать работу транзакций, контроллер должен получить txn-id
от координатора. Для этого он отправляет сообщения с типом declare
. Если объявление выполнено успешно, координатор отправляет в ответ результаты обработки сообщения с назначенным txn-id
.
Клиент (контроллер) | Направление | Служебная шина (координатор) |
---|---|---|
attach(<br/>name={link name},<br/>... ,<br/>role=**sender**,<br/>target=**Coordinator**<br/>) |
------> | |
<------ | attach(<br/>name={link name},<br/>... ,<br/>target=Coordinator()<br/>) |
|
transfer(<br/>delivery-id=0, ...)<br/>{ AmqpValue (**Declare()**)} |
------> | |
<------ | disposition( <br/> first=0, last=0, <br/>state=**Declared**(<br/>**txn-id**={transaction ID}<br/>)) |
Разгрузка транзакций
Контроллер завершает работу с транзакциями, отправляя координатору сообщение discharge
. Контроллер указывает, что он собирается зафиксировать или откатить работу с транзакциями, устанавливая флаг fail
в тексте сообщения о разгрузке. Если координатору не удается завершить разгрузку, сообщение отклоняется с результатом, который содержит transaction-error
.
Примечание. Значечние fail=true означает откат транзакции, а fail=false — фиксацию.
Клиент (контроллер) | Направление | Служебная шина (координатор) |
---|---|---|
transfer(<br/>delivery-id=0, ...)<br/>{ AmqpValue (Declare())} |
------> | |
<------ | disposition( <br/> first=0, last=0, <br/>state=Declared(<br/>txn-id={transaction ID}<br/>)) |
|
. . . Работа с транзакциями на других каналах . . . |
||
transfer(<br/>delivery-id=57, ...)<br/>{ AmqpValue (<br/>**Discharge(txn-id=0,<br/>fail=false)**)} |
------> | |
<------ | disposition( <br/> first=57, last=57, <br/>state=**Accepted()**) |
Отправка сообщения в транзакции
Все операции транзакций выполняются с состоянием transactional-state
доставки транзакций, который содержит txn-id. При отправке сообщений транзакционный режим выполняется кадром передачи сообщения.
Клиент (контроллер) | Направление | Служебная шина (координатор) |
---|---|---|
transfer(<br/>delivery-id=0, ...)<br/>{ AmqpValue (Declare())} |
------> | |
<------ | disposition( <br/> first=0, last=0, <br/>state=Declared(<br/>txn-id={transaction ID}<br/>)) |
|
transfer(<br/>handle=1,<br/>delivery-id=1, <br/>**state=<br/>TransactionalState(<br/>txn-id=0)**)<br/>{ payload } |
------> | |
<------ | disposition( <br/> first=1, last=1, <br/>state=**TransactionalState(<br/>txn-id=0,<br/>outcome=Accepted()**)) |
Обработка сообщения в транзакции
Обработка сообщения предусматривает такие операции, как Complete
/ Abandon
/ DeadLetter
/ Defer
. Для выполнения этих операций в рамках транзакции передайте transactional-state
с помощью перформатива обработки.
Клиент (контроллер) | Направление | Служебная шина (координатор) |
---|---|---|
transfer(<br/>delivery-id=0, ...)<br/>{ AmqpValue (Declare())} |
------> | |
<------ | disposition( <br/> first=0, last=0, <br/>state=Declared(<br/>txn-id={transaction ID}<br/>)) |
|
<------ | transfer(<br/>handle=2,<br/>delivery-id=11, <br/>state=null)<br/>{ payload } |
|
disposition( <br/> first=11, last=11, <br/>state=**TransactionalState(<br/>txn-id=0,<br/>outcome=Accepted()**)) |
------> |
Расширенные возможности служебной шины
В этом разделе рассматриваются расширенные возможности служебной шины Azure, основанные на черновых расширениях AMQP, которые сейчас разрабатываются в техническом комитете OASIS для AMQP. Служебная шина реализует последнюю версию этих черновиков и внедряет внесенные изменения, как только эти черновики переходят в состояние стандартных.
Примечание.
Дополнительные операции обмена сообщениями служебной шины поддерживаются с помощью шаблона "запрос — ответ". Дополнительные сведения об этих операциях см. в статье AMQP 1.0 в служебной шине Microsoft Azure: операции c запросами и ответами.
Управление AMQP
Спецификация управления AMQP — это первое черновое расширение, рассмотренное в этой статье. Эта спецификация определяет набор протоколов, расположенных над уровнем протокола AMQP, которые позволяют выполнять взаимодействия по управлению с инфраструктурой обмена сообщениями по протоколу AMQP. Спецификация определяет такие универсальные операции, как создание, чтение, обновление и удаление, для управления объектами в инфраструктуре обмена сообщениями, а также набор операций запросов.
Для всех этих жестов требуется взаимодействие "запрос — ответ" между клиентом и инфраструктурой обмена сообщениями. Поэтому спецификация определяет реализацию этого шаблона взаимодействия поверх AMQP: клиент подключается к инфраструктуре обмена сообщениями, инициирует сеанс, а затем создает пару связей. В одной связи клиент выступает в роли отправителя, а в другой — в качестве получателя, тем самым создавая пару связей, которая может выполнять роль двустороннего канала.
Logical Operation | Клиент | Cлужебная шина |
---|---|---|
Создание пути "запрос-ответ" | --> attach(<br/>name={*link name*},<br/>handle={*numeric handle*},<br/>role=**sender**,<br/>source=**null**,<br/>target=”myentity/$management”<br/>) |
Никаких действий |
Создание пути "запрос-ответ" | Никаких действий | \<-- attach(<br/>name={*link name*},<br/>handle={*numeric handle*},<br/>role=**receiver**,<br/>source=null,<br/>target=”myentity”<br/>) |
Создание пути "запрос-ответ" | --> attach(<br/>name={*link name*},<br/>handle={*numeric handle*},<br/>role=**receiver**,<br/>source=”myentity/$management”,<br/>target=”myclient$id”<br/>) |
|
Создание пути "запрос-ответ" | Никаких действий | \<-- attach(<br/>name={*link name*},<br/>handle={*numeric handle*},<br/>role=**sender**,<br/>source=”myentity”,<br/>target=”myclient$id”<br/>) |
При наличии этой пары связей реализация модели "запрос — ответ" выполняется очень просто: запросом считается сообщение, отправленное в объект в инфраструктуре обмена сообщениями, которая поддерживает эту модель. В этом запросе-сообщении для поля reply-to в разделе properties установлен целевой идентификатор для связи, через которую отправляется ответ. Объект обработки обрабатывает запрос и предоставляет ответ через связь, целевой идентификатор которой соответствует указанному идентификатору в поле reply-to.
Для шаблона требуется, чтобы контейнер клиента и созданный клиентом идентификатор для назначения ответа были уникальными среди всех клиентов. Кроме того, по соображениям безопасности его должно быть сложно предугадать.
Обмен сообщениями, используемый для протокола управления и для всех других протоколов, использующих тот же шаблон, происходит на уровне приложения; Они не определяют новые жесты уровня протокола AMQP. Это сделано намеренно, чтобы приложения могли сразу применять эти расширения с совместимыми стеками AMQP 1.0.
служебная шина в настоящее время не реализует ни одну из основных функций спецификации управления, но шаблон запроса и ответа, определенный спецификацией управления, является основой для функции безопасности на основе утверждений и почти для всех расширенных возможностей, описанных в следующих разделах:
Авторизация на основе утверждений
Черновик спецификации для авторизации на основе утверждений (CBS) в AMQP основан на шаблоне "запрос — ответ" спецификации управления и описывает обобщенную модель использования маркеров федеративной безопасности с протоколом AMQP.
Модель безопасности AMQP по умолчанию, описанная во введении, основана на SASL и интегрируется при помощи подтверждения подключения AMQP. При использовании SASL предоставляется расширяемая модель, для которой определен набор механизмов. Любой протокол, в котором используется SASL, может использовать этот набор. К этим механизмам относятся следующие: PLAIN для передачи имен пользователей и паролей, EXTERNAL для привязки к безопасности на уровне TLS, ANONYMOUS для выражения отсутствия явной проверки подлинности или авторизации и широкий набор дополнительных механизмов, позволяющих передавать учетные данные или маркеры для проверки подлинности и (или) авторизации.
Интеграция SASL в протоколе AMQP имеет два недостатка:
- Все учетные данные и маркеры ограничены подключением. Инфраструктура обмена сообщениями может потребоваться обеспечить дифференцированный контроль доступа на основе каждой сущности; Например, разрешая носителю маркера отправляться в очередь A, но не в очередь B. При привязке контекста авторизации к подключению невозможно использовать одно подключение и использовать разные маркеры доступа для очереди A и очереди B.
- Как правило, маркеры доступа действительны в течение ограниченного периода времени. Пользователю приходится периодически повторно получать маркеры. Это позволяет поставщику маркеров отклонить выдачу нового маркера, если изменятся разрешения пользователя на доступ. Подключения AMQP могут длиться долгое время. Модель SASL предоставляет только возможность задать маркер во время подключения, что означает, что инфраструктура обмена сообщениями должна либо отключить клиент, когда срок действия маркера истекает, либо он должен принять риск продолжения обмена данными с клиентом, который имеет права доступа, возможно, был отозван в промежуточном режиме.
Спецификация CBS AMQP, реализуемая служебной шиной, элегантно обходит обе эти проблемы: клиент может связать маркеры доступа с каждым узлом и обновить их до истечения срока действия, не прерывая поток сообщений.
CBS определяет виртуальный узел управления $cbs, который должна предоставить инфраструктура обмена сообщениями. Узел управления принимает маркеры от имени других узлов в инфраструктуре обмена сообщениями.
Жест протокола — это обмен данными по типу "запрос-ответ", как определено в спецификации управления. Это означает, что клиент устанавливает пару связей с узлом $cbs, а затем передает запрос в исходящую связь, после чего ожидает ответа во входящей связи.
Сообщение запроса обладает следующими свойствами приложения.
Ключ | Необязательно | Тип значения | Содержимое значения |
---|---|---|---|
operation |
Нет | строка | put-token |
type |
Нет | строка | Тип размещаемого маркера. |
name |
Нет | строка | "Аудитория", к которой относится маркер. |
expiration |
Да | TIMESTAMP | Время окончания срока действия маркера. |
Свойство name определяет объект, с которым необходимо связать маркер. В служебной шине это путь к очереди, разделу или подписке. Свойство type определяет тип маркера:
Тип маркера | Описание маркера | Тип текста | Примечания. |
---|---|---|---|
jwt |
JSON Web Token (JWT) | Значение AMQP (строка) | |
servicebus.windows.net:sastoken |
Маркер SAS служебной шины | Значение AMQP (строка) | - |
Маркеры предоставляют права. Служебная шина распознает три основных вида прав: право "Отправление" позволяет отправлять, право "Прослушивание" — принимать, а право "Управление" — управлять объектами. Маркеры SAS служебной шины ссылаются на правила, настроенные в пространстве имен или объекте, и эти правила настраиваются с помощью этих прав. Таким образом, если подписать маркер с помощью ключа, связанного с этим правилом, он предоставит соответствующие права. Маркер, связанный с объектом с помощью put-token, позволит подключенному клиенту взаимодействовать с объектом, используя права маркера. Для связи, в которой клиент принимает роль отправителя, требуются права на отправку, а для роли получателя — права на прослушивание.
Для сообщения ответа предусмотрены следующие значения свойств приложения.
Ключ | Необязательно | Тип значения | Содержимое значения |
---|---|---|---|
status-code |
No | INT | Код ответа HTTP [RFC2616]. |
status-description |
Да | строка | Описание состояния. |
Клиент может вызвать запрос put-token несколько раз для любого объекта в инфраструктуре обмена сообщениями. Маркеры ограничиваются текущим клиентом и привязываются к текущему подключению. Это означает, что сервер удаляет хранимые маркеры после разрыва подключения.
Текущая реализация служебной шины разрешает CBS только в сочетании с методом SASL "ANONYMOUS". К моменту подтверждения SASL всегда должно быть установлено соединение SSL/TLS.
Таким образом, выбранный клиент AMQP 1.0 должен поддерживать механизм ANONYMOUS. Анонимный доступ означает, что при подтверждении первого подключения и создании первого сеанса в служебную шину не поступают сведения о том, кто устанавливает подключение.
После установки подключения и создания сеанса единственными допустимыми операциями являются подключение связей к узлу $cbs и отправка запроса put-token. Для определенного узла объекта в течение 20 секунд после установки подключения необходимо задать допустимый маркер с помощью запроса put-token. В противном случае служебная шина разорвет подключение в одностороннем порядке.
После этого клиент отвечает за отслеживание срока действия маркеров. По истечении срока действия маркера служебная шина незамедлительно разрывает все связи с соответствующим объектом. Чтобы избежать возникновения этой проблемы, клиент может в любое время заменить маркер для узла новым маркером через виртуальный узел управления $cbs с помощью того же запроса put-token, при этом не препятствуя трафику полезных данных, который поступает в разных связях.
Функция отправки через посредника
Отправка через посредника / передача отправителя — это функция, позволяющая служебной шине перенаправлять заданное сообщение к сущности назначения через другую сущность. Эта функция используется для выполнения операций между сущностями в единой транзакции.
С помощью данной функции вы создаете отправителя и устанавливаете канал связи с via-entity
. При установлении канала передаются дополнительные сведения для установления истинного назначения сообщений/передач по этому каналу. После успешного присоединения все сообщения, отправленные по этому каналу, будут автоматически перенаправляться к сущности назначения через промежуточную сущность.
Примечание. Перед установлением этого канала необходимо выполнить проверку подлинности как для промежуточной сущности, так и для сущности назначения.
Клиент | Направление | Cлужебная шина |
---|---|---|
attach(<br/>name={link name},<br/>role=sender,<br/>source={client link ID},<br/>target=**{via-entity}**,<br/>**properties=map [(<br/>com.microsoft:transfer-destination-address=<br/>{destination-entity} )]** ) |
------> | |
<------ | attach(<br/>name={link name},<br/>role=receiver,<br/>source={client link ID},<br/>target={via-entity},<br/>properties=map [(<br/>com.microsoft:transfer-destination-address=<br/>{destination-entity} )] ) |
Связанный контент
Дополнительные сведения об AMQP см. в статье Общие сведения о служебной шине AMQP.