Обработка потоков данных с помощью Azure Stream Analytics

Azure Cosmos DB
Центры событий Azure
Azure Monitor
Azure Stream Analytics

На схеме эталонной архитектуры представлен сквозной конвейер обработки потоков данных. Конвейер принимает данные из двух источников, сопоставляет записи в двух потоках и вычисляет скользящее среднее за определенный промежуток времени. Результаты сохраняются для дальнейшего анализа.

Логотип GitHub Эталонная реализация этой архитектуры доступна на сайте GitHub.

Архитектура

Схема, показывающая эталонную архитектуру для создания конвейера потоковой обработки с помощью Azure Stream Analytics.

Скачайте файл Visio для этой архитектуры.

Рабочий процесс

Она состоит из следующих компонентов:

Источники данных. В этой архитектуре существует два источника данных, которые создают потоки данных в реальном времени. Первый поток содержит сведения о поездке, а второй — о тарифах. В эталонной архитектуре есть имитированный генератор данных, который считывает данные из набора статических файлов и отправляет данные в Центры событий. В реальном приложении источниками данных будут устройства, установленные в такси.

Центры событий Azure. Центры событий — это служба приема событий. В этой архитектуре используется два экземпляра службы — по одной на каждый источник данных. Каждый источник данных отправляет поток данных в соответствующую службу.

Azure Stream Analytics. Stream Analytics — это модуль обработки событий. Задание Stream Analytics считывает потоки данных из двух экземпляров и обрабатывает эти потоки.

Azure Cosmos DB. Выходные данные задания Stream Analytics — это серия записей, которые записываются в виде документов JSON в базу данных документов Azure Cosmos DB.

Microsoft Power BI. Power BI — набор средств бизнес-аналитики для анализа информации о бизнесе. В этой архитектуре данные загружаются из Azure Cosmos DB. Благодаря этому пользователи могут выполнять анализ полного набора собранных исторических данных. Также результаты можно передать в виде потока прямо из Stream Analytics в Power BI, чтобы просмотреть данные в реальном времени. Дополнительные сведения см. в статье Потоковая передача данных в реальном времени в Power BI.

Azure Monitor. Azure Monitor собирает метрики производительности о службах Azure, развернутых в решении. Отобразив эти данные в визуализации на панели мониторинга, можно получить полезные сведения о работоспособности решения.

Подробности сценария

Сценарий. Компания, предоставляющая услуги такси, собирает данные о каждой поездке в такси. В этом сценарии предполагается, что данные отправляются с двух отдельных устройств. Такси имеет метр, который отправляет информацию о каждой поездке — длительность, расстояние и посадку и раскрывающиеся места. Отдельное устройство принимает платежи от клиентов и отправляет данные о тарифах. Компании необходимо вычислить среднюю сумму чаевых на милю в реальном времени, чтобы определить тенденции.

Потенциальные варианты использования

Это решение оптимизировано для розничного сценария.

Прием данных

Для имитации источника данных в этой эталонной архитектуре используется набор данных[1]New York City Taxi Data (Данные о поездках в такси в Нью-Йорке). Этот набор данных содержит данные о поездках по такси в Нью-Йорке в течение четырехлетнего периода (2010–2013). Он содержит два типа записей: данные о поездках и данные о тарифах. Данные о поездках включают сведения о продолжительности поездки, расстоянии, а также местах посадки и высадки. Данные о тарифах включают сведения о тарифе, налоге и сумме чаевых. В обоих типах записей есть стандартные поля: номер медальона, лицензия на право вождения и код организации. Вместе эти три поля позволяют уникально идентифицировать такси и водителя. Данные хранятся в формате CSV.

[1] Донован, Брайан; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). Иллинойсский университет в Урбане-Шампейне. https://doi.org/10.13012/J8PN93H8

Генератор данных — это приложение .NET Core, которое считывает записи и отправляет их в Центры событий Azure. Генератор отправляет данные о поездке в формате JSON, а данные о тарифах — в формате CSV.

