Hantera datakvalitet med Delta Live Tables
Du använder förväntningar för att definiera datakvalitetsbegränsningar för innehållet i en datamängd. Med förväntningar kan du garantera att data som kommer in i tabeller uppfyller datakvalitetskraven och ger insikter om datakvaliteten för varje pipelineuppdatering. Du tillämpar förväntningar på frågor med hjälp av Python-dekoratörer eller SQL-villkorssatser.
Vad är Förväntningar på Delta Live Tables?
Förväntningar är valfria satser som du lägger till i Delta Live Tables-datauppsättningsdeklarationer som tillämpar datakvalitetskontroller på varje post som passerar genom en fråga.
En förväntan består av tre saker:
- En beskrivning som fungerar som en unik identifierare och gör att du kan spåra mått för villkoret.
- En boolesk instruktion som alltid returnerar sant eller falskt baserat på ett angivet villkor.
- En åtgärd att vidta när en post misslyckas med förväntan, vilket innebär att det booleska returnerar false.
Följande matris visar de tre åtgärder som du kan använda för ogiltiga poster:
Åtgärd | Resultat |
---|---|
warn (standard) | Ogiltiga poster skrivs till målet. fel rapporteras som ett mått för datamängden. |
droppe | Ogiltiga poster tas bort innan data skrivs till målet. fel rapporteras som ett mått för datamängden. |
misslyckas | Ogiltiga poster förhindrar att uppdateringen lyckas. Manuella åtgärder krävs före ombearbetning. |
Du kan visa datakvalitetsmått, till exempel antalet poster som bryter mot en förväntan genom att fråga händelseloggen Delta Live Tables. Se Övervaka Delta Live Tables-pipelines.
En fullständig referens till Delta Live Tables-datauppsättningsdeklarationssyntax finns i Referens för Python-språkreferens för Delta Live Tables eller SQL-språkreferens för Delta Live Tables.
Kommentar
- Du kan inkludera flera satser i alla förväntningar, men endast Python har stöd för att definiera åtgärder baserat på flera förväntningar. Se Flera förväntningar.
- Förväntningar måste definieras med SQL-uttryck. Du kan inte använda icke-SQL-syntax (till exempel Python-funktioner) när du definierar en förväntan.
Behålla ogiltiga poster
Använd operatorn expect
när du vill behålla poster som strider mot förväntningarna. Poster som bryter mot förväntningarna läggs till i måldatauppsättningen tillsammans med giltiga poster:
Python
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
Ta bort ogiltiga poster
Använd operatorn expect or drop
för att förhindra ytterligare bearbetning av ogiltiga poster. Poster som bryter mot förväntningarna tas bort från måldatauppsättningen:
Python
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
Fel vid ogiltiga poster
När ogiltiga poster är oacceptabla använder du operatorn expect or fail
för att stoppa körningen omedelbart när en post misslyckas med valideringen. Om åtgärden är en tabelluppdatering återställer systemet atomiskt transaktionen:
Python
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
Viktigt!
Om du har flera parallella flöden som definierats i en pipeline leder fel i ett enda flöde inte till att andra flöden misslyckas.
När en pipeline misslyckas på grund av en förväntansöverträdelse måste du åtgärda pipelinekoden för att hantera ogiltiga data korrekt innan du kör pipelinen igen.
Förväntade fel ändrar Spark-frågeplanen för dina omvandlingar för att spåra information som krävs för att identifiera och rapportera om överträdelser. För många frågor kan du använda den här informationen för att identifiera vilken indatapost som ledde till överträdelsen. Följande är ett exempel på ett undantag:
Expectation Violated:
{
"flowName": "a-b",
"verboseInfo": {
"expectationsViolated": [
"x1 is negative"
],
"inputData": {
"a": {"x1": 1,"y1": "a },
"b": {
"x2": 1,
"y2": "aa"
}
},
"outputRecord": {
"x1": 1,
"y1": "a",
"x2": 1,
"y2": "aa"
},
"missingInputData": false
}
}
Flera förväntningar
Du kan definiera förväntningar med en eller flera datakvalitetsbegränsningar i Python-pipelines. De här dekoratörerna accepterar en Python-ordlista som argument, där nyckeln är förväntansnamnet och värdet är förväntansbegränsningen.
Använd expect_all
för att ange flera datakvalitetsbegränsningar när poster som misslyckas med valideringen ska ingå i måldatauppsättningen:
@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Använd expect_all_or_drop
för att ange flera begränsningar för datakvalitet när poster som misslyckas med valideringen ska tas bort från måldatauppsättningen:
@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Använd expect_all_or_fail
för att ange flera begränsningar för datakvalitet när poster som misslyckas med valideringen ska stoppa pipelinekörningen:
@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
Du kan också definiera en samling förväntningar som en variabel och skicka den till en eller flera frågor i pipelinen:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create cleaned and prepared dataset
Ogiltiga data i karantän
I följande exempel används förväntningar i kombination med tillfälliga tabeller och vyer. Det här mönstret ger dig mått för poster som skickar förväntanskontroller under pipelineuppdateringar och ger ett sätt att bearbeta giltiga och ogiltiga poster via olika underordnade sökvägar.
Kommentar
I det här exemplet läss exempeldata som ingår i Databricks-datauppsättningarna. Eftersom Databricks-datauppsättningarna inte stöds med en pipeline som publicerar till Unity Catalog fungerar det här exemplet bara med en pipeline som är konfigurerad att publicera till Hive-metaarkivet. Det här mönstret fungerar dock också med Unity Catalog-aktiverade pipelines, men du måste läsa data från externa platser. Mer information om hur du använder Unity Catalog med Delta Live Tables finns i Använda Unity Catalog med dina Delta Live Tables-pipelines.
import dlt
from pyspark.sql.functions import expr
rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))
@dlt.table(
name="raw_farmers_market"
)
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="farmers_market_quarantine",
temporary=True,
partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
return (
spark.read.table("LIVE.raw_farmers_market")
.select("MarketName", "Website", "Location", "State",
"Facebook", "Twitter", "Youtube", "Organic", "updateTime")
.withColumn("is_quarantined", expr(quarantine_rules))
)
@dlt.view(
name="valid_farmers_market"
)
def get_valid_farmers_market():
return (
spark.read.table("LIVE.farmers_market_quarantine")
.filter("is_quarantined=false")
)
@dlt.view(
name="invalid_farmers_market"
)
def get_invalid_farmers_market():
return (
spark.read.table("LIVE.farmers_market_quarantine")
.filter("is_quarantined=true")
)
Verifiera radantal mellan tabeller
Du kan lägga till ytterligare en tabell i pipelinen som definierar en förväntan att jämföra radantal mellan två materialiserade vyer eller strömmande tabeller. Resultatet av den här förväntan visas i händelseloggen och Delta Live Tables-användargränssnittet. I följande exempel verifieras lika antal rader mellan tabellerna tbla
och tblb
:
CREATE OR REFRESH MATERIALIZED VIEW count_verification(
CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
(SELECT COUNT(*) AS a_count FROM LIVE.tbla),
(SELECT COUNT(*) AS b_count FROM LIVE.tblb)
Utföra avancerad validering med förväntningar för Delta Live Tables
Du kan definiera materialiserade vyer med hjälp av aggregerade och anslutna frågor och använda resultatet av dessa frågor som en del av din förväntanskontroll. Detta är användbart om du vill utföra komplexa datakvalitetskontroller, till exempel genom att se till att en härledd tabell innehåller alla poster från källtabellen eller garantera likheten mellan en numerisk kolumn mellan tabeller.
I följande exempel verifieras att alla förväntade poster finns i report
tabellen:
CREATE MATERIALIZED VIEW report_compare_tests(
CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key
I följande exempel används en aggregering för att säkerställa att en primärnyckel är unik:
CREATE MATERIALIZED VIEW report_pk_tests(
CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk
Gör förväntningarna bärbara och återanvändbara
Du kan underhålla datakvalitetsregler separat från dina pipelineimplementeringar.
Databricks rekommenderar att du lagrar reglerna i en Delta-tabell med varje regel kategoriserad efter en tagg. Du använder den här taggen i datamängdsdefinitioner för att avgöra vilka regler som ska tillämpas.
I följande exempel skapas en tabell med namnet rules
för att underhålla regler:
CREATE OR REPLACE TABLE
rules
AS SELECT
col1 AS name,
col2 AS constraint,
col3 AS tag
FROM (
VALUES
("website_not_null","Website IS NOT NULL","validity"),
("location_not_null","Location IS NOT NULL","validity"),
("state_not_null","State IS NOT NULL","validity"),
("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)
I följande Python-exempel definieras datakvalitetsförväntningar baserat på de regler som lagras i rules
tabellen. Funktionen get_rules()
läser reglerna från rules
tabellen och returnerar en Python-ordlista som innehåller regler som matchar argumentet tag
som skickas till funktionen. Ordlistan används i dekoratörerna @dlt.expect_all_*()
för att framtvinga datakvalitetsbegränsningar. Till exempel kommer alla poster som misslyckas med reglerna som taggats med validity
att tas bort från raw_farmers_market
tabellen:
Kommentar
I det här exemplet läss exempeldata som ingår i Databricks-datauppsättningarna. Eftersom Databricks-datauppsättningarna inte stöds med en pipeline som publicerar till Unity Catalog fungerar det här exemplet bara med en pipeline som är konfigurerad att publicera till Hive-metaarkivet. Det här mönstret fungerar dock också med Unity Catalog-aktiverade pipelines, men du måste läsa data från externa platser. Mer information om hur du använder Unity Catalog med Delta Live Tables finns i Använda Unity Catalog med dina Delta Live Tables-pipelines.
import dlt
from pyspark.sql.functions import expr, col
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
df = spark.read.table("rules")
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
spark.read.table("LIVE.raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)
I stället för att skapa en tabell med namnet rules
för att underhålla regler kan du skapa en Python-modul till huvudregler, till exempel i en fil med namnet rules_module.py
i samma mapp som notebook-filen:
def get_rules_as_list_of_dict():
return [
{
"name": "website_not_null",
"constraint": "Website IS NOT NULL",
"tag": "validity"
},
{
"name": "location_not_null",
"constraint": "Location IS NOT NULL",
"tag": "validity"
},
{
"name": "state_not_null",
"constraint": "State IS NOT NULL",
"tag": "validity"
},
{
"name": "fresh_data",
"constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
"tag": "maintained"
},
{
"name": "social_media_access",
"constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
"tag": "maintained"
}
]
Ändra sedan föregående notebook-fil genom att importera modulen och ändra get_rules()
funktionen till att läsa från modulen i stället för från rules
tabellen:
import dlt
from rules_module import *
from pyspark.sql.functions import expr, col
df = spark.createDataFrame(get_rules_as_list_of_dict())
def get_rules(tag):
"""
loads data quality rules from a table
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
spark.read.table("LIVE.raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)