Konfigurieren der Streamingerfassung in Ihrem Azure Data Explorer-Cluster
Artikel
Die Streamingerfassung ist nützlich zum Laden von Daten, wenn eine geringe Wartezeit zwischen Erfassung und Abfrage erforderlich ist. Die Streamingerfassung kann in folgenden Szenarien verwendet werden:
Eine Wartezeit von weniger als einer Sekunde ist erforderlich.
Optimierung der operativen Verarbeitung vieler Tabellen, bei denen der Datenstrom für die einzelnen Tabellen jeweils relativ klein (wenige Datensätze pro Sekunde), das Gesamtvolumen der Datenerfassung aber hoch ist (mehrere tausend Datensätze pro Sekunde).
Wenn der Datenstrom für die einzelnen Tabellen groß ist (über 4 GB pro Stunde), sollten Sie ggf. die Batcherfassung nutzen.
Event Hub-, IoT Hub- und Event Grid-Datenverbindungen können die Streamingerfassung verwenden, sofern sie auf Clusterebene aktiviert ist. Die Entscheidung für die Verwendung der Streamingerfassung erfolgt gemäß der in der Zieltabelle konfigurierten Streamingerfassungsrichtlinie. Informationen zum Verwalten von Datenverbindungen finden Sie unter Event Hub, IoT Hub und Event Grid.
Benutzerdefinierte Erfassung
Für die benutzerdefinierte Erfassung muss eine Anwendung geschrieben werden, die eine der Clientbibliotheken von Azure Data Explorer verwendet. Konfigurieren Sie die benutzerdefinierte Erfassung anhand der Informationen in diesem Thema. Unter Umständen finden Sie auch die C#-Beispielanwendung für die Streamingerfassung hilfreich.
Die folgende Tabelle unterstützt Sie beim Auswählen eines für Ihre Umgebung geeigneten Erfassungstyps:
Kriterium
Datenverbindung
Benutzerdefinierte Erfassung
Datenverzögerung zwischen der Initiierung der Erfassung und der Verfügbarkeit der Daten für Abfragen
Längere Verzögerung
Kürzere Verzögerung
Zusätzlicher Entwicklungsaufwand
Schnelle und einfache Einrichtung, kein zusätzlicher Entwicklungsaufwand
Hoher zusätzlicher Entwicklungsaufwand zum Erstellen einer Anwendung, um die Daten zu erfassen, Fehler zu behandeln und die Datenkonsistenz sicherzustellen.
Hinweis
Sie können den Prozess zum Aktivieren und Deaktivieren der Streamingerfassung in Ihrem Cluster mithilfe des Azure-Portals oder programmgesteuert in C# verwalten. Wenn Sie C# für Ihre benutzerdefinierte Anwendung nutzen, ist der programmgesteuerte Ansatz möglicherweise praktischer.
Die wichtigsten Aspekte, die sich auf die Streamingerfassung auswirken können:
VM- und Clustergröße: Leistung und Kapazität der Streamingerfassung werden für größere virtuelle Computer und Cluster skaliert. Die Anzahl der gleichzeitigen Erfassungsanforderungen ist auf sechs pro Kern beschränkt. Beispielsweise besteht die maximale unterstützte Last bei SKUs mit 16 Kernen (z. B. D14 und L16) aus 96 gleichzeitigen Erfassungsanforderungen. Bei SKUs mit 2 Kernen (z. B. D11) werden als maximale Last 12 gleichzeitige Erfassungsanfragen unterstützt.
Datengrößenbeschränkung: Die Datengröße für Anforderungen zur Streamingerfassung ist auf 4 MB beschränkt. Dies schließt alle Daten ein, die während der Erfassung für Updaterichtlinien erstellt wurden.
Schemaaktualisierungen: Schemaaktualisierungen wie die Erstellung und Änderung von Tabellen und Erfassungszuordnungen können für den Streamingerfassungsdienst bis zu fünf Minuten dauern. Weitere Informationen finden Sie unter Streamingerfassung und Schemaänderungen.
SSD-Kapazität: Wenn die Streamingerfassung in einem Cluster aktiviert wird, wird ein Teil des lokalen SSD-Datenträgers der Clustercomputer für Streamingerfassungsdaten genutzt, wodurch sich der verfügbare Speicherplatz für den aktiven Cache verringert. Dies gilt auch, wenn gar keine Daten per Streaming erfasst werden.
Aktivieren der Streamingerfassung in Ihrem Cluster
Navigieren Sie im Azure-Portal zum Azure Data Explorer-Cluster.
Wählen Sie unter Einstellungen die Option Konfigurationen aus.
Wählen Sie im Bereich Konfigurationen die Option Ein aus, um die Streamingerfassung zu aktivieren.
Wählen Sie Speichern aus.
Sie können die Streamingerfassung aktivieren, während Sie einen neuen Azure Data Explorer-Cluster erstellen.
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: true);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Erstellen einer Zieltabelle und Definieren der Richtlinie
Erstellen Sie eine Tabelle, um die Streamingerfassungsdaten zu empfangen, und definieren Sie die zugehörige Richtlinie mithilfe des Azure-Portals oder programmgesteuert in C#.
Kopieren Sie den folgenden Befehl in den Abfragebereich, und wählen Sie Ausführen aus, um die Tabelle zu erstellen, die die Daten über Streamingerfassung erhalten soll:
Kopieren Sie einen der folgenden Befehle in den Abfragebereich, und wählen Sie Ausführen aus: Dadurch wird die Streamingerfassungsrichtlinie für die von Ihnen erstellte Tabelle oder für die Datenbank definiert, die diese Tabelle enthält.
Tipp
Eine Richtlinie, die auf Datenbankebene definiert ist, gilt für alle vorhandenen und zukünftigen Tabellen in der Datenbank. Wenn Sie die Richtlinie auf Datenbankebene aktivieren, ist es nicht erforderlich, sie pro Tabelle zu aktivieren.
Verwenden Sie Folgendes, um die Richtlinie für die von Ihnen erstellte Tabelle zu definieren:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "<tableName>";
var tableSchema = new TableSchema(
tableName,
new ColumnSchema[]
{
new("TimeStamp", "System.DateTime"),
new("Name", "System.String"),
new("Metric", "System.int"),
new("Source", "System.String"),
});
var tableCreateCommand = CslCommandGenerator.GenerateTableCreateCommand(tableSchema);
var tablePolicyAlterCommand = CslCommandGenerator.GenerateTableAlterStreamingIngestionPolicyCommand(tableName, isEnabled: true);
await client.ExecuteControlCommandAsync(tableCreateCommand);
await client.ExecuteControlCommandAsync(tablePolicyAlterCommand);
}
}
Erstellen einer Anwendung für die Streamingerfassung zum Erfassen von Daten in Ihrem Cluster
Erstellen Sie Ihre Anwendung zum Erfassen von Daten in Ihrem Cluster unter Verwendung der von Ihnen bevorzugten Sprache.
using System.IO;
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Ingest; // Requires Package Microsoft.Azure.Kusto.Ingest
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateStreamingIngestClient(connectionStringBuilder);
// Ingest from a compressed file
var fileStream = File.Open("MyFile.gz", FileMode.Open);
// Initialize client properties
var ingestionProperties = new KustoIngestionProperties(databaseName: "<databaseName>", tableName: "<tableName>");
// Create source options
var sourceOptions = new StreamSourceOptions { CompressionType = DataSourceCompressionType.GZip, };
// Ingest from stream
await client.IngestFromStreamAsync(fileStream, ingestionProperties, sourceOptions);
}
}
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go//azure/data-explorer/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Deaktivieren der Streamingerfassung in Ihrem Cluster
Warnung
Die Deaktivierung der Streamingerfassung kann mehrere Stunden dauern.
Bevor Sie die Streamingangabe für Ihren Azure Data Explorer-Cluster deaktivieren, löschen Sie die Streamingerfassungsrichtlinie aus allen relevanten Tabellen und Datenbanken. Durch das Entfernen der Streamingerfassungsrichtlinie wird eine Neuanordnung der Daten in Ihrem Azure Data Explorer-Cluster ausgelöst. Die Streamingerfassungsdaten werden aus dem anfänglichen Speicher in den permanenten Speicher im Spaltenspeicher (Erweiterungen oder Shards) verschoben. Dieser Vorgang kann abhängig von der Datenmenge im anfänglichen Speicher zwischen einigen Sekunden und wenigen Stunden dauern.
Löschen der Streamingerfassungsrichtlinie
Sie können die Streamingerfassungsrichtlinie mithilfe des Azure-Portals oder programmgesteuert in C# löschen.
Navigieren Sie im Azure-Portal zum Azure Data Explorer-Cluster, und wählen Sie Abfrage aus.
Kopieren Sie zum Löschen der Streamingerfassungsrichtlinie aus der Tabelle den folgenden Befehl in den Abfragebereich, und wählen Sie Ausführen aus:
.delete table TestTable policy streamingingestion
Wählen Sie unter Einstellungen die Option Konfigurationen aus.
Wählen Sie im Bereich Konfigurationen die Option Off aus, um die Streamingerfassung zu aktivieren.
Wählen Sie Speichern aus.
Führen Sie den folgenden Code aus, um die Streamingerfassungsrichtlinie aus der Tabelle zu entfernen:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion;
class Program
{
static async Task Main(string[] args)
{
var clusterPath = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var connectionStringBuilder = new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
using var client = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand("<dbName>", "<tableName>");
await client.ExecuteControlCommandAsync(tablePolicyDropCommand);
}
}
Führen Sie den folgenden Code aus, um die Streamingerfassungsrichtlinie für Ihren Cluster zu deaktivieren:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Einschränkungen
Datenzuordnungen müssen vorab erstellt werden, damit sie bei der Streamingerfassung verwendet werden können. Individuelle Anforderungen zur Streamingerfassung bieten keine Inlinedatenzuordnungen.
Erweiterungstags können für die Streamingerfassungsdaten nicht festgelegt werden.
Aktualisierungsrichtlinie: Die Aktualisierungsrichtlinie kann nur auf die neu erfassten Daten in der Quelltabelle und nicht auf andere Daten oder Tabellen in der Datenbank verweisen.
Wenn die Streamingerfassung für einen Cluster aktiviert ist, der als Leader für Follower-Datenbanken verwendet wird, muss die Streamingerfassung auch für die folgenden Clustern aktiviert werden, um Streamingerfassungsdaten zu folgen. Gleiches gilt für die Freigabe der Clusterdaten über Data Share.