Ćwiczenie — integrowanie notesu w potokach usługi Azure Synapse
W tej lekcji utworzysz notes usługi Azure Synapse Spark do analizowania i przekształcania danych załadowanych przez przepływ danych mapowania oraz przechowywania danych w usłudze Data Lake. Utworzysz komórkę parametru, która akceptuje parametr ciągu, który definiuje nazwę folderu dla danych zapisywanych w notesie w usłudze Data Lake.
Następnie dodasz ten notes do potoku usługi Synapse i przekaż unikatowy identyfikator uruchomienia potoku do parametru notesu, aby później skorelować przebieg potoku z danymi zapisanymi przez działanie notesu.
Na koniec użyjesz centrum Monitor w programie Synapse Studio do monitorowania przebiegu potoku, uzyskania identyfikatora przebiegu, a następnie zlokalizowania odpowiednich plików przechowywanych w usłudze Data Lake.
Informacje o platformie Apache Spark i notesach
Apache Spark jest platformą przetwarzania równoległego, która obsługuje przetwarzanie w pamięci w celu zwiększania wydajności aplikacji do analizy danych big data. Platforma Apache Spark w usłudze Azure Synapse Analytics to jedna z implementacji platformy Apache Spark oferowanych przez firmę Microsoft w chmurze.
Notes platformy Apache Spark w programie Synapse Studio to interfejs internetowy umożliwiający tworzenie plików zawierających kod na żywo, wizualizacje i tekst narracji. Notesy to dobre miejsce do weryfikowania pomysłów i przeprowadzania krótkich eksperymentów w celu uzyskania szczegółowych informacji na podstawie danych. Notesy są również szeroko używane w scenariuszach przygotowywania danych, wizualizacji danych, uczenia maszynowego i innych scenariuszy danych big data.
Tworzenie notesu usługi Synapse Spark
Załóżmy, że utworzono przepływ danych mapowania w usłudze Synapse Analytics w celu przetwarzania, dołączania i importowania danych profilu użytkownika. Teraz chcesz znaleźć pięć najlepszych produktów dla każdego użytkownika, na podstawie tych, które są zarówno preferowane, jak i najlepsze, i mają najwięcej zakupów w ciągu ostatnich 12 miesięcy. Następnie chcesz obliczyć pięć najlepszych produktów.
W tym ćwiczeniu utworzysz notes usługi Synapse Spark, aby wykonać te obliczenia.
Otwórz program Synapse Analytics Studio (https://web.azuresynapse.net/) i przejdź do centrum danych .
Wybierz kartę Połączono (1), a następnie rozwiń podstawowe konto usługi Data Lake Storage (2) poniżej usługi Azure Data Lake Storage Gen2. Wybierz kontener wwi-02 (3) i otwórz folder top-products (4). Kliknij prawym przyciskiem myszy dowolny plik Parquet (5), wybierz element menu Nowy notes (6), a następnie wybierz polecenie Załaduj do ramki danych (7). Jeśli nie widzisz folderu, wybierz pozycję
Refresh
.Upewnij się, że notes jest dołączony do puli platformy Spark.
Zastąp nazwę pliku Parquet wartością
*.parquet
(1), aby wybrać wszystkie pliki Parquet w folderzetop-products
. Na przykład ścieżka powinna być podobna do:abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet
.Wybierz pozycję Uruchom wszystko na pasku narzędzi notesu, aby wykonać notes.
Uwaga
Przy pierwszym uruchomieniu notesu w puli Spark usługa Synapse tworzy nową sesję. Może to potrwać od około 3 do 5 minut.
Uwaga
Aby uruchomić tylko komórkę, umieść kursor na komórce i wybierz ikonę Uruchom komórkę po lewej stronie komórki lub wybierz komórkę, a następnie naciśnij Ctrl+Enter.
Utwórz nową komórkę poniżej, wybierając + przycisk i wybierając element komórki Kod. Przycisk + znajduje się pod komórką notesu po lewej stronie. Alternatywnie możesz również rozwinąć menu + Komórka na pasku narzędzi Notes i wybrać element komórki Kod.
Uruchom następujące polecenie w nowej komórce, aby wypełnić nową ramkę danych o nazwie
topPurchases
, utworzyć nowy widok tymczasowy o nazwietop_purchases
i wyświetlić pierwsze 100 wierszy:topPurchases = df.select( "UserId", "ProductId", "ItemsPurchasedLast12Months", "IsTopProduct", "IsPreferredProduct") # Populate a temporary view so we can query from SQL topPurchases.createOrReplaceTempView("top_purchases") topPurchases.show(100)
Dane wyjściowe powinny wyglądać mniej więcej tak:
+------+---------+--------------------------+------------+------------------+ |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct| +------+---------+--------------------------+------------+------------------+ | 148| 2717| null| false| true| | 148| 4002| null| false| true| | 148| 1716| null| false| true| | 148| 4520| null| false| true| | 148| 951| null| false| true| | 148| 1817| null| false| true| | 463| 2634| null| false| true| | 463| 2795| null| false| true| | 471| 1946| null| false| true| | 471| 4431| null| false| true| | 471| 566| null| false| true| | 471| 2179| null| false| true| | 471| 3758| null| false| true| | 471| 2434| null| false| true| | 471| 1793| null| false| true| | 471| 1620| null| false| true| | 471| 1572| null| false| true| | 833| 957| null| false| true| | 833| 3140| null| false| true| | 833| 1087| null| false| true|
Uruchom następujące polecenie w nowej komórce, aby utworzyć nowy widok tymczasowy przy użyciu języka SQL:
%%sql CREATE OR REPLACE TEMPORARY VIEW top_5_products AS select UserId, ProductId, ItemsPurchasedLast12Months from (select *, row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum from top_purchases ) a where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true order by a.UserId
Uwaga
Brak danych wyjściowych dla tego zapytania.
Zapytanie używa widoku tymczasowego
top_purchases
jako źródła i stosuje metodęrow_number() over
, aby zastosować numer wiersza dla rekordów dla każdego użytkownika, gdzieItemsPurchasedLast12Months
jest największy. Klauzulawhere
filtruje wyniki, więc pobieramy tylko do pięciu produktów, w których obaIsTopProduct
iIsPreferredProduct
są ustawione na wartość true. Daje to nam pięć najczęściej zakupionych produktów dla każdego użytkownika, w którym te produkty są również identyfikowane jako ulubione produkty, zgodnie z profilem użytkownika przechowywanym w usłudze Azure Cosmos DB.Uruchom następujące polecenie w nowej komórce, aby utworzyć i wyświetlić nową ramkę danych, która przechowuje wyniki widoku tymczasowego
top_5_products
utworzonego w poprzedniej komórce:top5Products = sqlContext.table("top_5_products") top5Products.show(100)
Powinny zostać wyświetlone dane wyjściowe podobne do następujących, które wyświetlają pięć preferowanych produktów na użytkownika:
Oblicz pięć najlepszych produktów w oparciu o te, które są preferowane przez klientów i kupowane najwięcej. W tym celu uruchom następujące polecenie w nowej komórce:
top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months") .groupBy("ProductId") .agg( sum("ItemsPurchasedLast12Months").alias("Total") ) .orderBy( col("Total").desc() ) .limit(5)) top5ProductsOverall.show()
W tej komórce pogrupowaliśmy pięć preferowanych produktów według identyfikatora produktu, podsumowaliśmy łączną liczbę elementów zakupionych w ciągu ostatnich 12 miesięcy, posortowaliśmy tę wartość w kolejności malejącej i zwróciliśmy pięć pierwszych wyników. Dane wyjściowe powinny być podobne do następujących:
+---------+-----+ |ProductId|Total| +---------+-----+ | 2107| 4538| | 4833| 4533| | 347| 4523| | 3459| 4233| | 4246| 4155| +---------+-----+
Tworzenie komórki parametru
Potoki usługi Azure Synapse wyszukują komórkę parametrów i traktują tę komórkę jako wartości domyślne dla parametrów przekazanych w czasie wykonywania. Aparat wykonywania doda nową komórkę pod komórką parameters z parametrami wejściowymi, aby zastąpić wartości domyślne. Jeśli komórka parametrów nie zostanie wyznaczona, w górnej części notesu zostanie wstawiona wstrzymywane komórki.
Ten notes zostanie wykonany z potoku. Chcemy przekazać parametr, który ustawia wartość zmiennej
runId
, która będzie używana do nazywania pliku Parquet. Uruchom następujące polecenie w nowej komórce:import uuid # Generate random GUID runId = uuid.uuid4()
Używamy biblioteki dostarczanej
uuid
z platformą Spark w celu wygenerowania losowego identyfikatora GUID. Chcemy zastąpićrunId
zmienną parametrem przekazywanym przez potok. Aby to zrobić, musimy przełączyć to jako komórkę parametru.Wybierz wielokropek akcji (...) w prawym górnym rogu komórki (1), a następnie wybierz pozycję Przełącz komórkę parametru (2).
Po przełączeniu tej opcji zobaczysz tag Parametry w komórce.
Wklej następujący kod w nowej komórce, aby użyć
runId
zmiennej jako nazwy pliku Parquet w ścieżce na/top5-products/
koncie podstawowego magazynu data lake. ZastąpYOUR_DATALAKE_NAME
ciąg w ścieżce nazwą podstawowego konta usługi Data Lake. Aby to znaleźć, przewiń w górę do komórki 1 w górnej części strony (1). Skopiuj konto magazynu data lake ze ścieżki (2). Wklej tę wartość jako zamiennikYOUR_DATALAKE_NAME
w ścieżce (3) wewnątrz nowej komórki, a następnie uruchom polecenie w komórce.%%pyspark top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
Sprawdź, czy plik został zapisany w usłudze Data Lake. Przejdź do centrum danych i wybierz kartę Połączone (1). Rozwiń podstawowe konto magazynu typu data lake, a następnie wybierz kontener wwi-02 (2). Przejdź do folderu top5-products (3). Powinien zostać wyświetlony folder dla pliku Parquet w katalogu z identyfikatorem GUID jako nazwą pliku (4).
Metoda zapisu Parquet w ramce danych w komórce notesu utworzyła ten katalog, ponieważ wcześniej nie istniała.
Dodawanie notesu do potoku usługi Synapse
Nawiązując do Przepływ danych mapowania opisanego na początku ćwiczenia, załóżmy, że chcesz wykonać ten notes po uruchomieniu Przepływ danych w ramach procesu aranżacji. W tym celu należy dodać ten notes do potoku jako nowe działanie notesu.
Wróć do notesu. Wybierz pozycję Właściwości (1) w prawym górnym rogu notesu, a następnie wprowadź
Calculate Top 5 Products
nazwę (2).Wybierz pozycję Dodaj do potoku (1) w prawym górnym rogu notesu, a następnie wybierz pozycję Istniejący potok (2).
Wybierz potok Zapisywania danych profilu użytkownika w potoku USŁUGI ASA (1),a następnie wybierz pozycję Dodaj *(2).
Program Synapse Studio dodaje działanie Notes do potoku. Zmień kolejność działania Notes, tak aby znajduje się po prawej stronie działania Przepływu danych. Wybierz działanie Przepływ danych i przeciągnij zielone pole połączenia potoku działania Powodzenie do działania Notes.
Strzałka działania Powodzenie instruuje potok, aby uruchamiał działanie notesu po pomyślnym uruchomieniu działania przepływu danych.
Wybierz działanie Notes (1), a następnie wybierz kartę Ustawienia (2), rozwiń węzeł Parametry podstawowe (3), a następnie wybierz pozycję + Nowy (4). Wprowadź
runId
w polu Nazwa (5). Wybierz pozycję Ciąg dla typu (6). W polu Wartość wybierz pozycję Dodaj zawartość dynamiczną (7).Wybierz pozycję Identyfikator uruchomienia potoku w obszarze Zmienne systemowe (1). Spowoduje to dodanie
@pipeline().RunId
do pola zawartości dynamicznej (2). Wybierz przycisk Zakończ (3), aby zamknąć okno dialogowe.Wartość Identyfikator przebiegu potoku jest unikatowym identyfikatorem GUID przypisanym do każdego uruchomienia potoku. Użyjemy tej wartości jako nazwy pliku Parquet, przekazując tę wartość jako parametr notesu
runId
. Następnie możemy przejrzeć historię uruchamiania potoku i znaleźć określony plik Parquet utworzony dla każdego uruchomienia potoku.Wybierz pozycję Opublikuj wszystko , a następnie pozycję Publikuj , aby zapisać zmiany.
Po zakończeniu publikowania wybierz pozycję Dodaj wyzwalacz (1), a następnie pozycję Wyzwól teraz (2), aby uruchomić zaktualizowany potok.
Wybierz przycisk OK , aby uruchomić wyzwalacz.
Monitorowanie działania potoku
Centrum Monitor umożliwia monitorowanie bieżących i historycznych działań dla usług SQL, Apache Spark i Pipelines.
Przejdź do centrum Monitor .
Wybierz pozycję Uruchomienia potoku (1)i poczekaj na pomyślne ukończenie przebiegu potoku (2). Może być konieczne odświeżenie widoku (3).
Wybierz nazwę potoku, aby wyświetlić uruchomienia działania potoku.
Zwróć uwagę zarówno na działanie Przepływu danych, jak i nowe działanie notesu (1).. Zanotuj wartość identyfikatora przebiegu potoku (2). Porównamy to z nazwą pliku Parquet wygenerowaną przez notes. Wybierz nazwę Calculate Top 5 Products Notebook (Oblicz 5 pierwszych produktów), aby wyświetlić jego szczegóły (3).
W tym miejscu zobaczymy szczegóły uruchomienia notesu. Możesz wybrać pozycję Odtwarzanie (1), aby obejrzeć odtwarzanie postępu w ramach zadań (2). W dolnej części można wyświetlić opcje diagnostyki i dzienników z różnymi opcjami filtrowania (3). Po prawej stronie możemy wyświetlić szczegóły przebiegu, takie jak czas trwania, identyfikator usługi Livy, szczegóły puli platformy Spark itd. Wybierz link Wyświetl szczegóły zadania, aby wyświetlić jego szczegóły (5).
Interfejs użytkownika aplikacji platformy Spark zostanie otwarty na nowej karcie, na której można wyświetlić szczegóły etapu. Rozwiń wizualizację języka DAG, aby wyświetlić szczegóły etapu.
Wróć do centrum danych .
Wybierz kartę Połączono (1), a następnie wybierz kontener wwi-02 (2) na podstawowym koncie magazynu typu data lake, przejdź do folderu top5-products (3) i sprawdź, czy istnieje folder dla pliku Parquet, którego nazwa jest zgodna z identyfikatorem uruchomienia potoku.
Jak widać, mamy plik, którego nazwa jest zgodna z identyfikatorem uruchomienia potoku, który wcześniej zanotowaliśmy:
Te wartości są zgodne, ponieważ przekazano identyfikator uruchomienia potoku do parametru
runId
działania Notes.