Lakehouse öğreticisi: Lakehouse'da verileri hazırlama ve dönüştürme
Bu öğreticide, lakehouse'unuzda ham verileri dönüştürmek ve hazırlamak için Spark çalışma zamanı ile not defterlerini kullanacaksınız.
Önkoşullar
Veri içeren bir lakehouse'nuz yoksa şunları uygulamanız gerekir:
- Bir göl evi oluşturun ve
- Lakehouse'a veri alın.
Verileri hazırlama
Önceki öğretici adımlarından, kaynaktan lakehouse'un Dosyalar bölümüne alınan ham veriler var. Artık bu verileri dönüştürebilir ve Delta tabloları oluşturmaya hazırlayabilirsiniz.
Not defterlerini Lakehouse Öğreticisi Kaynak Kodu klasöründen indirin.
Ekranın sol alt kısmında bulunan değiştiriciden Veri Madenciliği'ı seçin.
Giriş sayfasının üst kısmındaki Yeni bölümünden Not defterini içeri aktar'ı seçin.
Ekranın sağ tarafında açılan İçeri aktarma durumu bölmesinden Karşıya Yükle'yi seçin.
Bu bölümün ilk adımında indirdiğiniz tüm not defterlerini seçin.
Aç'ı seçin. tarayıcı penceresinin sağ üst köşesinde içeri aktarmanın durumunu belirten bir bildirim görüntülenir.
İçeri aktarma işlemi başarılı olduktan sonra çalışma alanının öğeler görünümüne gidin ve yeni içeri aktarılan not defterlerini görün. Wwilakehouse lakehouse'u seçerek açın.
Wwilakehouse lakehouse açıldıktan sonra üst gezinti menüsünden Not defterini>aç Mevcut not defterini aç'ı seçin.
Mevcut not defterleri listesinden 01 - Delta Tabloları Oluştur not defterini seçin ve Aç'ı seçin.
Göl evi Gezgini'ndeki açık not defterinde, not defterinin açılmış göl kutunuza zaten bağlı olduğunu görürsünüz.
Not
Doku, iyileştirilmiş Delta lake dosyaları yazmak için V sırası özelliği sağlar. V sırası, en iyi duruma getirilmemiş Delta Lake dosyalarına göre performans hızlandırmayı genellikle üç-dört kez ve 10 kata kadar artırır. Dokuda Spark, varsayılan 128 MB boyutuna sahip dosyalar oluştururken bölümleri dinamik olarak iyileştirir. Hedef dosya boyutu, yapılandırmalar kullanılarak iş yükü gereksinimleri başına değiştirilebilir.
Yazma özelliğini en iyi duruma getirme özelliğiyle Apache Spark altyapısı, yazılan dosya sayısını azaltır ve yazılan verilerin tek tek dosya boyutunu artırmayı amaçlar.
Lakehouse'un Tablolar bölümünde Delta lake tabloları olarak veri yazmadan önce, iyileştirilmiş veri yazma ve gelişmiş okuma performansı için iki Doku özelliği (V sırası ve Yazmayı İyileştir) kullanırsınız. Bu özellikleri oturumunuzda etkinleştirmek için bu yapılandırmaları not defterinizin ilk hücresinde ayarlayın.
Not defterini başlatmak ve tüm hücreleri sırayla yürütmek için üst şeritte (Giriş altında) Tümünü çalıştır'ı seçin. Ya da yalnızca belirli bir hücreden kod yürütmek için, üzerine gelindiğinde hücrenin solunda görünen Çalıştır simgesini seçin veya denetim hücredeyken klavyenizde SHIFT + ENTER tuşlarına basın.
Bir hücreyi çalıştırırken, Doku bunları Canlı Havuz aracılığıyla sağladığından temel Spark havuzunu veya küme ayrıntılarını belirtmeniz gerekmezdi. Her Doku çalışma alanı, Canlı Havuz adlı varsayılan bir Spark havuzuyla birlikte gelir. Bu, not defterleri oluşturduğunuzda Spark yapılandırmalarını veya küme ayrıntılarını belirtme konusunda endişelenmeniz gerekmeyecek anlamına gelir. İlk not defteri komutunu yürütürken, canlı havuz birkaç saniye içinde çalışır durumda olur. Spark oturumu oluşturulur ve kodu yürütmeye başlar. Spark oturumu etkinken sonraki kod yürütme işlemi bu not defterinde neredeyse anında kullanılmaktadır.
Ardından, lakehouse'un Dosyalar bölümünden ham verileri okuyacak ve dönüştürmenin bir parçası olarak farklı tarih bölümleri için daha fazla sütun ekleyebilirsiniz. Son olarak, yeni oluşturulan veri bölümü sütunlarına (Yıl ve Çeyrek) göre Delta tablo biçimi olarak yazmadan önce verileri bölümlendirmek için Spark API'sine göre bölümleme kullanırsınız.
from pyspark.sql.functions import col, year, month, quarter table_name = 'fact_sale' df = spark.read.format("parquet").load('Files/wwi-raw-data/WideWorldImportersDW/parquet/full/fact_sale_1y_full') df = df.withColumn('Year', year(col("InvoiceDateKey"))) df = df.withColumn('Quarter', quarter(col("InvoiceDateKey"))) df = df.withColumn('Month', month(col("InvoiceDateKey"))) df.write.mode("overwrite").format("delta").partitionBy("Year","Quarter").save("Tables/" + table_name)
Olgu tabloları yüklendikten sonra, diğer boyutların verilerini yüklemeye geçebilirsiniz. Aşağıdaki hücre, parametre olarak geçirilen tablo adlarının her biri için lakehouse'un Dosyalar bölümünden ham verileri okumak için bir işlev oluşturur. Ardından boyut tablolarının listesini oluşturur. Son olarak, tablo listesinde döngüler oluşturur ve giriş parametresinden okunan her tablo adı için bir Delta tablosu oluşturur. Bu örnekte kullanılan sütun kullanılmadığından betiğin adlı
Photo
sütunu bırakdığını unutmayın.from pyspark.sql.types import * def loadFullDataFromSource(table_name): df = spark.read.format("parquet").load('Files/wwi-raw-data/WideWorldImportersDW/parquet/full/' + table_name) df = df.drop("Photo") df.write.mode("overwrite").format("delta").save("Tables/" + table_name) full_tables = [ 'dimension_city', 'dimension_customer', 'dimension_date', 'dimension_employee', 'dimension_stock_item' ] for table in full_tables: loadFullDataFromSource(table)
Oluşturulan tabloları doğrulamak için wwilakehouse lakehouse'da sağ tıklayın ve yenile'yi seçin. Tablolar görüntülenir.
Çalışma alanının öğeler görünümüne tekrar gidin ve wwilakehouse lakehouse'u seçerek açın.
Şimdi ikinci not defterini açın. Lakehouse görünümünde şeritten Not defterini>aç Varolan not defterini aç'ı seçin.
Mevcut not defterleri listesinden 02 - Veri Dönüştürme - İş not defterini seçerek açın.
Göl evi Gezgini'ndeki açık not defterinde, not defterinin açılmış göl kutunuza zaten bağlı olduğunu görürsünüz.
Bir kuruluşta Scala/Python ile çalışan veri mühendisleri ve SQL (Spark SQL veya T-SQL) ile çalışan diğer veri mühendisleri olabilir ve bunların hepsi aynı veri kopyası üzerinde çalışır. Doku, farklı deneyime ve tercihe sahip bu farklı grupların çalışmasını ve işbirliği yapmasına olanak tanır. İki farklı yaklaşım, iş toplamlarını dönüştürür ve oluşturur. Sizin için uygun olanı seçebilir veya performanstan ödün vermeden tercihinize göre bu yaklaşımları karıştırıp eşleştirebilirsiniz:
Yaklaşım 1 - İş toplamları oluşturmak üzere verileri birleştirmek ve toplamak için PySpark'ı kullanın. Bu yaklaşım, programlama (Python veya PySpark) arka planı olan biri için tercih edilir.
Yaklaşım 2 - Spark SQL'i kullanarak iş toplamları oluşturmak için verileri birleştirin ve toplar. Bu yaklaşım, Spark'a geçiş yaparak SQL arka planına sahip biri için tercih edilir.
Yaklaşım 1 (sale_by_date_city) - İş toplamları oluşturmak için verileri birleştirmek ve toplamak için PySpark'ı kullanın. Aşağıdaki kodla, her birinin mevcut Delta tablosuna başvuran üç farklı Spark veri çerçevesi oluşturursunuz. Ardından veri çerçevelerini kullanarak bu tabloları birleştirir, toplama oluşturmak için gruplandırma yapar, sütunlardan birkaçını yeniden adlandırır ve son olarak verileri kalıcı hale getirmek için lakehouse'un Tablolar bölümüne Delta tablosu olarak yazarsınız.
Bu hücrede, her birinin var olan bir Delta tablosuna başvuran üç farklı Spark veri çerçevesi oluşturacaksınız.
df_fact_sale = spark.read.table("wwilakehouse.fact_sale") df_dimension_date = spark.read.table("wwilakehouse.dimension_date") df_dimension_city = spark.read.table("wwilakehouse.dimension_city")
Daha önce oluşturulan veri çerçevelerini kullanarak bu tabloları birleştirmek için aşağıdaki kodu aynı hücreye ekleyin. Toplama oluşturmak için gruplandırın, sütunlardan birkaçını yeniden adlandırın ve son olarak lakehouse'un Tablolar bölümüne Delta tablosu olarak yazın.
sale_by_date_city = df_fact_sale.alias("sale") \ .join(df_dimension_date.alias("date"), df_fact_sale.InvoiceDateKey == df_dimension_date.Date, "inner") \ .join(df_dimension_city.alias("city"), df_fact_sale.CityKey == df_dimension_city.CityKey, "inner") \ .select("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory", "sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\ .groupBy("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory")\ .sum("sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\ .withColumnRenamed("sum(TotalExcludingTax)", "SumOfTotalExcludingTax")\ .withColumnRenamed("sum(TaxAmount)", "SumOfTaxAmount")\ .withColumnRenamed("sum(TotalIncludingTax)", "SumOfTotalIncludingTax")\ .withColumnRenamed("sum(Profit)", "SumOfProfit")\ .orderBy("date.Date", "city.StateProvince", "city.City") sale_by_date_city.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_city")
Yaklaşım 2 (sale_by_date_employee) - İş toplamları oluşturmak için verileri birleştirmek ve toplamak için Spark SQL'i kullanın. Aşağıdaki kodla, üç tabloyu birleştirerek geçici bir Spark görünümü oluşturur, toplama oluşturmak için gruplandırır ve sütunlardan birkaçını yeniden adlandırırsınız. Son olarak, geçici Spark görünümünden okur ve son olarak verileri kalıcı hale getirmek için lakehouse'un Tablolar bölümünde Delta tablosu olarak yazarsınız.
Bu hücrede, üç tabloyu birleştirerek geçici bir Spark görünümü oluşturursunuz, toplama oluşturmak için gruplandırma yapar ve sütunlardan birkaçını yeniden adlandırırsınız.
%%sql CREATE OR REPLACE TEMPORARY VIEW sale_by_date_employee AS SELECT DD.Date, DD.CalendarMonthLabel , DD.Day, DD.ShortMonth Month, CalendarYear Year ,DE.PreferredName, DE.Employee ,SUM(FS.TotalExcludingTax) SumOfTotalExcludingTax ,SUM(FS.TaxAmount) SumOfTaxAmount ,SUM(FS.TotalIncludingTax) SumOfTotalIncludingTax ,SUM(Profit) SumOfProfit FROM wwilakehouse.fact_sale FS INNER JOIN wwilakehouse.dimension_date DD ON FS.InvoiceDateKey = DD.Date INNER JOIN wwilakehouse.dimension_Employee DE ON FS.SalespersonKey = DE.EmployeeKey GROUP BY DD.Date, DD.CalendarMonthLabel, DD.Day, DD.ShortMonth, DD.CalendarYear, DE.PreferredName, DE.Employee ORDER BY DD.Date ASC, DE.PreferredName ASC, DE.Employee ASC
Bu hücrede, önceki hücrede oluşturulan geçici Spark görünümünden okuyacak ve son olarak bunu göl binasının Tablolar bölümünde Delta tablosu olarak yazacaksınız.
sale_by_date_employee = spark.sql("SELECT * FROM sale_by_date_employee") sale_by_date_employee.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_employee")
Oluşturulan tabloları doğrulamak için wwilakehouse lakehouse'da sağ tıklayın ve Yenile'yi seçin. Toplama tabloları görüntülenir.
İki yaklaşım benzer bir sonuç üretir. Yeni bir teknoloji öğrenme veya performans konusunda ödün verme ihtiyacını en aza indirmek için arka planınıza ve tercihinize en uygun yaklaşımı seçin.
Verileri Delta lake dosyaları olarak yazdığınızı fark edebilirsiniz. Doku'nun otomatik tablo bulma ve kayıt özelliği bunları alır ve meta veri deposuna kaydeder. SQL ile kullanılacak tablolar oluşturmak için deyimleri açıkça çağırmanız CREATE TABLE
gerekmez.