Udostępnij za pośrednictwem


Opracowywanie kodu potoku przy użyciu języka SQL

Usługa Delta Live Tables wprowadza kilka nowych słów kluczowych i funkcji SQL do definiowania zmaterializowanych oraz strumieniowych views i tables w potokach. Obsługa języka SQL na potrzeby tworzenia potoków opiera się na podstawach usługi Spark SQL i dodaje obsługę funkcji przesyłania strumieniowego ze strukturą.

Użytkownicy zaznajomieni z ramkami danych PySpark mogą preferować tworzenie kodu potoku w języku Python. Język Python obsługuje bardziej rozbudowane testowanie i operacje, które są trudne do zaimplementowania przy użyciu języka SQL, takich jak operacje metaprogramowania. Zobacz Tworzenie kodu potoku przy użyciu języka Python.

Aby uzyskać pełne odniesienie składni Delta Live Tables SQL, zobacz odniesienie do języka Delta Live Tables SQL.

Podstawy opracowywania potoków w języku SQL

Kod SQL, który tworzy zestawy danych Delta Live Tables, używa składni CREATE OR REFRESH, aby zdefiniować zmaterializowane zestawy danych views i strumieniowe zestawy danych tables na podstawie wyników zapytań.

Słowo STREAM kluczowe wskazuje, czy źródło danych, do których odwołuje się klauzula SELECT , powinno być odczytywane za pomocą semantyki przesyłania strumieniowego.

Delta Live Tables kod źródłowy zasadniczo różni się od skryptów SQL. Delta Live Tables przetwarza wszystkie definicje zestawów danych we wszystkich plikach kodu źródłowego skonfigurowanych w ramach potoku i tworzy graf przepływu danych przed uruchomieniem jakichkolwiek zapytań. Kolejność zapytań wyświetlanych w notesie lub skrycie nie definiuje kolejności wykonywania.

Tworzenie zmaterializowanego widoku przy użyciu języka SQL

Poniższy przykład kodu przedstawia podstawową składnię tworzenia zmaterializowanego widoku przy użyciu języka SQL:

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Tworzenie przesyłania strumieniowego table za pomocą SQL

Poniższy przykład kodu przedstawia podstawową składnię tworzenia przesyłania strumieniowego table za pomocą języka SQL.

Uwaga

Nie wszystkie źródła danych obsługują odczyty przesyłane strumieniowo, a niektóre źródła danych powinny być zawsze przetwarzane za pomocą semantyki przesyłania strumieniowego.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Ładowanie danych z magazynu obiektów

Usługa Delta Live Tables obsługuje ładowanie danych ze wszystkich formatów obsługiwanych przez usługę Azure Databricks. Zobacz Opcje formatu danych.

Uwaga

W tych przykładach używane są dane dostępne w obszarze /databricks-datasets automatycznie zainstalowanym w obszarze roboczym. Usługa Databricks zaleca używanie ścieżek woluminów lub identyfikatorów URI w chmurze w celu odwołowania się do danych przechowywanych w magazynie obiektów w chmurze. Zobacz Czym jest CatalogvolumesUnity?.

Usługa Databricks zaleca używanie automatycznego modułu ładującego i przesyłania strumieniowego tables podczas konfigurowania obciążeń pozyskiwania przyrostowego względem danych przechowywanych w magazynie obiektów w chmurze. Zobacz Co to jest moduł automatycznego ładowania?.

Program SQL używa read_files funkcji do wywoływania funkcji automatycznego modułu ładującego. Należy również użyć słowa kluczowego STREAM , aby skonfigurować odczyt przesyłania strumieniowego za pomocą polecenia read_files.

Poniższy przykład tworzy streaming table z plików JSON przy użyciu Auto Loader.

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Funkcja read_files obsługuje również semantykę wsadową w celu utworzenia zmaterializowanych views. W poniższym przykładzie użyto semantyki wsadowej do odczytania katalogu JSON i utworzenia zmaterializowanego widoku:

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Weryfikowanie danych z oczekiwaniami

Możesz użyć oczekiwań do set i wymusić ograniczenia jakości danych. Zobacz Zarządzanie jakością danych za pomocą usługi Delta Live Tables.

Poniższy kod definiuje oczekiwaną nazwę, valid_data która odrzuca rekordy, które mają wartość null podczas pozyskiwania danych:

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Zmaterializowane zapytanie views i zapytanie przesyłania strumieniowego tables zdefiniowane w potoku.

Użyj LIVEschema do wykonywania zapytań dotyczących zdefiniowanych w potoku innych zmaterializowanych views oraz zasobów przesyłania strumieniowego tables.

W poniższym przykładzie zdefiniowano cztery zestawy danych:

  • Przesyłanie strumieniowe table o nazwie orders, które pobiera dane JSON.
  • Zmaterializowany widok o nazwie customers , który ładuje dane CSV.
  • Zmaterializowany widok o nazwie customer_orders , który łączy rekordy z orders zestawów danych i customers , rzutuje znacznik czasu zamówienia na datę i wybiera customer_idpola , order_number, statei order_date .
  • Zmaterializowany widok o nazwie daily_orders_by_state , który agreguje dzienną liczbę zamówień dla każdego stanu.
CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;