Aracılığıyla paylaş


Delta Live Tables'da filigranlarla durum bilgisi olan işlemeyi Optimize

Durumda tutulan verileri etkili bir şekilde yönetmek için, Delta Live Tables'de durum bilgisi içeren akış işleme yaparken filigranları kullanın: toplamalar, birleştirmeler ve yinelenenleri kaldırma da dahil olmak üzere. Bu makalede, Delta Live Tables sorgularınızda filigranların nasıl kullanılacağı açıklanır ve önerilen işlemlerin örnekleri yer alır.

Not

Toplamalar gerçekleştiren sorguların kademeli olarak işlenmesini ve her update'de tamamen yeniden hesaplanmamasını sağlamak için filigranları kullanmanız gerekir.

watermarknedir?

Akış işlemede watermark, toplamalar gibi durum bilgisi olan işlemleri gerçekleştirirken verileri işlemek için zamana dayalı eşik tanımlayabilen bir Apache Spark özelliğidir. Gelen veriler eşiğe ulaşılana kadar işlenir ve bu noktada eşik tarafından tanımlanan window zaman süresi sonlandırılır. Filigranlar, daha büyük veri kümelerini veya uzun süre çalışan işlemleri işlerken sorgu işleme sırasında sorun yaşamamak için kullanılabilir. Bu sorunlar, işleme sırasında durumunda tutulan veri miktarı nedeniyle yüksek gecikme süresi ve hatta yetersiz bellek (OOM) hataları içerebilir. Akış verileri doğası gereği sıralanmamış olduğundan, filigranlar zaman-window gibi işlemlerin doğru bir şekilde hesaplanmasına da destek olur.

Akış işlemede filigranları kullanma hakkında daha fazla bilgi edinmek için bkz . Apache Spark Yapılandırılmış Akış'ta Filigran oluşturma ve Veri işleme eşiklerini denetlemek için filigranları uygulama.

bir watermarknasıl tanımlarsınız?

Bir zaman damgası alanı ve geç veri'nin gelmesi için bir zaman eşiğini temsil eden bir değer belirterek bir watermark tanımlarsınız. Veriler, tanımlanan zaman eşiğinden sonra ulaşırsa geç kabul edilir. Örneğin, eşik 10 dakika olarak tanımlanırsa, 10 dakikalık eşikten sonra gelen kayıtlar bırakılabilir.

Tanımlanan eşikten sonra gelen kayıtlar bırakılabileceğinden, gecikme süresi ile doğruluk gereksinimlerinizi karşılayan bir eşik seçmek önemlidir. Daha küçük bir eşik seçmek kayıtların daha erken gönderilmesine neden olur, ancak geç kayıtların bırakılma olasılığının daha yüksek olduğu anlamına gelir. Daha büyük bir eşik, verilerin daha uzun beklemesi ancak büyük olasılıkla daha eksiksiz olması anlamına gelir. Büyük durum boyutu nedeniyle, daha büyük bir eşik ek bilgi işlem kaynakları da gerektirebilir. Eşik değeri verilerinize ve işleme gereksinimlerinize bağlı olduğundan, en uygun eşiği belirlemek için işlemenizin test edilmesi ve izlenmesi önemlidir.

watermarktanımlamak için Python'da withWatermark() işlevini kullanırsınız. SQL'de, watermarktanımlamak için WATERMARK yan tümcesini kullanın:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Stream-stream birleşimleriyle filigranları kullanma

Akış-akış birleşimleri için, join'in her iki tarafında bir watermark ve bir zaman aralığı ifadesi tanımlamanız gerekir. Her join kaynağının veri üzerinde eksik bir görüşü olduğundan, akış motoruna başka eşleşmeler yapılamayacağını belirtmek için zaman aralığı yan tümcesine ihtiyaç vardır. Zaman aralığı yan tümcesi, filigranları tanımlamak için kullanılan alanları kullanmalıdır.

Her akışın filigranlar için farklı eşikler gerektirdiği zamanlar olabileceğinden, akışların aynı eşiklere sahip olması gerekmez. Kaybolan verileri önlemek için akış motoru, en yavaş akışa dayanan bir genel watermark tutar.

Aşağıdaki örnek, bir reklam gösterim akışına ve reklamlara kullanıcı tıklama akışına katılır. Bu örnekte, gösterimden sonra 3 dakika içinde bir tıklama gerçekleşmelidir. 3 dakikalık zaman aralığı geçtikten sonra, artık eşleştirilemeyecek durumdaki satırlar bırakılır.

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Filigranlarla pencereli toplamalar gerçekleştirme

