Partager via


Charger des données en utilisant le streaming tables avec Databricks SQL

Databricks recommande d’utiliser la diffusion en continu tables pour ingérer des données à l’aide de Databricks SQL. Un de diffusion en continu table est un table inscrit à Unity Catalog avec une prise en charge supplémentaire du traitement des données en streaming ou incrémentiel. Un pipeline Delta Live Tables est créé automatiquement pour chaque streaming table. Vous pouvez utiliser le streaming tables pour le chargement incrémentiel des données à partir de Kafka et du stockage d’objets dans le cloud.

Cet article illustre l’utilisation de tables de streaming pour charger des données à partir du stockage d’objets cloud configuré en tant de volume Unity Catalog (recommandé) ou à un emplacement externe.

Remarque

Pour savoir comment utiliser Delta Lake tables en tant que sources et récepteurs de diffusion en continu, consultez Delta table les lectures et écritures en continu.

Important

Les tables de diffusion en continu créées dans Databricks SQL sont prises en charge par un pipeline Delta Live serverless Tables. Votre espace de travail doit prendre en charge les pipelines serverless pour utiliser cette fonctionnalité.

Avant de commencer

Avant de commencer, vous devez satisfaire aux exigences suivantes :

Exigences pour l’espace de travail :

Exigences de calcul :

Vous devez utiliser une des options suivantes :

  • Un entrepôt SQL qui utilise le canal Current.

  • Calcul avec mode d’accès partagé sur Databricks Runtime 13.3 LTS ou ultérieur.

  • Calcul avec mode d’accès utilisateur unique sur Databricks Runtime 15.4 LTS ou version ultérieure.

    Sur Databricks Runtime 15.3 et ci-dessous, vous ne pouvez pas utiliser le calcul d’un utilisateur unique pour interroger des tables de diffusion en continu appartenant à d’autres utilisateurs. Vous pouvez utiliser l’ordinateur à utilisateur unique sur Databricks Runtime 15.3 ou une version antérieure uniquement si vous disposez du streaming table. Le créateur du table en est le propriétaire.

    Databricks Runtime 15.4 LTS et versions ultérieures prennent en charge les requêtes sur Delta Live Tables- générées tables avec un calcul mono-utilisateur, indépendamment de la propriété table. Pour tirer parti du filtrage des données fourni dans Databricks Runtime 15.4 LTS et versions ultérieures, vous devez confirmer que votre espace de travail est activé pour les de calcul serverless, car la fonctionnalité de filtrage des données qui prend en charge Delta Live Tables-generated tables s’exécute sur le calcul serverless. Vous pouvez être facturé pour les ressources de calcul serverless lorsque vous utilisez le calcul mono-utilisateur pour exécuter des opérations de filtrage des données. Consultez le contrôle d’accès affiné sur le calcul d’un seul utilisateur.

Conditions requises pour les autorisations :

  • Privilège READ FILES sur un emplacement externe Unity Catalog. Pour plus d’informations, consultez Créer un emplacement externe pour connecter le stockage cloud à Azure Databricks.
  • Privilège USE CATALOG sur le catalog dans lequel vous créez le tablede diffusion en continu.
  • Privilège USE SCHEMA sur le schema dans lequel vous créez le tablede diffusion en continu.
  • Privilège CREATE TABLE sur le schema dans lequel vous créez le tablede diffusion en continu.

Autres exigences :

  • Chemin d’accès à vos données sources.

    Exemple de chemin d’accès au volume : /Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    Exemple de chemin d’accès à l’emplacement externe : abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis

    Remarque

    Cet article suppose que les données à charger se trouvent dans un emplacement de stockage cloud qui correspond à un volume Unity Catalog ou à un emplacement externe auquel vous avez accès.

