Spark でデータを分析する
Spark を使用する利点の 1 つは、さまざまなプログラミング言語でコードを記述して実行できることです。これにより、既にお持ちのプログラミング スキルを活用し、特定のタスクに最適な言語を使用することができます。 新しい Azure Synapse Analytics Spark ノートブックの既定の言語は PySpark です。これは、データ操作と視覚化に対する強力なサポートにより、データ サイエンティストやアナリストが一般的に使用する Python の Spark 最適化バージョンです。 さらに、Scala (対話形式で使用できる Java 派生言語) やSQL (Spark SQL ライブラリに含まれる一般的に使用される SQL 言語のバリアント) などの言語を使用して、リレーショナル データ構造を操作できます。 ソフトウェア エンジニアは、Java や Microsoft .NET などのフレームワークを使用して Spark 上で実行されるコンパイル済みソリューションを作成することもできます。
データフレームを使用してデータを探索する
Spark では、"耐障害性分散データセット" (RDD) と呼ばれるデータ構造がネイティブで使用されます。ただし、RDD で直接動作するコードを記述 "できる" ものの、Spark で構造化データを操作するために最もよく使用されるデータ構造は、Spark SQL ライブラリの一部として提供される "データフレーム"です。 Spark のデータフレームは、ユビキタス Pandas Python ライブラリのデータフレームと似ていますが、Spark の分散処理環境で動作するように最適化されています。
注意
データフレーム API に加えて、Spark SQL では、Java と Scala でサポートされている厳密に型指定された "データセット" API が提供されます。 このモジュールでは、Dataframe API に焦点を当てます。
データフレームにデータを読み込む
仮説の例を見て、データフレームを使用してデータを操作する方法を確認しましょう。 Azure Synapse Analytics ワークスペースのプライマリ ストレージ アカウントで、products.csv という名前のコンマ区切りのテキスト ファイルに次のデータが含まれるとします。
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Spark ノートブックでは、次の PySpark コードを使用してデータフレームにデータを読み込み、最初の 10 行を表示できます。
%%pyspark
df = spark.read.load('abfss://container@store.dfs.core.windows.net/products.csv',
format='csv',
header=True
)
display(df.limit(10))
先頭の %%pyspark
行は "マジック" と呼ばれ、このセルで使用される言語が PySpark であることを Spark に伝えます。 ノートブック インターフェイスのツール バーで既定として使用する言語を選択し、マジックを使用して特定のセルの選択をオーバーライドできます。 たとえば、製品データの同等の Scala コードの例を次に示します。
%%spark
val df = spark.read.format("csv").option("header", "true").load("abfss://container@store.dfs.core.windows.net/products.csv")
display(df.limit(10))
マジック %%spark
は Scala を指定するために使用されます。
これらのコード サンプルの両方で、次のような出力が生成されます。
ProductID | ProductName | カテゴリ | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | マウンテン バイク | 3399.9900 |
772 | Mountain-100 Silver, 42 | マウンテン バイク | 3399.9900 |
773 | Mountain-100 Silver, 44 | マウンテン バイク | 3399.9900 |
... | ... | ... | ... |
データフレーム スキーマを指定する
前の例では、CSV ファイルの最初の行に列名が含まれており、Spark により、含まれているデータから各列のデータ型を推論できました。 また、データの明示的なスキーマを指定することもできます。これは、次の CSV の例のように、データ ファイルに列名が含まれていない場合に便利です。
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
次の PySpark の例は、product-data.csv という名前のファイルからデータフレームを読み込むスキーマをこの形式で指定する方法を示しています。
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('abfss://container@store.dfs.core.windows.net/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
ここでも、結果は次のようになります。
ProductID | ProductName | カテゴリ | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | マウンテン バイク | 3399.9900 |
772 | Mountain-100 Silver, 42 | マウンテン バイク | 3399.9900 |
773 | Mountain-100 Silver, 44 | マウンテン バイク | 3399.9900 |
... | ... | ... | ... |
データフレームのフィルター処理とグループ化を行う
Dataframe クラスのメソッドを使用して、含まれているデータをフィルター処理、並べ替え、グループ化、操作できます。 たとえば、次のコード例では、select メソッドを使用して、前の例の製品データを含む df データフレームから ProductName 列と ListPrice 列を取得します。
pricelist_df = df.select("ProductID", "ListPrice")
このコード例の結果は次のようになります。
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
ほとんどのデータ操作メソッドと同様に、select は新しいデータフレーム オブジェクトを返します。
ヒント
データフレームから列のサブセットを選択することは一般的な操作であり、次の短い構文を使用して実現することもできます。
pricelist_df = df["ProductID", "ListPrice"]
メソッドを "チェーン" して、変換されたデータフレームを作成する一連の操作を実行できます。 たとえば、次のコード例では、selectメソッドと where メソッドをチェーンして、Mountain Bikes または Road Bikes のカテゴリを持つ製品に対して ProductName 列と ListPrice 列を含む新しいデータフレームを作成します。
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
このコード例の結果は次のようになります。
ProductName | ListPrice |
---|---|
Mountain-100 Silver, 38 | 3399.9900 |
Road-750 Black, 52 | 539.9900 |
... | ... |
データをグループ化して集計するには、groupBy メソッドと集計関数を使用します。 たとえば、次の PySpark コードでは、各カテゴリの製品数をカウントします。
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
このコード例の結果は次のようになります。
カテゴリ | count |
---|---|
ヘッドセット | 3 |
ホイール | 14 |
マウンテン バイク | 32 |
... | ... |
Spark で SQL 式を使用する
Dataframe API は Spark SQL という名前の Spark ライブラリの一部であり、データ アナリストは SQL 式を使用してデータのクエリと操作を行います。
Spark カタログでデータベース オブジェクトを作成する
Spark カタログは、ビューやテーブルなどのリレーショナル データ オブジェクトのメタストアです。 Spark ランタイムでは、このカタログを使用して、任意の Spark 対応言語で記述されたコードと、一部のデータ アナリストや開発者にとってより自然な SQL 式をシームレスに統合できます。
Spark カタログでクエリを実行するためにデータフレーム内のデータを使用できるようにする最も簡単な方法の 1 つは、次のコード例に示すように、一時ビューを作成することです。
df.createOrReplaceTempView("products")
"ビュー" は一時的なもので、現在のセッションの終了時に自動的に削除されます。 また、カタログに保持される "テーブル" を作成して、Spark SQL を使用してクエリを実行できるデータベースを定義することもできます。
注意
このモジュールでは Spark カタログ テーブルについて詳しく説明しませんが、いくつかの重要な点を確認しておくことをお勧めします。
spark.catalog.createTable
メソッドを使用して、空のテーブルを作成できます。 テーブルは、カタログに関連付けられているストレージの場所に、基になるデータを格納するメタデータ構造です。 テーブルを削除すると、基になるデータも削除されます。- データフレームをテーブルとして保存するには、
saveAsTable
メソッドを使用します。 spark.catalog.createExternalTable
メソッドを使用して "外部" テーブルを作成できます。 外部テーブルではカタログ内のメタデータが定義されますが、外部ストレージの場所 (通常は、データ レイク内のフォルダー) から基になるデータが取得されます。 外部テーブルを削除しても、基になるデータは削除されません。
Spark SQL API を使用してデータのクエリを実行する
任意の言語で記述されたコードで Spark SQL API を使用して、カタログ内のデータに対してクエリを実行できます。 たとえば、次の PySpark コードでは、SQL クエリを使用して、products ビューからデータフレームとしてデータを返します。
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
このコード例の結果は、次の表のようになります。
ProductID | ProductName | ListPrice |
---|---|---|
38 | Mountain-100 Silver, 38 | 3399.9900 |
52 | Road-750 Black, 52 | 539.9900 |
... | ... | ... |
SQL コードを使用する
前の例では、Spark SQL API を使用して、Spark コードに SQL 式を埋め込む方法を示しました。 また、ノートブックで %%sql
マジックを使用して、次のようにカタログ内のオブジェクトに対してクエリを行う SQL コードを実行することもできます。
%%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
この SQL コード例では、次のように、ノートブックにテーブルとして自動的に表示される結果セットが返されます。
カテゴリ | ProductCount |
---|---|
ビブショーツ | 3 |
バイク ラック | 1 |
バイク スタンド | 1 |
... | ... |