Utiliser Azure Event Hubs comme source de données Delta Live Tables
Cet article explique comment utiliser Delta Live Tables pour traiter les messages provenant d'Azure Event Hubs. Vous ne pouvez pas utiliser le connecteur Structured Streaming Event Hubs, car cette bibliothèque n'est pas disponible dans le cadre de Databricks Runtime et Delta Live Tables ne vous permet pas d'utiliser des bibliothèques JVM tierces.
Comment les Delta Live Tables peuvent-elles se connecter à Azure Event Hubs ?
Azure Event Hubs fournit un point de terminaison compatible avec Apache Kafka que vous pouvez utiliser avec le connecteur flux structuré Kafka, disponible dans Databricks Runtime, pour traiter les messages provenant de Azure Event Hubs. Pour plus d’informations sur la compatibilité Azure Event Hubs et Apache Kafka, consultez Utiliser Azure Event Hubs à partir d’applications Apache Kafka.
Les étapes suivantes décrivent la connexion d’un pipeline Delta Live Tables à une instance Event Hubs existante et la consommation d’événements à partir d’une rubrique. Pour effectuer ces étapes, vous avez besoin des valeurs de connexion Event Hubs suivantes :
- Le nom de l’espace de noms Event Hubs.
- Le nom de l’instance Event Hub dans l’espace de noms Event Hubs.
- Un nom de stratégie d’accès partagé et une clé de stratégie pour Event Hubs. Par défaut, une stratégie
RootManageSharedAccessKey
est créée pour chaque espace de noms Event Hubs. Cette stratégie dispose d’autorisationsmanage
,send
etlisten
. Si votre pipeline lit uniquement à partir d’Event Hubs, Databricks recommande de créer une stratégie avec l’autorisation d’écoute uniquement.
Pour plus d’informations sur la chaîne de connexion Event Hubs, consultez Obtenir une chaîne de connexion Event Hubs.
Notes
- Azure Event Hubs fournit des options OAuth 2.0 et de signature d’accès partagé (SAP) pour autoriser l’accès à vos ressources sécurisées. Ces instructions utilisent l’authentification basée sur SAS.
- Si vous obtenez la chaîne de connexion Event Hubs à partir du Portail Azure, il se peut qu’elle ne contienne pas la valeur
EntityPath
. La valeurEntityPath
est nécessaire uniquement lors de l’utilisation du connecteur Structured Streaming Event Hubs. L’utilisation du connecteur Kafka flux structuré nécessite de fournir uniquement le nom de la rubrique.
Stocker la clé de stratégie dans un secret Azure Databricks
Étant donné que la clé de stratégie est des informations sensibles, Databricks recommande de ne pas coder en dur la valeur dans votre code de pipeline. Utilisez plutôt des secrets Azure Databricks pour stocker et gérer l’accès à la clé.
L’exemple suivant utilise l’interface CLI Databricks pour créer une étendue de secret et stocker la clé dans cette étendue de secret. Dans votre code de pipeline, utilisez la fonctiondbutils.secrets.get()
avec scope-name
etshared-policy-name
pour récupérer la valeur de clé.
databricks --profile <profile-name> secrets create-scope <scope-name>
databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>
Pour plus d’informations sur les secrets Azure Databricks, consultez Gestion des secrets.
Créer un notebook et ajouter le code de pipeline pour consommer des événements
L’exemple suivant lit les événements IoT d’une rubrique, mais vous pouvez adapter l’exemple aux exigences de votre application. À titre de bonne pratique, Databricks recommande d’utiliser les paramètres de pipeline Delta Live Tables pour configurer des variables d’application. Votre code de pipeline utilise ensuite la fonctionspark.conf.get()
pour récupérer des valeurs. Pour plus d’informations sur l’utilisation des paramètres de pipeline pour paramétrer votre pipeline, consultez Utiliser des paramètres avec des pipelines Delta Live Tables.
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *
# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")
EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}
# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)
# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)
@dlt.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)
Créer le pipeline
Créez un pipeline avec les paramètres suivants, en remplaçant les valeurs d’espace réservé par les valeurs appropriées pour votre environnement.
{
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"num_workers": 4
}
],
"development": true,
"continuous": false,
"channel": "CURRENT",
"edition": "ADVANCED",
"photon": false,
"libraries": [
{
"notebook": {
"path": "<path-to-notebook>"
}
}
],
"name": "dlt_eventhub_ingestion_using_kafka",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
},
"target": "<target-database-name>"
}
Replace
<container-name>
avec le nom d’un conteneur de compte de stockage Azure.<storage-account-name>
par le nom du compte de stockage ADLS Gen2.<eh-namespace>
par le nom de votre espace de noms Event Hubs.<eh-policy-name>
avec la clé d’étendue secrète pour la clé de stratégie Event Hubs.<eventhub>
par le nom de votre instance Event Hubs.<secret-scope-name>
par le nom de l’étendue de secrets Azure Databricks qui contient la clé de stratégie Event Hubs.
En guise de bonne pratique, ce pipeline n’utilise pas le chemin de stockage DBFS par défaut, mais utilise plutôt un compte de stockage Azure Data Lake Storage Gen2 (ADLS Gen2). Pour plus d'informations sur la configuration de l'authentification pour un compte de stockage ADLS Gen2, consultez Accéder en toute sécurité aux informations d'identification de stockage avec des secrets dans un pipeline.