Découvrir et afficher un aperçu des données sources

  1. Dans la barre latérale de votre espace de travail, cliquez sur Requêtes, puis sur Créer une requête.

  2. Dans l’éditeur de requête, select un entrepôt SQL qui utilise le canal Current à partir de la liste déroulante list.

  3. Collez ce qui suit dans l’éditeur, en remplaçant values entre crochets (<>) pour les informations identifiant vos données sources, puis cliquez sur Exécuter.

    Remarque

    Vous pouvez rencontrer des erreurs d’inférence schema lors de l’exécution de la fonction read_filestable évaluée si les valeurs par défaut de la fonction ne peuvent pas analyser vos données. Par exemple, vous devrez peut-être configurer le mode multiligne pour les fichiers CSV ou JSON multilignes. Pour obtenir une list des options de l’analyseur, consultez Fonction table read_files.

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
    
    /* Discover your data in an external location */
    LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>"
    
    /* Preview your data */
    SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
    

Charger des données dans un flux de données table

Pour créer un table de diffusion en continu à partir de données dans le stockage d’objets cloud, collez ce qui suit dans l’éditeur de requête, puis cliquez sur Exécuter:

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')

Set le canal d’exécution

Les flux tables créés à l’aide d’entrepôts SQL sont rafraîchis automatiquement par un pipeline Delta Live Tables. Les pipelines Delta Live Tables utilisent le runtime dans le canal current par défaut. Consultez les notes de publication Delta Live Tables et le processus de mise à niveau des versions pour en savoir plus sur le processus de mise en production.

Databricks recommande d’utiliser le current canal pour les charges de travail de production. Les nouvelles fonctionnalités sont d’abord publiées sur le preview canal. Vous pouvez set un pipeline vers la chaîne Delta Live en préversion Tables pour tester de nouvelles fonctionnalités, par la spécification de preview comme propriété table. Vous pouvez spécifier cette propriété lorsque vous créez le table ou une fois le table créé à l’aide d’une instruction ALTER.

L’exemple de code suivant montre comment set le canal à afficher en préversion dans une instruction CREATE :

CREATE OR REPLACE MATERIALIZED VIEW foo.default.bar
TBLPROPERTIES ('pipelines.channel' = 'preview') as
SELECT
  *
FROM
  range(5)

Refresh un table de streaming à l’aide d’un pipeline DLT

Cette section décrit les modèles d’actualisation d’un table de diffusion en continu avec les données les plus récentes disponibles à partir des sources définies dans la requête.

Lorsque vous CREATE ou REFRESH un flux de données table, le update est traité à l’aide d’un pipeline Delta Live Tables serverless. Chaque table de diffusion en continu que vous définissez a un pipeline Delta Live Tables associé.

Après avoir exécuté la commande REFRESH, le lien de pipeline DLT est retourné. Vous pouvez utiliser le lien du pipeline DLT pour vérifier l’état du refresh.

Remarque

Seul le propriétaire table peut refresh une table de diffusion en continu pour get les données les plus récentes. L’utilisateur qui crée l'table est le propriétaire et le propriétaire ne peut pas être modifié. Vous devrez peut-être refresh votre table de stream avant d’utiliser des requêtes de voyage dans le temps.

Voir Qu’est-ce que Delta Live Tables ?.

Ingérer de nouvelles données uniquement

Par défaut, la fonction read_files lit toutes les données existantes dans le répertoire source pendant la création de table, puis traite les enregistrements nouvellement arrivés avec chaque refresh.

Pour éviter d’ingérer des données qui existent déjà dans le répertoire source au moment de la création de table, set l’option includeExistingFiles pour false. Cela signifie que seules les données qui arrivent dans le répertoire après table création sont traitées. Par exemple :

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
  'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
  includeExistingFiles => false)

Entièrement en streaming refreshtable

Les actualisations complètes re-traitent toutes les données disponibles dans la source avec la dernière définition. Il n’est pas recommandé d’appeler des actualisations complètes sur des sources qui ne conservent pas l’historique entier des données ou qui ont des périodes de rétention courtes, telles que Kafka, car la refresh complète tronque les données existantes. Vous ne pourrez peut-être pas récupérer d’anciennes données si les données ne sont plus disponibles dans la source.