Для сегментации данных Центры событий используют секции. Они позволяют объекту-получателю считывать данные каждой секции параллельно. При отправке данных в Центры событий можно явно указать ключ секции. В противном случае записи назначаются секциям методом циклического перебора.

В нашем примере данные о поездках и тарифах должны в итоге иметь одинаковый идентификатор секции для определенного такси. Это позволит Stream Analytics применить определенную степень параллелизма при сопоставлении двух потоков. Запись в секции n с данными о поездке будет соответствовать записи в секции n с данными о тарифах.

Схема потоковой обработки с помощью Azure Stream Analytics и Центров событий Azure

В генераторе данных общая модель данных для обоих типов записей имеет свойство PartitionKey, в котором объединены Medallion, HackLicense и VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Это свойство используется для явного предоставления ключа секции при отправке в Центры событий:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Потоковая обработка

Задание обработки потока определяется с помощью SQL-запроса в несколько отдельных шагов. На первых двух шагах просто выбираются записи из двух входных потоков.

WITH
Step1 AS (
    SELECT PartitionId,
           TRY_CAST(Medallion AS nvarchar(max)) AS Medallion,
           TRY_CAST(HackLicense AS nvarchar(max)) AS HackLicense,
           VendorId,
           TRY_CAST(PickupTime AS datetime) AS PickupTime,
           TripDistanceInMiles
    FROM [TaxiRide] PARTITION BY PartitionId
),
Step2 AS (
    SELECT PartitionId,
           medallion AS Medallion,
           hack_license AS HackLicense,
           vendor_id AS VendorId,
           TRY_CAST(pickup_datetime AS datetime) AS PickupTime,
           tip_amount AS TipAmount
    FROM [TaxiFare] PARTITION BY PartitionId
),

На следующем шаге эти два входных потока объединяются для выбора совпадающих записей из каждого потока.

Step3 AS (
  SELECT tr.TripDistanceInMiles,
         tf.TipAmount
    FROM [Step1] tr
    PARTITION BY PartitionId
    JOIN [Step2] tf PARTITION BY PartitionId
      ON tr.PartitionId = tf.PartitionId
     AND tr.PickupTime = tf.PickupTime
     AND DATEDIFF(minute, tr, tf) BETWEEN 0 AND 15
)

Этот запрос присоединяет записи к набору полей, которые однозначно определяют соответствующие записи (PartitionId и PickupTime).

Примечание.

Мы хотимTaxiRide, чтобы потоки TaxiFare были присоединены к уникальному сочетанию Medallion, HackLicenseVendorId и PickupTime. В этом случае PartitionId рассматриваются MedallionHackLicense поля и VendorId поля, но это не должно быть принято как правило.

В Stream Analytics объединения являются темпоральными, то есть записи объединяются в пределах определенного временного окна. В противном случае задание может бесконечно ожидать сопоставления. Функция DATEDIFF указывает, насколько две совпадающих записи могут быть разделены по времени для сопоставления.

На последнем шаге задания вычисляется средняя сумма чаевых на милю, сгруппированных по "прыгающему" окну в 5 минут.

SELECT System.Timestamp AS WindowTime,
       SUM(tr.TipAmount) / SUM(tr.TripDistanceInMiles) AS AverageTipPerMile
  INTO [TaxiDrain]
  FROM [Step3] tr
  GROUP BY HoppingWindow(Duration(minute, 5), Hop(minute, 1))

Stream Analytics предоставляет несколько функций обработки методом окна. "Прыгающее" окно перемещается вперед во времени в рамках фиксированного периода, в нашем примере — на 1 минуту за прыжок. Результатом будет вычисление скользящего среднего за последние 5 минут.

В приведенной здесь архитектуре сохраняются только результаты задания Stream Analytics в Azure Cosmos DB. Для сценария с большими данными используйте также функцию Сбор в Центрах событий, чтобы сохранять необработанные данные о событиях в хранилище BLOB-объектов Azure. Сохранение необработанных данных позволит вам в дальнейшем выполнять пакетные запросы к данным журнала, чтобы получить новые полезные сведения из этих данных.

Рекомендации

Эти рекомендации реализуют основные принципы платформы Azure Well-Architected Framework, которая является набором руководящих принципов, которые можно использовать для улучшения качества рабочей нагрузки. Дополнительные сведения см. в статье Microsoft Azure Well-Architected Framework.

Масштабируемость

Event Hubs

Пропускная способность Центров событий вычисляется в единицах пропускной способности. Вы можете автоматически масштабировать концентратор событий, включив автоматическое расширение. Это позволит автоматически масштабировать единицы пропускной способности в зависимости от трафика вплоть до заданного максимума.

Stream Analytics

В Stream Analytics вычислительные ресурсы, выделенные для задания, измеряются в единицах потоковой передачи. Задания Stream Analytics лучше всего масштабировать, если оно может выполняться параллельно. Таким образом, Stream Analytics сможет распределять задания между несколькими вычислительными узлами.

Для входных данных Центров событий используйте ключевое слово PARTITION BY, чтобы секционировать задание Stream Analytics. Данные будут разделены на подмножества на основе секций Центров событий.

Для функций обработки методом окна и временных объединений требуются дополнительные единицы потоковой передачи. По возможности используйте PARTITION BY так, чтобы каждая секция обрабатывалась отдельно. Дополнительные сведения см. в статье Обзор и настройка единиц потоковой передачи.

Если невозможно параллелизовать все задание Stream Analytics, попробуйте разбить его на несколько шагов, начиная с одного или нескольких параллельных шагов. Так первые шаги можно будет выполнить параллельно. Например, как в этой эталонной архитектуре:

  • Шаги 1 и 2 — это простые инструкции SELECT, которые выбирают записи из одной секции.
  • На шаге 3 выполняется секционированное объединение двух входных потоков. На этом шаге используется преимущество, заключающееся в том, что совпадающие записи имеют один и тот же ключ секции. Поэтому у них всегда будет одинаковый идентификатор секции в каждом входном потоке.
  • На шаге 4 выполняется статистическое вычисление для всех секций. Этот шаг нельзя выполнить параллельно.

Используйте схему заданий в Stream Analytics, чтобы просмотреть, сколько секций назначено каждому шагу в задании. Ниже показана схема заданий для этой эталонной архитектуры:

Схема заданий Stream Analytics.

Azure Cosmos DB

Емкость пропускной способности для Azure Cosmos DB измеряется в единицах запросов (ЕЗ). Чтобы масштабировать контейнер Azure Cosmos DB за последние 10 000 ЕЗ, необходимо указать ключ секции при создании контейнера и включить ключ секции в каждый документ.

В этой эталонной архитектуре новые документы создаются только раз в минуту (интервал "прыгающего" окна), поэтому требования к пропускной способности довольно низкие. По этой причине нет необходимости назначать ключ секции в этом сценарии.

Наблюдение

Для любого решения обработки потоков данных очень важно отслеживать производительность и работоспособность системы. Azure Monitor собирает журналы метрик и диагностики для служб Azure, используемых в этой архитектуре. Служба Azure Monitor встроена в платформу Azure и не требует написания дополнительного кода в приложении.

Любой из следующих сигналов предупреждения указывает на то, что нужно горизонтально увеличить масштаб соответствующего ресурса Azure:

  • Служба "Центры событий" регулирует запросы или указывает на то, что показатели близки к превышению дневной квоты сообщений.
  • Задание Stream Analytics постоянно использует более 80 % выделенных единиц потоковой передачи.
  • Azure Cosmos DB начинает регулировать запросы.

Эталонная архитектура включает пользовательскую панель мониторинга, которая развертывается на портале Azure. После развертывания архитектуры можно просмотреть панель мониторинга, открыв портал Azure и выбрав TaxiRidesDashboard из списка панелей мониторинга. Дополнительные сведения о создании и развертывании пользовательских панелей мониторинга см. в статье Создание панелей мониторинга Azure программными средствами.

На следующем изображении показана панель мониторинга приблизительно через час после запуска задания Stream Analytics.

Снимок экрана панели мониторинга поездок в такси