Akış verileri üzerinde durum bilgisi olan yaygın bir işlem, pencereli toplama işlemidir. Pencereli toplamalar gruplandırılmış toplamalara benzer, ancak tanımlanan windowparçası olan satırların set için toplama values döndürülür.

bir window belirli bir uzunluk olarak tanımlanabilir ve bu windowparçası olan tüm satırlarda toplama işlemi gerçekleştirilebilir. Spark Streaming üç pencere türünü destekler:

  • Atlayan (sabit) pencereler: Sabit boyutlu, çakışmayan ve bitişik zaman aralıkları serisi. Bir giriş kaydı yalnızca tek bir window'a aittir.
  • Kayan pencereler: Yuvarlanan pencerelere benzer şekilde, kayan pencereler sabit boyutlu olur, ancak pencereler çakışabilir ve bir kayıt birden çok pencereye düşebilir.

Veriler window sonuna ek olarak watermarkuzunluğunu geçtiğinde, windowiçin yeni veri kabul edilmez, toplamanın sonucu yayılır ve window durumu bırakılır.

Aşağıdaki örnek, sabit bir windowkullanarak her 5 dakikada bir gösterimlerin toplamını hesaplar. Bu örnekte, select yan tümcesi impressions_windowdiğer adını kullanır ve window kendisi GROUP BY yan tümcesinin bir parçası olarak tanımlanır. window, bu örnekteki watermark, clickTimestampcolumn ile aynı zaman damgasını, column, temel almalıdır.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Python'da saatlik sabit pencerelerde kar hesaplamaya benzer bir örnek:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Akış kayıtlarını yinelenenleri kaldırma

Yapılandırılmış Akış, tam olarak bir kez işleme garantisine sahiptir ancak veri kaynaklarından kayıtların otomatik olarak yinelenenlerini kaldırmaz. Örneğin, birçok ileti kuyruğunun en az bir kez garantisi olduğundan, bu ileti kuyruklarından birinden okurken yinelenen kayıtlar beklenmelidir. belirtilen herhangi bir alandaki kayıtların yinelenenlerini kaldırmak için işlevini kullanabilir dropDuplicatesWithinWatermark() , bazı alanlar farklı olsa bile (olay zamanı veya varış saati gibi) bir akıştan yinelenenleri kaldırabilirsiniz. dropDuplicatesWithinWatermark() işlevini kullanmak için bir watermark belirtmeniz gerekir. watermark tarafından belirtilen zaman diliminde gelen yinelenen tüm veriler düşürülür.

Sıralı veriler önemlidir çünkü sıralı olmayan veriler, watermark değerinin yanlış bir şekilde ileri geçmesine neden olur. Daha sonra, eski veriler geldiğinde geç kabul edilir ve bırakılır. İlk anlık görüntüyü watermarkbelirtilen zaman damgasına göre işlemek için withEventTimeOrder seçeneğini kullanın. seçeneğiwithEventTimeOrder, kullanarak veri kümesini tanımlayan kodda spark.databricks.delta.withEventTimeOrder.enabled bildirilebilir. Örneğin:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Not

Bu withEventTimeOrder seçenek yalnızca Python ile desteklenir.

Aşağıdaki örnekte, veriler clickTimestampsırasına göre işlenir ve birbirini izleyen 5 saniye içinde gelen yinelenen userId ve clickAdIdcolumns içeren kayıtlar çıkarılır.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("LIVE.rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Durum bilgili işleme için Optimize işlem hattı yapılandırması

Databricks, üretim sorunlarını ve aşırı gecikme süresini önlemeye yardımcı olmak için, özellikle işlemeniz büyük miktarda ara durumdan tasarruf edilmesini gerektiriyorsa durum bilgisi olan akış işlemeniz için RocksDB tabanlı durum yönetiminin etkinleştirilmesini önerir.

Sunucusuz işlem hatları, durum deposu yapılandırmalarını otomatik olarak yönetir.

İşlem hattını dağıtmadan önce aşağıdaki yapılandırmayı ayarlayarak RocksDB tabanlı durum yönetimini etkinleştirebilirsiniz:

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

RocksDB yapılandırma önerileri de dahil olmak üzere RocksDB durum deposu hakkında daha fazla bilgi edinmek için bkz . Azure Databricks'te RocksDB durum depounu yapılandırma.