Par exemple :

REFRESH STREAMING TABLE my_bronze_table FULL

Programmer un table de diffusion en continu pour un refresh automatique

Pour configurer une table de streaming pour l’refresh automatiquement en fonction d’une planification définie, collez ce qui suit dans l’éditeur de requête, puis cliquez sur Exécuter :

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

Par exemple, pour les requêtes de programme refresh, voir ALTER STREAMING TABLE.

Suivre l’état d’un refresh

Vous pouvez afficher l’état d’une tablerefresh de diffusion en continu en consultant le pipeline qui gère le table de diffusion en continu dans l’interface utilisateur Delta Live Tables, ou en affichant les Refresh Informations retournées par la commande DESCRIBE EXTENDED pour la diffusion en continu table.

DESCRIBE EXTENDED <table-name>

Ingestion en streaming à partir de Kafka

Pour obtenir un exemple d’ingestion en streaming à partir de Kafka, consultez read_kafka.

Grant utilisateurs accèdent à un table de diffusion en continu

Pour donner aux utilisateurs grant le privilège SELECT sur le flux table afin qu’ils puissent l’interroger, collez ce qui suit dans l’éditeur de requête, puis cliquez sur Exécuter:

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

Pour plus d’informations sur l’octroi de privilèges sur des objets sécurisables Unity Catalog, consultez Privilèges et objets sécurisables Unity Catalog.

Surveiller les exécutions à l’aide de l’historique des requêtes

Vous pouvez utiliser la page d'historique des requêtes pour accéder aux détails de la requête et aux profils de requêtes qui peuvent vous aider à identifier les requêtes mal performantes et les goulots d'étranglement dans le pipeline Delta Live Tables utilisé pour réaliser vos mises à jour de streaming table. Pour obtenir une vue d’ensemble du type d’informations disponibles dans les historiques de requête et les profils de requête, consultez l’historique des requêtes et le profil de requête.

Important

Cette fonctionnalité est disponible en préversion publique. Les administrateurs d’espace de travail peuvent activer cette fonctionnalité à partir de la page Aperçus. Consultez Gérer les préversions d’Azure Databricks.

Toutes les déclarations liées à la lecture en continu tables apparaissent dans l’historique des requêtes. Vous pouvez utiliser le filtre déroulant Instruction pour select n’importe quelle commande et inspecter les requêtes associées. Toutes les instructions CREATE sont suivies d’une instruction REFRESH qui s’exécute de manière asynchrone sur un pipeline Delta Live Tables. Les REFRESH instructions incluent généralement des plans de requête détaillés qui fournissent des insights sur l’optimisation des performances.

Pour accéder aux REFRESH instructions de l’interface utilisateur de l’historique des requêtes, procédez comme suit :

  1. Cliquez Icône Historique dans la barre latérale gauche pour ouvrir l’interface utilisateur de l’historique des requêtes.
  2. Select la case REFRESH dans le filtre déroulant Statement.
  3. Cliquez sur le nom de l’instruction de requête pour afficher les détails du résumé, comme la durée de la requête et les métriques agrégées.
  4. Cliquez sur Afficher le profil de requête pour ouvrir le profil de requête. Pour plus d’informations sur la navigation dans le profil de requête, consultez le profil de requête.
  5. Si vous le souhaitez, vous pouvez utiliser les liens dans la section Source de requête pour ouvrir la requête ou le pipeline associé.

Vous pouvez également accéder aux détails de requête à l’aide de liens dans l’éditeur SQL ou à partir d’un notebook attaché à un entrepôt SQL.

Remarque

Votre table de diffusion en continu doit être configurée pour s’exécuter à l’aide du canal de préversion. Consultez Set le canal d’exécution.

Ressources supplémentaires