На внизу слева показано, что использование единиц потоковой передачи для задания Stream Analytics повышается в течение первых 15 минут и затем снижается. Это стандартное развитие событий, так как задание достигает устойчивого состояния.

Обратите внимание на то, что служба "Центры событий" регулирует запросы, как показано вверху справа. Случайно отрегулированный запрос не является проблемой, так как клиентский пакет SDK службы "Центры событий" автоматически осуществляет новую попытку при получении ошибки регулирования. Но если ошибки регулирования повторяются, значит службе требуются дополнительные единицы пропускной способности. На следующей диаграмме показан тестовый запуск с использованием функции автоматического расширения в службе "Центры событий", которая автоматически масштабирует единицы пропускной способности при необходимости.

Снимок экрана: автомасштабирование Центров событий.

Функция автоматического расширения включена приблизительно в 06:35. Вы можете заметить снижение числа пакетов в регулируемых запросах, так как служба "Центры событий" автоматически масштабируется до трех единиц пропускной способности.

Интересно, что это побочный эффект на увеличение использования единиц SU в задании Stream Analytics. При регулировании в службе "Центры событий" искусственно снизилась скорость приема данных для задания Stream Analytics. Довольно часто бывает так, что при устранении одной проблемы с производительностью возникает другая. В таком случае проблему можно решить, выделив дополнительные единицы потоковой передачи для задания Stream Analytics.

Оптимизация затрат

Оптимизация затрат заключается в поиске способов уменьшения ненужных расходов и повышения эффективности работы. Дополнительные сведения см. в разделе Обзор критерия "Оптимизация затрат".

Для оценки затрат используйте калькулятор цен Azure. Ниже приведены некоторые рекомендации по службам, используемым в этой эталонной архитектуре.

Azure Stream Analytics

Azure Stream Analytics имеет цену на количество единиц потоковой передачи ($0,11/час), необходимых для обработки данных в службе.

Stream Analytics может быть дорогостоящим, если вы не обрабатываете данные в режиме реального времени или небольших объемах данных. Для этих вариантов использования рекомендуется использовать Функции Azure или Logic Apps для перемещения данных из Центры событий Azure в хранилище данных.

Центры событий Azure и Azure Cosmos DB

Рекомендации по затратам о Центры событий Azure и Azure Cosmos DB см. в статье "Рекомендации по затратам" см. в статье "Обработка потоков с помощью эталонной архитектуры Azure Databricks".

DevOps

  • Создайте отдельные группы ресурсов для рабочей среды, сред разработки и тестирования. Так будет проще управлять развертываниями, удалять тестовые развертывания и назначать права доступа.

  • Используйте шаблон Azure Resource Manager для развертывания ресурсов Azure после инфраструктуры в виде кода (IaC). Благодаря шаблонам автоматизация развертываний с помощью Azure DevOps Services или других решений CI/CD упрощается.

  • Поместите каждую рабочую нагрузку в отдельный шаблон развертывания и сохраните ресурсы в системах управления версиями. Вы можете развернуть шаблоны вместе или отдельно в рамках процесса CI/CD, что упрощает процесс автоматизации.

    В этой архитектуре Центры событий Azure, Log Analytics и Azure Cosmos DB определяются как одна рабочая нагрузка. Эти ресурсы включены в один шаблон ARM.

  • Рассмотрите возможность промежуточного хранения рабочих нагрузок. Развертывание на различных этапах и выполнение проверок на каждом этапе перед переходом к следующему этапу. Таким образом вы можете отправлять обновления в рабочие среды с высокой степенью контроля и свести к минимуму непредвиденные проблемы с развертыванием.

  • Рассмотрите возможность использования Azure Monitor для анализа производительности конвейера потоковой обработки. Дополнительные сведения см. в статье Мониторинг в Azure Databricks.

Дополнительные сведения см. в разделе о принципах эффективности работы в Microsoft Azure Well-Architected Framework.

Развертывание этого сценария

Чтобы выполнить развертывание и запуск эталонной реализации, выполните действия, описанные в файле сведений на GitHub.

Вы можете просмотреть следующий пример сценария Azure, демонстрирующий определенное решение с помощью некоторых из тех же технологий: