Aracılığıyla paylaş


Delta Live Tabloları ile veri kalitesini yönetme

Bir veri kümesinin içeriğinde veri kalitesi kısıtlamaları tanımlamak için beklentileri kullanırsınız. Beklentiler, tablolara gelen verilerin veri kalitesi gereksinimlerini karşıladığını garanti etmenize ve her işlem hattı güncelleştirmesi için veri kalitesiyle ilgili içgörüler sağlamanıza olanak sağlar. Python dekoratörlerini veya SQL kısıtlama yan tümcelerini kullanarak sorgulara beklentileri uygularsınız.

Delta Live Tables'ın beklentileri nelerdir?

Beklentiler, bir sorgudan geçen her kayda veri kalitesi denetimleri uygulayan Delta Live Tables veri kümesi bildirimlerine eklediğiniz isteğe bağlı yan tümcelerdir.

Beklenti üç şeyden oluşur:

  • Benzersiz bir tanımlayıcı işlevi gören ve kısıtlama ölçümlerini izlemenize olanak tanıyan bir açıklama.
  • Belirtilen bazı koşula göre her zaman true veya false döndüren boole deyimi.
  • Bir kayıt beklentide başarısız olduğunda, boole değerinin false döndürdüğü bir eylem.

Aşağıdaki matris, geçersiz kayıtlara uygulayabileceğiniz üç eylemi gösterir:

Eylem Sonuç
warn (varsayılan) Hedefe geçersiz kayıtlar yazılır; hatası, veri kümesi için bir ölçüm olarak bildirilir.
damla Veriler hedefe yazılmadan önce geçersiz kayıtlar bırakılır; hatası, veri kümesi için bir ölçüm olarak bildirilir.
Başarısız Geçersiz kayıtlar güncelleştirmenin başarılı olmasını engelliyor. Yeniden işlemeden önce el ile müdahale gereklidir.

Delta Live Tables olay günlüğünü sorgulayarak bir beklentiyi ihlal eden kayıt sayısı gibi veri kalitesi ölçümlerini görüntüleyebilirsiniz. Bkz . Delta Live Tables işlem hatlarını izleme.

Delta Live Tables veri kümesi bildirimi söz diziminin tam başvurusu için bkz . Delta Live Tables Python dil başvurusu veya Delta Live Tables SQL dil başvurusu.

Not

  • Herhangi bir beklentiye birden çok yan tümce dahil edebilirsiniz ancak yalnızca Python birden çok beklentiye göre eylem tanımlamayı destekler. Bkz . Birden çok beklenti.
  • Beklentiler SQL ifadeleri kullanılarak tanımlanmalıdır. Bir beklenti tanımlarken SQL dışı söz dizimi (örneğin Python işlevleri) kullanamazsınız.

Geçersiz kayıtları tutma

Beklentiyi expect ihlal eden kayıtları tutmak istediğinizde işlecini kullanın. Beklentiyi ihlal eden kayıtlar, geçerli kayıtlarla birlikte hedef veri kümesine eklenir:

Python

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Geçersiz kayıtları bırakma

Geçersiz kayıtların expect or drop daha fazla işlenmesini önlemek için işlecini kullanın. Beklentiyi ihlal eden kayıtlar hedef veri kümesinden bırakılır:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Geçersiz kayıtlarda başarısız

Geçersiz kayıtlar kabul edilemez olduğunda, kayıt doğrulama başarısız olduğunda yürütmeyi expect or fail hemen durdurmak için işlecini kullanın. İşlem bir tablo güncelleştirmesiyse, sistem atomik olarak işlemi geri alır:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Önemli

İşlem hattında tanımlanmış birden çok paralel akışınız varsa, tek bir akışın başarısız olması diğer akışların başarısız olmasına neden olmaz.

İşlem hattı, bir beklenti ihlali nedeniyle başarısız olduğunda, işlem hattını yeniden çalıştırmadan önce geçersiz verileri doğru şekilde işlemek için işlem hattı kodunu düzeltmeniz gerekir.

Başarısız beklentiler, ihlalleri algılamak ve raporlamak için gereken bilgileri izlemek için dönüşümlerinizin Spark sorgu planını değiştirir. Birçok sorguda, hangi giriş kaydının ihlale neden olduğunu belirlemek için bu bilgileri kullanabilirsiniz. Aşağıda örnek bir özel durum verilmiştir:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

Birden çok beklenti

Python işlem hatlarında bir veya daha fazla veri kalitesi kısıtlamasıyla beklentileri tanımlayabilirsiniz. Bu dekoratörler bağımsız değişken olarak bir Python sözlüğü kabul eder; burada anahtar, beklenti adı ve değer de beklenti kısıtlamasıdır.

Başarısız doğrulamaya sahip kayıtların hedef veri kümesine eklenmesi gerektiğinde birden çok veri kalitesi kısıtlaması belirtmek için kullanın expect_all :

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Doğrulama başarısız olan kayıtlar hedef veri kümesinden bırakıldığında birden çok veri kalitesi kısıtlaması belirtmek için kullanın expect_all_or_drop :

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Başarısız olan kayıtların işlem hattı yürütmesini durdurması gerektiğinde birden çok veri kalitesi kısıtlaması belirtmek için kullanın expect_all_or_fail :

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Ayrıca bir beklenti koleksiyonunu değişken olarak tanımlayabilir ve bunu işlem hattınızdaki bir veya daha fazla sorguya geçirebilirsiniz:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

