Настройка приема потоковой передачи в кластере Azure Data Explorer
Статья
Прием потоковой передачи используется для загрузки данных, если требуется низкая задержка между приемом и запросом. Вы можете использовать прием потоковой передачи в следующих сценариях:
Требуется задержка меньше секунды.
Для оптимизации операционной обработки многих таблиц, когда поток данных в каждой таблице относительно мал (несколько записей в секунду), а общий объем приема данных большой (тысячи записей в секунду).
Если поток данных в каждую таблицу высок (более 4 ГБ в час), рассмотрите возможность приема в очереди.
Примеры кода на основе предыдущих версий пакета SDK см. в архивной статье.
Выбор соответствующего типа приема потоковой передачи
Поддерживаются два типа приема потоковой передачи:
Тип приема
Description
Подключение к данным
Центры событий, Центр Интернета вещей и подключения к данным сетки событий могут использовать прием потоковой передачи, если он включен на уровне кластера. Решение об использовании приема потоковой передачи выполняется в соответствии с политикой приема потоковой передачи, настроенной в целевой таблице. Сведения об управлении подключениями к данным см. в разделе "Концентратор событий", "Центр Интернета вещей" и "Сетка событий".
Настраиваемый прием
Настраиваемое прием требует написания приложения, использующего одну из клиентских библиотек Azure Data Explorer. Используйте сведения в этом разделе для настройки пользовательского приема. Вы также можете найти пример приложения для приема потоковой передачи C?view=azure-data-explorer&preserve-view=true#.
Используйте следующую таблицу, чтобы выбрать тип приема, подходящий для вашей среды.
Критерий
Подключение к данным
Настраиваемая загрузка
Задержка данных между инициированием приема и данными, доступными для запроса
Более длительная задержка
Более короткая задержка
Затраты на разработку
Быстрая и простая установка, без дополнительных затрат на разработку
Высокая нагрузка при разработке приложения, принимающего данные, обработки ошибок и обеспечения согласованности данных.
Примечание.
Вы можете управлять процессом включения и отключения приема потоковой передачи в кластере с помощью портала Azure или программных средств C#. Если вы используете C# для своего пользовательского приложения, возможно, вам будет удобнее использовать программные средства.
Рекомендации по обеспечению производительности и эксплуатации
Основные факторы, которые могут повлиять на прием потоковой передачи:
Размер виртуальной машины и кластера. Производительность и емкость приема потоковой передачи повышаются с увеличением размера виртуальной машины и кластера. Число параллельных запросов приема ограничено шестью на ядро. Например, для номеров SKU с 16 ядрами, таких как D14 и L16, максимальная поддерживаемая загрузка составляет 96 параллельных запросов приема. Для номеров SKU с двумя ядрами, например D11, максимальная поддерживаемая нагрузка составляет 12 параллельных запросов приема.
Ограничение на объем данных. Предельный объем данных для запроса на прием потоковой передачи составляет 4 МБ. Это касается всех данных, созданных для политик обновления во время приема.
Обновления схемы. Обновления схемы, такие как создание и изменение таблиц и сопоставлений приема, для службы приема потоковой передачи могут выполняться до 5 минут. Дополнительные сведения см. в статье Прием потоковой передачи данных и изменения схемы.
Емкость SSD. При включении приема потоковой передачи в кластере, даже если данные не принимаются через потоковую передачу, для данных приема потоковой передачи будет использоваться часть локального диска SSD компьютеров кластера. Это уменьшает объем хранилища, доступный для горячего кэша.
Включите прием потоковой передачи в своем кластере
Чтобы включить прием потоковой передачи в кластере Azure Data Explorer, выполните следующий код:
using System.Threading.Tasks;
using Azure;
using Azure.Core;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var clusters = resourceGroup.GetKustoClusters();
var location = new AzureLocation("<location>");
var skuName = new KustoSkuName("<skuName>");
var skuTier = new KustoSkuTier("<skuTier>");
var clusterData = new KustoClusterData(location, new KustoSku(skuName, skuTier)) { IsStreamingIngestEnabled = true };
await clusters.CreateOrUpdateAsync(WaitUntil.Completed, clusterName, clusterData);
}
}
Включение приема потоковой передачи в существующем кластере
Если у вас есть существующий кластер, вы можете включить прием потоковой передачи с помощью портала Azure или программных средств C#.
На портале Azure перейдите к кластеру Azure Data Explorer.
В разделе Параметры выберите пункт Конфигурации.
В области Конфигурации выберите Вкл., чтобы включить Прием потоковой передачи.
Выберите Сохранить.
Вы можете включить прием потоковой передачи при обновлении существующего кластера Azure Data Explorer.
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = true };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
Создание целевой таблицы и определение политики
Создайте таблицу для получения данных приема потоковой передачи и определите связанную с ней политику с помощью портала Azure или программных средств C#.
Чтобы создать таблицу, которая будет получать данные посредством потоковой передачи, скопируйте следующую команду в Панель запросов и выберите Выполнить.
Скопируйте одну из следующих команд в Панель запросов и выберите Выполнить. Будет определена политика приема потоковой передачи для созданной вами таблицы или базы данных, содержащей эту таблицу.
Совет
Политика, которая определяется на уровне базы данных, применяется ко всем существующим и будущим таблицам в базе данных. Если вы включите политику на уровне базы данных, ее не нужно включить для каждой таблицы.
Чтобы определить политику для созданной таблицы, выполните следующий код:
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.gzip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.<region>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Отключение приема потоковой передачи в кластере
Предупреждение
Отключение приема потоковой передачи может занять несколько часов.
Перед отключением приема потоковой передачи в кластере Azure Data Explorer удалите политику приема потоковой передачи из всех соответствующих таблиц и баз данных. Удаление политики приема потоковой передачи активирует реорганизацию данных в кластере Azure Data Explorer. Данные приема потоковой передачи перемещаются из первоначального хранилища в постоянное хранилище в хранилище столбцов (экстентов или сегментов). Этот процесс может занять от нескольких секунд до нескольких часов в зависимости от объема данных в первоначальном хранилище.
Удаление политики приема потоковой передачи
Политику приема потоковой передачи можно удалить с помощью портала Azure или программных средств C#.
На портале Azure перейдите в кластер Azure Data Explorer и выберите Запрос.
Чтобы удалить политику приема потоковой передачи из таблицы, скопируйте следующую команду в Панель запросов и выберите Выполнить.
.delete table TestTable policy streamingingestion
В разделе Параметры выберите пункт Конфигурации.
В области Конфигурации выберите Выкл., чтобы включить Прием потоковой передачи.
Выберите Сохранить.
Чтобы удалить политику приема потоковой передачи из таблицы, выполните следующий код:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Чтобы отключить прием потоковой передачи в кластере, выполните следующий код:
using System.Threading.Tasks;
using Azure;
using Azure.Identity; // Required package Azure.Identity
using Azure.ResourceManager;
using Azure.ResourceManager.Kusto; // Required package Azure.ResourceManager.Kusto
using Azure.ResourceManager.Kusto.Models;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
var subscriptionId = "<subscriptionId>";
var credentials = new ClientSecretCredential(appTenant, appId, appKey);
var resourceManagementClient = new ArmClient(credentials, subscriptionId);
var resourceGroupName = "<resourceGroupName>";
var clusterName = "<clusterName>";
var subscription = await resourceManagementClient.GetDefaultSubscriptionAsync();
var resourceGroup = (await subscription.GetResourceGroupAsync(resourceGroupName)).Value;
var cluster = (await resourceGroup.GetKustoClusterAsync(clusterName)).Value;
var clusterPatch = new KustoClusterPatch(cluster.Data.Location) { IsStreamingIngestEnabled = false };
await cluster.UpdateAsync(WaitUntil.Completed, clusterPatch);
}
}
Ограничения
Сопоставления данных должны быть предварительно созданы, чтобы их можно было использовать для приема потоковой передачи. Отдельные запросы на прием потоковой передачи не поддерживают встроенные сопоставления данных.
Невозможно установить теги экстентов для данных приема потоковой передачи.
Политика обновления. Политика обновления может ссылаться только на недавно принятые данные в исходной таблице, а не на другие данные или таблицы в базе данных.
Если политика обновления с политикой транзакций завершается ошибкой, повторные попытки будут возвращаться к пакетной приему.
Если для ведущего кластера применяется потоковый прием данных, кластер подписчиков также должен использовать потоковый прием данных, чтобы отслеживать данных потокового приема. Это же условие применимо при общем доступе к данным кластера через Data Share.