Microsoft Fabric Data Warehouse 用 Spark コネクタ
Spark 開発者とデータ科学者は、Fabric Data Warehouse 用 Spark コネクタを使用すると、ウェアハウスとレイクハウスの SQL 分析エンドポイントのデータにアクセスして操作することができます。 コネクタには次のような機能があります。
- ウェアハウスまたは SQL Analytics エンドポイントからのデータは、同じワークスペース内または複数のワークスペース間で操作できます。
- レイクハウスの SQL 分析エンドポイントは、ワークスペースのコンテキストに基づいて自動的に検出されます。
- コネクタは、簡略化された Spark API を提供し、基になる複雑さを抽象化し、1 つのコード行だけで動作します。
- テーブルまたはビューにアクセスしている間、コネクタは SQL エンジン レベルで定義されたセキュリティ モデルを維持します。 これらのモデルには、オブジェクト レベル セキュリティ (OLS)、行レベル セキュリティ (RLS)、列レベル セキュリティ (CLS) が含まれます。
- コネクタは Fabric ランタイム 内にプレインストールされるため、個別にインストールする必要がなくなります。
Note
コネクタは、現在プレビューの段階です。 詳細については、この記事の下部の「現在の制限事項」を参照してください。
認証
Microsoft Entra 認証は、統合認証アプローチです。 ユーザーは Microsoft Fabric ワークスペースにサインインし、資格証明は認証と認可のために SQL エンジンに自動的に渡されます。 認証情報が自動マッピングされるため、ユーザーが特定の構成オプションを指定する必要はありません。
アクセス許可
SQL エンジンに接続するには、Warehouseまたは SQL 分析エンドポイント (項目レベル) に対する少なくとも読み取りアクセス許可 (SQL Server の 接続 アクセス許可と同様) が必要です。 また、ユーザーは、特定のテーブルまたはビューからデータを読み取るために、詳細なオブジェクト レベルのアクセス許可も必要です。 詳細については、「Microsoft Fabric のデータ ウェアハウスのセキュリティ」を参照してください。
Code テンプレートと例
メソッド シグネチャの使用
次のコマンドは、読み取り要求の synapsesql
メソッド シグネチャを示しています。 3 部構成 の tableName
引数は、レイクハウスの Warehouse と SQL 分析エンドポイントからテーブルまたはビューにアクセスするために必要です。 シナリオに基づいて、引数を次の名前で更新します。
- パート 1: 倉庫またはレイクハウスの名前。
- パート 2: スキーマの名前。
- パート 3: テーブルまたはビューの名前。
synapsesql(tableName:String="<Part 1.Part 2.Part 3>") => org.apache.spark.sql.DataFrame
このコネクタでは、テーブルまたはビューから直接読み取るだけでなく、カスタム クエリまたはパススルー クエリを指定することもできます。このクエリは SQL エンジンに渡され、結果は Spark に返されます。
spark.read.option(Constants.DatabaseName, "<warehouse/lakeshouse name>").synapsesql("<T-SQL Query>") => org.apache.spark.sql.DataFrame
このコネクタは、指定されたウェアハウス/レイクハウスのエンドポイントを自動検出しますが、明示的に指定したい場合は、そのようにできます。
//For warehouse
spark.conf.set("spark.datawarehouse.<warehouse name>.sqlendpoint", "<sql endpoint,port>")
//For lakehouse
spark.conf.set("spark.lakehouse.<lakeshouse name>.sqlendpoint", "<sql endpoint,port>")
//Read from table
spark.read.synapsesql("<warehouse/lakeshouse name>.<schema name>.<table or view name>")
同じワークスペース内のデータを読み取る
重要
ノートブックの先頭またはコネクタの使用を開始する前に、これらの インポート ステートメントを実行します。
Scala の場合
import com.microsoft.spark.fabric.tds.implicits.read.FabricSparkTDSImplicits._
import com.microsoft.spark.fabric.Constants
PySpark (Python) の場合
import com.microsoft.spark.fabric
from com.microsoft.spark.fabric.Constants import Constants
次のコードは、Spark DataFrame 内のテーブルまたはビューからデータを読み取る例です。
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")
次のコードは、行数制限が 10 の Spark DataFrame 内のテーブルまたはビューからデータを読み取る例です。
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").limit(10)
次のコードは、フィルターを適用した後に Spark DataFrame 内のテーブルまたはビューからデータを読み取る例です。
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").filter("column name == 'value'")
次のコードは、選択した列についてのみ Spark DataFrame 内のテーブルまたはビューからデータを読み取る例です。
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").select("column A", "Column B")
ワークスペース間でデータを読み取る
複数のワークスペースにまたがる 1 つのウェアハウスまたはレイクハウスのデータにアクセスして読み取るには、ウェアハウスまたはレイクハウスが存在するワークスペース ID を指定してから、レイクハウスまたはウェアハウスの項目 ID を指定します。 次の行は、ワークスペース ID とレイクハウスまたはウェアハウス ID を指定して、ウェアハウスまたはレイクハウスから Spark データフレームのテーブルまたはビューのデータを読み取る例を示しています。
# For lakehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.LakehouseId, "<lakehouse item id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")
# For warehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.DatawarehouseId, "<warehouse item id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")
Note
ノートブックを実行している場合、コネクタは既定で、ノートブックに接続されているレイクハウスのワークスペース内の指定された倉庫またはレイクハウスを検索します。 別のワークスペースのウェアハウスまたはレイクハウスを参照するには、上記のようにワークスペース ID とレイクハウスまたはウェアハウスの項目 ID を指定します。
Warehouse からのデータに基づいてレイクハウス テーブルを作成する
これらのコード行は、Spark データフレームのテーブルまたはビューからデータを読み取り、それを使用してレイクハウス テーブルを作成する例を示しています。
df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")
df.write.format("delta").saveAsTable("<Lakehouse table name>")
トラブルシューティング
完了すると、読み取り応答スニペットがセルの出力に表示されます。 現在のセルで失敗が発生すると、後続のノートブックの CELL の実行も取り消されます。 詳細なエラー情報は、Spark アプリケーションのログで確認できます。
現在の制限
現時点では、コネクタは次の通りです。
- ファブリック ウェアハウスからのデータ取得と、レイクハウスアイテムのSQL分析エンドポイントをサポートします。
- Fabric DW では
Time Travel
がサポートされるようになりましたが、このコネクタは時間移動構文を持つクエリでは機能しません。 - 一貫性を保つため、Azure Synapse Analytics の Synapse Spark に付属しているような使用署名を保持します。 ただし、Azure Synapse Analytics で Dedicated SQL プールに接続して操作することは下位互換性がありません。
- 特殊文字を含む列名は、3 部構成のテーブル/ビュー名に基づいてクエリが送信される前にエスケープ文字を追加することによって処理されます。 カスタムまたはパススルー クエリ ベースの読み取りの場合、ユーザーは特殊文字を含む列名をエスケープする必要があります。