Geçersiz verileri karantinaya al

Aşağıdaki örnek, geçici tablolar ve görünümlerle birlikte beklentileri kullanır. Bu düzen, işlem hattı güncelleştirmeleri sırasında beklenti denetimlerinden geçen kayıtlar için ölçümler sağlar ve farklı aşağı akış yollarından geçerli ve geçersiz kayıtları işlemenin bir yolunu sağlar.

Not

Bu örnekte Databricks veri kümelerine dahil edilen örnek veriler okunur. Databricks veri kümeleri Unity Kataloğu'nda yayımlanan bir işlem hattıyla desteklenmediğinden, bu örnek yalnızca Hive meta veri deposuna yayımlamak üzere yapılandırılmış bir işlem hattıyla çalışır. Ancak, bu desen Unity Kataloğu'nu etkinleştiren işlem hatlarıyla da çalışır, ancak dış konumlardaki verileri okumanız gerekir. Unity Kataloğu'nu Delta Live Tabloları ile kullanma hakkında daha fazla bilgi edinmek için bkz. Delta Live Tabloları işlem hatlarınızla Unity Kataloğu'nu kullanma.

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    spark.read.table("LIVE.raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    spark.read.table("LIVE.farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    spark.read.table("LIVE.farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

Tablolar arasında satır sayısını doğrulama

İşlem hattınıza iki gerçekleştirilmiş görünüm veya akış tablosu arasındaki satır sayılarını karşılaştırma beklentisini tanımlayan ek bir tablo ekleyebilirsiniz. Bu beklentinin sonuçları olay günlüğünde ve Delta Live Tables kullanıcı arabiriminde görünür. Aşağıdaki örnek, ve tblb tabloları arasındaki tbla eşit satır sayısını doğrular:

CREATE OR REFRESH MATERIALIZED VIEW count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Delta Live Tables beklentileriyle gelişmiş doğrulama gerçekleştirme

Toplama ve birleştirme sorgularını kullanarak gerçekleştirilmiş görünümler tanımlayabilir ve bu sorguların sonuçlarını beklenti denetiminizin bir parçası olarak kullanabilirsiniz. Örneğin, türetilmiş bir tablonun kaynak tablodaki tüm kayıtları içerdiğinden emin olmak veya tablolar arasında sayısal bir sütunun eşitliğini garanti etmek gibi karmaşık veri kalitesi denetimleri gerçekleştirmek istiyorsanız bu yararlı olur.

Aşağıdaki örnek, beklenen tüm kayıtların report tabloda mevcut olduğunu doğrular:

CREATE MATERIALIZED VIEW report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

Aşağıdaki örnek, birincil anahtarın benzersizliğini sağlamak için bir toplama kullanır:

CREATE MATERIALIZED VIEW report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

Beklentileri taşınabilir ve yeniden kullanılabilir hale getirme

Veri kalitesi kurallarını işlem hattı uygulamalarınızdan ayrı tutabilirsiniz.

Databricks, her kuralın bir etikete göre kategorilere ayrılmış olduğu bir Delta tablosunda kuralların depolanmasını önerir. Bu etiketi, hangi kuralların uygulanacağını belirlemek için veri kümesi tanımlarında kullanırsınız.

Aşağıdaki örnek, kuralları korumak için adlı rules bir tablo oluşturur:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

Aşağıdaki Python örneği, tabloda depolanan rules kurallara göre veri kalitesi beklentilerini tanımlar. İşlev, get_rules() tablodan kuralları rules okur ve işleve geçirilen bağımsız değişkenle eşleşen tag kuralları içeren bir Python sözlüğü döndürür. Sözlük, veri kalitesi kısıtlamalarını zorlamak için dekoratörlere uygulanır @dlt.expect_all_*() . Örneğin, etiketlenen validity kurallarda başarısız olan tüm kayıtlar tablodan raw_farmers_market bırakılır:

Not

Bu örnekte Databricks veri kümelerine dahil edilen örnek veriler okunur. Databricks veri kümeleri Unity Kataloğu'nda yayımlanan bir işlem hattıyla desteklenmediğinden, bu örnek yalnızca Hive meta veri deposuna yayımlamak üzere yapılandırılmış bir işlem hattıyla çalışır. Ancak, bu desen Unity Kataloğu'nu etkinleştiren işlem hatlarıyla da çalışır, ancak dış konumlardaki verileri okumanız gerekir. Unity Kataloğu'nu Delta Live Tabloları ile kullanma hakkında daha fazla bilgi edinmek için bkz. Delta Live Tabloları işlem hatlarınızla Unity Kataloğu'nu kullanma.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    spark.read.table("LIVE.raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

Kuralları korumak için adlı rules bir tablo oluşturmak yerine, örneğin not defteriyle aynı klasörde adlı rules_module.py bir dosyada ana kurallara yönelik bir Python modülü oluşturabilirsiniz:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

Ardından önceki not defterini değiştirmek için modülü içeri aktarın ve işlevi tablodan değil modülden rules okuyacak şekilde değiştiringet_rules():

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    spark.read.table("LIVE.raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )