Tutoriel : Exécuter un pipeline analytique de lakehouse de bout en bout
Ce tutoriel vous montre comment configurer un pipeline d’analytique de bout en bout pour un lakehouse Azure Databricks.
Important
Ce tutoriel utilise des notebooks interactifs pour effectuer des tâches ETL courantes en Python sur des clusters Unity Catalog. Si vous n’utilisez pas le Catalogue Unity, consultez Exécuter votre première charge de travail ETL sur Azure Databricks.
Tâches dans ce tutoriel
À la fin de cet article, vous serez en mesure d’effectuer les opérations suivantes :
- Lancer un cluster de calcul Unity Catalog
- Créer un notebook Databricks
- Écrire et lire des données à partir d’un emplacement externe Unity Catalog
- Configurer l’ingestion incrémentielle de données dans une table Catalogue Unity avec Auto Loader
- Exécuter des cellules de notebook pour traiter, interroger et prévisualiser des données
- Planifier un notebook en tant que travail Databricks
- Interroger des tables Unity Catalog à partir de Databricks SQL
Azure Databricks fournit une suite d’outils prêts pour la production qui permettent aux professionnels des données de développer et de déployer rapidement des pipelines d’extraction, de transformation et de chargement (ETL). Unity Catalog permet aux gestionnaires de données de configurer et de sécuriser les informations d’identification de stockage, les emplacements externes et les objets de base de données des utilisateurs de toute l’organisation. Databricks SQL permet aux analystes d’exécuter des requêtes SQL sur les mêmes tables que celles qui sont utilisées dans les charges de travail ETL de production, ce qui facilite le décisionnel en temps réel à grande échelle.
Vous pouvez également utiliser Delta Live Tables pour générer des pipelines ETL. Databricks a créé Delta Live Tables pour réduire la complexité de la création, du déploiement et de la maintenance des pipelines ETL de production. Consulter Tutoriel : Exécuter votre premier pipeline Delta Live Tables.
Spécifications
Notes
Si vous ne disposez pas des privilèges de contrôle de cluster, vous pouvez quand même effectuer la plupart des étapes ci-dessous tant que vous avez accès à un cluster.
Étape 1 : Créer un cluster
Pour effectuer des analyses exploratoires des données et de l’engineering données, créez un cluster permettant de fournir les ressources de calcul nécessaires à l’exécution des commandes.
- Cliquez sur Calcul dans la barre latérale.
- Cliquez sur Nouveau dans la barre latérale, puis sélectionnez Cluster. La page Nouveau cluster/Calcul s’ouvre.
- Attribuez un nom unique au cluster.
- Sélectionnez la case d’option Nœud unique.
- Sélectionnez Utilisateur unique dans la liste déroulante Mode d'accès.
- Assurez-vous que votre adresse e-mail est visible dans le champ Utilisateur unique.
- Sélectionnez la Version du runtime Databricks souhaitée, 11.1 ou ultérieure, pour utiliser Unity Catalog.
- Cliquez sur Créer un calcul pour créer le cluster.
Pour en savoir plus sur les clusters Databricks, consultez Calculer.
Étape 2 : Créer un notebook Databricks
Pour créer un notebook dans votre espace de travail, cliquez sur Nouveau dans la barre latérale, puis sur Notebook. Un notebook vide s’ouvre dans l’espace de travail.
Pour en savoir plus sur la création et la gestion des notebooks, consultez Gérer les notebooks.
Étape 3 : Écrire et lire des données à partir d’un emplacement externe géré par Unity Catalog
Databricks recommande d’utiliser Auto Loader pour l’ingestion incrémentielle des données. Auto Loader détecte et traite automatiquement les nouveaux fichiers à mesure qu’ils arrivent dans le stockage d’objets cloud.
Utilisez Unity Catalog pour gérer l’accès sécurisé aux emplacements externes. Les utilisateurs ou les principaux de service disposant d’autorisations READ FILES
sur un emplacement externe peuvent utiliser Auto Loader pour ingérer des données.
Normalement, les données arrivent dans un emplacement externe en raison d’écritures provenant d’autres systèmes. Dans cette démo, vous pouvez simuler l’arrivée de données en écrivant des fichiers JSON dans un emplacement externe.
Copiez le code ci-dessous dans une cellule de notebook. Remplacez la valeur de chaîne de catalog
par le nom d’un catalogue avec les autorisations CREATE CATALOG
et USE CATALOG
. Remplacez la valeur de chaîne de external_location
par le chemin d’un emplacement externe avec les autorisations READ FILES
, WRITE FILES
et CREATE EXTERNAL TABLE
.
Les emplacements externes peuvent être définis comme un conteneur de stockage entier, mais ils pointent souvent vers un répertoire imbriqué dans un conteneur.
Le format correct d’un chemin d’emplacement externe est "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location"
.
external_location = "<your-external-location>"
catalog = "<your-catalog>"
dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
L’exécution de cette cellule doit imprimer une ligne qui lit 12 octets, imprimer la chaîne « Hello world! » et afficher toutes les bases de données présentes dans le catalogue fourni. Si vous ne parvenez pas à exécuter cette cellule, vérifiez que vous vous trouvez dans un espace de travail Unity Catalog et demandez les autorisations appropriées à l’administrateur de l’espace de travail pour suivre ce tutoriel.
Le code Python ci-dessous utilise votre adresse e-mail pour créer une base de données unique dans le catalogue fourni et un emplacement de stockage unique dans un emplacement externe fourni. L’exécution de cette cellule supprime toutes les données associées à ce tutoriel, ce qui vous permet d’exécuter cet exemple de façon idempotente. Une classe est définie et instanciée que vous utiliserez pour simuler des lots de données arrivant d'un système connecté vers votre emplacement externe source.
Copiez ce code dans une nouvelle cellule de votre notebook et exécutez-le pour configurer votre environnement.
Notes
Les variables définies dans ce code doivent vous permettre de l’exécuter en toute sécurité sans risque de conflit avec les ressources existantes de l’espace de travail ou avec d’autres utilisateurs. Les autorisations de réseau ou de stockage restreintes génèrent des erreurs lors de l’exécution de ce code ; contactez l’administrateur de votre espace de travail pour remédier à ces restrictions.
from pyspark.sql.functions import col
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)
# Define a class to load batches of data to source
class LoadData:
def __init__(self, source):
self.source = source
def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date
def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)
def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)
def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)
RawData = LoadData(source)
Vous pouvez désormais obtenir un lot de données en copiant le code suivant dans une cellule et en l’exécutant. Vous pouvez exécuter manuellement cette cellule jusqu’à 60 fois pour déclencher l’arrivée de nouvelles données.
RawData.land_batch()
Étape 4 : Configurer Auto Loader pour ingérer des données dans Unity Catalog
Databricks recommande de stocker les données avec Delta Lake. Delta Lake est une couche de stockage open source qui fournit des transactions ACID et active le data lakehouse. Delta Lake est le format par défaut des tables créées dans Databricks.
Pour configurer Auto Loader afin d’ingérer des données dans une table Unity Catalog, copiez et collez le code suivant dans une cellule vide de votre notebook :
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(source)
.select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))
Pour en savoir plus sur le Chargeur automatique, consultez Qu’est-ce que le chargeur automatique ?
Pour en savoir plus sur Structured Streaming avec Unity Catalog, consultez Utilisation de Unity Catalog avec Structured Streaming.
Étape 5 : Traiter les données et interagir avec elles
Les notebooks exécutent la logique cellule par cellule. Procédez comme suit pour exécuter la logique dans votre cellule :
Pour exécuter la cellule de l’étape précédente, sélectionnez-la, puis appuyez sur MAJ+ENTRÉE.
Pour interroger la table que vous venez de créer, copiez et collez le code suivant dans une cellule vide, puis appuyez sur MAJ+ENTRÉE pour exécuter la cellule.
df = spark.read.table(table)
Pour prévisualiser les données dans votre DataFrame, copiez et collez le code suivant dans une cellule vide, puis appuyez sur MAJ+ENTRÉE pour exécuter la cellule.
display(df)
Pour en savoir plus sur les options interactives de visualisation des données, consultez Visualisations dans les notebooks Databricks.
Étape 6 : Planifier un travail
Vous pouvez exécuter des notebooks Databricks en tant que scripts de production en les ajoutant comme tâche dans un travail Databricks. Au cours de cette étape, vous allez créer un travail que vous pourrez déclencher manuellement.
Pour planifier votre notebook en tant que tâche :
- Cliquez sur Planifier sur le côté droit de la barre d’en-tête.
- Entrez un nom unique dans le champ Nom du travail.
- Cliquez sur Manuel.
- Dans la liste déroulante Cluster, sélectionnez le cluster que vous avez créé à l’étape 1.
- Cliquez sur Créer.
- Dans la fenêtre qui s’affiche, cliquez sur Exécuter maintenant.
- Pour afficher les résultats de l’exécution du travail, cliquez sur l’icône en regard de l’horodatage Dernière exécution.
Pour plus d’informations sur les projets, consultez Qu’est-ce qu’un projet Databricks ?
Étape 7 : Interroger la table à partir de Databricks SQL
Toute personne disposant des autorisations USE CATALOG
sur le catalogue et les autorisations USE SCHEMA
sur le schéma actuel ainsi que des autorisations SELECT
sur le tableau peut interroger le contenu du tableau à partir de son API Databricks préférée.
Vous devez avoir accès à un entrepôt SQL en cours d’exécution pour exécuter des requêtes dans Databricks SQL.
La table que vous avez créée plus tôt dans ce tutoriel porte le nom target_table
. Vous pouvez l’interroger à l’aide du catalogue que vous avez fourni dans la première cellule et de la base de données avec le schéma e2e_lakehouse_<your-username>
. Vous pouvez utiliser l’explorateur de catalogues pour rechercher les objets de données que vous avez créés.
Intégrations supplémentaires
Découvrez-en plus sur les intégrations et les outils d’engineering données avec Azure Databricks :