Öğretici: İlk Delta Live Tables işlem hattınızı çalıştırma
Bu öğreticide ilk Delta Live Tables işlem hattınızı yapılandırma, temel ETL kodu yazma ve işlem hattı güncelleştirmesini çalıştırma adımları izlenir.
Bu öğreticideki tüm adımlar Unity Kataloğu'nu etkinleştirmiş çalışma alanları için tasarlanmıştır. Delta Live Tables işlem hatlarını eski Hive meta veri deposuyla çalışacak şekilde de yapılandırabilirsiniz. Bkz . Eski Hive meta veri deposuyla Delta Live Tables işlem hatlarını kullanma.
Not
Bu öğreticide Databricks not defterlerini kullanarak yeni işlem hattı kodu geliştirme ve doğrulama yönergeleri yer alır. Python veya SQL dosyalarındaki kaynak kodu kullanarak işlem hatlarını da yapılandırabilirsiniz.
Delta Live Tables söz dizimi kullanılarak yazılmış kaynak kodunuz varsa kodunuzu çalıştırmak için bir işlem hattı yapılandırabilirsiniz. Bkz . Delta Live Tables işlem hattını yapılandırma.
Unity Kataloğu tarafından yönetilen nesneler olarak gerçekleştirilmiş görünümler ve akış tabloları için yenileme zamanlamaları kaydetmek ve ayarlamak için Databricks SQL'de tam bildirim temelli SQL söz dizimini kullanabilirsiniz. Bkz. Databricks SQL'de gerçekleştirilmiş görünümleri kullanma ve Databricks SQL'de akış tablolarını kullanarak veri yükleme.
Örnek: New York bebek adları verilerini alma ve işleme
Bu makaledeki örnek, New York Eyalet bebek adlarının kayıtlarını içeren genel kullanıma açık bir veri kümesi kullanır. Bu örnekte, aşağıdaki işlemleri yapmak için Delta Live Tables işlem hattının kullanılması gösterilmektedir:
- Bir birimden tabloya ham CSV verilerini okuma.
- Alma tablosundaki kayıtları okuyun ve temizlenmiş veriler içeren yeni bir tablo oluşturmak için Delta Live Tables beklentilerini kullanın.
- Temizlenen kayıtları türetilmiş veri kümeleri oluşturan Delta Live Tables sorgularına giriş olarak kullanın.
Bu kod, madalyon mimarisinin basitleştirilmiş bir örneğini gösterir. Bkz . Madalyon göl evi mimarisi nedir?.
Bu örneğin uygulamaları Python ve SQL için sağlanır. Yeni bir işlem hattı ve not defteri oluşturmak için adımları izleyin ve ardından sağlanan kodu kopyalayıp yapıştırın.
Eksiksiz kod içeren örnek not defterleri de sağlanır.
Gereksinimler
İşlem hattını başlatmak için, Delta Live Tables kümesini tanımlayan bir küme ilkesine küme oluşturma izniniz veya erişiminiz olmalıdır. Delta Live Tables çalışma zamanı, işlem hattınızı çalıştırmadan önce bir küme oluşturur ve doğru izne sahip değilseniz başarısız olur.
Tüm kullanıcılar varsayılan olarak sunucusuz işlem hatlarını kullanarak güncelleştirmeleri tetikleyebilir. Sunucusuz hesap düzeyinde etkinleştirilmelidir ve çalışma alanı bölgenizde kullanılamayabilir. Bkz . Sunucusuz işlem etkinleştirme.
Bu öğreticideki örneklerde Unity Kataloğu kullanılır. Databricks, hedef şemada birden çok veritabanı nesnesi oluşturulduğundan bu öğreticiyi çalıştırmak için yeni bir şema oluşturulmasını önerir.
- Katalogda yeni şema oluşturmak için veya
ALL PRIVILEGES
veUSE CATALOG
ayrıcalıklarına sahipCREATE SCHEMA
olmanız gerekir. - Yeni şema oluşturamıyorsanız, bu öğreticiyi var olan bir şemada çalıştırın. Aşağıdaki ayrıcalıklara sahip olmanız gerekir:
-
USE CATALOG
üst katalog için. -
ALL PRIVILEGES
veyaUSE SCHEMA
hedefCREATE MATERIALIZED VIEW
şemada , veCREATE TABLE
ayrıcalıkları.
-
- Bu öğreticide örnek verileri depolamak için bir birim kullanılır. Databricks, bu öğretici için yeni bir birim oluşturulmasını önerir. Bu öğretici için yeni bir şema oluşturursanız, bu şemada yeni bir birim oluşturabilirsiniz.
- Mevcut şemada yeni bir birim oluşturmak için aşağıdaki ayrıcalıklara sahip olmanız gerekir:
-
USE CATALOG
üst katalog için. -
ALL PRIVILEGES
USE SCHEMA
veyaCREATE VOLUME
hedef şemadaki ayrıcalıklar.
-
- İsteğe bağlı olarak mevcut bir birimi kullanabilirsiniz. Aşağıdaki ayrıcalıklara sahip olmanız gerekir:
-
USE CATALOG
üst katalog için. -
USE SCHEMA
üst şema için. -
ALL PRIVILEGES
veyaREAD VOLUME
hedef birimde veWRITE VOLUME
.
-
- Mevcut şemada yeni bir birim oluşturmak için aşağıdaki ayrıcalıklara sahip olmanız gerekir:
Bu izinleri ayarlamak için Databricks yöneticinize başvurun. Unity Kataloğu ayrıcalıkları hakkında daha fazla bilgi için bkz . Unity Kataloğu ayrıcalıkları ve güvenliği sağlanabilir nesneler.
- Katalogda yeni şema oluşturmak için veya
0. Adım: Verileri indirme
Bu örnek bir Unity Kataloğu biriminden veri yükler. Aşağıdaki kod bir CSV dosyasını indirir ve belirtilen birimde depolar. Yeni bir not defteri açın ve bu verileri belirtilen birime indirmek için aşağıdaki kodu çalıştırın:
my_catalog = "<catalog-name>"
my_schema = "<schema-name>"
my_volume = "<volume-name>"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {my_catalog}.{my_schema}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {my_catalog}.{my_schema}.{my_volume}")
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
filename = "babynames.csv"
dbutils.fs.cp(download_url, volume_path + filename)
, <catalog-name>
ve <schema-name>
yerine Unity Kataloğu biriminin katalog, şema ve birim adlarını yazın<volume-name>
. Sağlanan kod, bu nesneler yoksa belirtilen şemayı ve birimi oluşturmaya çalışır. Unity Kataloğu'nda nesneler oluşturmak ve nesnelere yazmak için uygun ayrıcalıklara sahip olmanız gerekir. Bkz . Gereksinimler.
Not
Öğreticiye devam etmeden önce bu not defterinin başarıyla çalıştırıldığından emin olun. Bu not defterini işlem hattınızın bir parçası olarak yapılandırmayın.
1. Adım: İşlem hattı oluşturma
Delta Live Tables, Delta Live Tables söz dizimlerini kullanarak not defterlerinde veya dosyalarda (kaynak kod olarak adlandırılır) tanımlanan bağımlılıkları çözümleyerek işlem hatları oluşturur. Her kaynak kodu dosyası yalnızca bir dil içerebilir, ancak işlem hattına dile özgü birden çok not defteri veya dosya ekleyebilirsiniz.
Önemli
Kaynak kodu alanındaki hiçbir varlığı yapılandırmayın. Bu alanı siyah bırakmak, kaynak kodu yazma için bir not defteri oluşturur ve yapılandırılır.
Bu öğreticideki yönergeler sunucusuz işlem ve Unity Kataloğu'nu kullanır. Bu yönergelerde belirtilmeyen tüm yapılandırma seçenekleri için varsayılan ayarları kullanın.
Not
Çalışma alanınızda sunucusuz etkinleştirilmediyse veya desteklenmiyorsa, varsayılan işlem ayarlarını kullanarak öğreticiyi yazıldı olarak tamamlayabilirsiniz. İşlem hattı oluşturma kullanıcı arabiriminin Hedef bölümündeki Depolama seçenekleri'nin altında Unity Kataloğu'nu el ile seçmelisiniz.
Yeni bir işlem hattı yapılandırmak için aşağıdakileri yapın:
- Kenar çubuğunda Delta Live Tables
öğesine tıklayın. - İşlem hattı oluşturtıklayın.
İşlem Hattı adı alanına benzersiz bir işlem hattı adı yazın. - Sunucusuz onay kutusunu seçin.
- Hedef'nde, tabloların yayımlandığı Unity Kataloğu konumunu yapılandırmak için bir Kataloğu ve Şemasıseçin.
-
Gelişmiş'nde Yapılandırma ekle'ye tıklayın ve ardından aşağıdaki parametre adlarını kullanarak verileri indirdiğiniz katalog, şema ve birim için işlem hattı parametrelerini tanımlayın:
my_catalog
my_schema
my_volume
- Oluştur’a tıklayın.
Yeni işlem hattı için işlem hatları kullanıcı arabirimi görüntülenir. İşlem hattı için otomatik olarak bir kaynak kodu not defteri oluşturulur ve yapılandırılır.
Not defteri, kullanıcı dizininizdeki yeni bir dizinde oluşturulur. Yeni dizin ve dosyanın adı, işlem hattınızın adıyla eşleşsin. Örneğin, /Users/your.username@databricks.com/my_pipeline/my_pipeline
.
Bu not defterine erişim bağlantısı, İşlem hattı ayrıntıları panelindeki Kaynak kodu alanının altındadır. Sonraki adıma geçmeden önce not defterini açmak için bağlantıya tıklayın.
2. Adım: Python veya SQL ile not defterinde gerçekleştirilmiş görünümler ve akış tabloları bildirme
Delta Live Tables işlem hatları için kaynak kodunu etkileşimli olarak geliştirmek ve doğrulamak için Datbricks not defterlerini kullanabilirsiniz. Bu işlevi kullanmak için not defterinizi işlem hattına eklemeniz gerekir. Yeni oluşturduğunuz not defterini yeni oluşturduğunuz işlem hattına eklemek için:
- İşlem yapılandırma menüsünü açmak için sağ üstteki Bağlan'a tıklayın.
- 1. Adımda oluşturduğunuz işlem hattının adının üzerine gelin.
- Bağlan'a tıklayın.
Kullanıcı arabirimi, sağ üstteki Doğrula ve Başlat düğmelerini içerecek şekilde değişir. İşlem hattı kodu geliştirme için not defteri desteği hakkında daha fazla bilgi edinmek için bkz . Not defterlerinde Delta Live Tables işlem hatlarını geliştirme ve hatalarını ayıklama.
Önemli
- Delta Live Tables işlem hatları, planlama sırasında not defterindeki tüm hücreleri değerlendirir. Tüm amaçlı işlemlerde çalıştırılan veya iş olarak zamanlanan not defterlerinden farklı olarak, işlem hatları hücrelerin belirtilen sırada çalışmasını garanti etmemektedir.
- Not defterleri yalnızca tek bir programlama dili içerebilir. İşlem hattı kaynak kodu not defterlerinde Python ve SQL kodunu karıştırmayın.
Python veya SQL ile kod geliştirme hakkında ayrıntılı bilgi için bkz . Python ile işlem hattı kodu geliştirme veya SQL ile işlem hattı kodu geliştirme.
Örnek işlem hattı kodu
Bu öğreticideki örneği uygulamak için aşağıdaki kodu kopyalayıp işlem hattınızın kaynak kodu olarak yapılandırılmış not defterindeki bir hücreye yapıştırın.
Sağlanan kod aşağıdakileri yapar:
- Gerekli modülleri içeri aktarır (yalnızca Python).
- İşlem hattı yapılandırması sırasında tanımlanan parametrelere başvurur.
- Bir birimden alınan adlı
baby_names_raw
bir akış tablosu tanımlar. - Alınan verileri doğrulayan adlı
baby_names_prepared
gerçekleştirilmiş bir görünüm tanımlar. - Verilerin son derece iyileştirilmiş bir görünümüne sahip olan adlı
top_baby_names_2021
gerçekleştirilmiş bir görünümü tanımlar.
Python
# Import modules
import dlt
from pyspark.sql.functions import *
# Assign pipeline parameters to variables
my_catalog = spark.conf.get("my_catalog")
my_schema = spark.conf.get("my_schema")
my_volume = spark.conf.get("my_volume")
# Define the path to source data
volume_path = f"/Volumes/{my_catalog}/{my_schema}/{my_volume}/"
# Define a streaming table to ingest data from a volume
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("inferSchema", True)
.option("header", True)
.load(volume_path)
)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
# Define a materialized view that validates data and renames a column
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
spark.read.table("LIVE.baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
# Define a materialized view that has a filtered, aggregated, and sorted view of the data
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("LIVE.baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
SQL
-- Define a streaming table to ingest data from a volume
CREATE OR REFRESH STREAMING TABLE baby_names_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count
FROM STREAM(read_files(
'/Volumes/${my_catalog}/${my_schema}/${my_volume}/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST'));
-- Define a materialized view that validates data and renames a column
CREATE OR REFRESH MATERIALIZED VIEW baby_names_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM LIVE.baby_names_raw;
-- Define a materialized view that provides a filtered, aggregated, and sorted view of the data
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM LIVE.baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
3. Adım: İşlem hattı güncelleştirmesini başlatma
İşlem hattı güncelleştirmesini başlatmak için not defteri kullanıcı arabiriminin sağ üst kısmındaki Başlat düğmesine tıklayın.
Örnek not defterleri
Aşağıdaki not defterleri, bu makalede sağlanan kod örneklerini içerir. Bu not defterleri, bu makaledeki adımlarla aynı gereksinimlere sahiptir. Bkz . Gereksinimler.
Not defterini içeri aktarmak için aşağıdaki adımları tamamlayın:
- Not defteri kullanıcı arabirimini açın.
- + Yeni Not Defteri'ne> tıklayın.
- Boş bir not defteri açılır.
- Dosya>İçeri Aktar’a tıklayın. İçeri Aktar iletişim kutusu görüntülenir.
- İçeri aktarma kaynağı için URL seçeneğini belirleyin.
- Not defterinin URL'sini yapıştırın.
- İçe aktar'a tıklayın.
Bu öğretici, Delta Live Tables işlem hattınızı yapılandırmadan ve çalıştırmadan önce bir veri kurulum not defteri çalıştırmanızı gerektirir. Aşağıdaki not defterini içeri aktarın, not defterini bir işlem kaynağına ekleyin, , ve için my_catalog
gerekli değişkeni doldurun ve my_schema
Tümünümy_volume
.
İşlem hatları için veri indirme öğreticisi
Aşağıdaki not defterleri Python veya SQL'de örnekler sağlar. Bir not defterini içeri aktardığınızda, not defteri kullanıcı giriş dizininize kaydedilir.
Aşağıdaki not defterlerinden birini içeri aktardıktan sonra işlem hattı oluşturma adımlarını tamamlayın, ancak indirilen not defterini seçmek için Kaynak kodu dosya seçicisini kullanın. Kaynak kod olarak yapılandırılmış bir not defteriyle işlem hattını oluşturduktan sonra, işlem hattı kullanıcı arabiriminde Başlat'a tıklayarak bir güncelleştirmeyi tetikleyin.