次の方法で共有


Microsoft Fabric Data Warehouse 用 Spark コネクタ

Spark 開発者とデータ科学者は、Fabric Data Warehouse 用 Spark コネクタを使用すると、ウェアハウスとレイクハウスの SQL 分析エンドポイントのデータにアクセスして操作することができます。 コネクタには次のような機能があります。

  • ウェアハウスまたは SQL Analytics エンドポイントからのデータは、同じワークスペース内または複数のワークスペース間で操作できます。
  • レイクハウスの SQL 分析エンドポイントは、ワークスペースのコンテキストに基づいて自動的に検出されます。
  • コネクタは、簡略化された Spark API を提供し、基になる複雑さを抽象化し、1 つのコード行だけで動作します。
  • テーブルまたはビューにアクセスしている間、コネクタは SQL エンジン レベルで定義されたセキュリティ モデルを維持します。 これらのモデルには、オブジェクト レベル セキュリティ (OLS)、行レベル セキュリティ (RLS)、列レベル セキュリティ (CLS) が含まれます。
  • コネクタは Fabric ランタイム 内にプレインストールされるため、個別にインストールする必要がなくなります。

Note

コネクタは、現在プレビューの段階です。 詳細については、この記事の下部の「現在の制限事項」を参照してください。

認証

Microsoft Entra 認証は、統合認証アプローチです。 ユーザーは Microsoft Fabric ワークスペースにサインインし、資格証明は認証と認可のために SQL エンジンに自動的に渡されます。 認証情報が自動マッピングされるため、ユーザーが特定の構成オプションを指定する必要はありません。

アクセス許可

SQL エンジンに接続するには、Warehouseまたは SQL 分析エンドポイント (項目レベル) に対する少なくとも読み取りアクセス許可 (SQL Server の 接続 アクセス許可と同様) が必要です。 また、ユーザーは、特定のテーブルまたはビューからデータを読み取るために、詳細なオブジェクト レベルのアクセス許可も必要です。 詳細については、「Microsoft Fabric のデータ ウェアハウスのセキュリティ」を参照してください。

Code テンプレートと例

メソッド シグネチャの使用

次のコマンドは、読み取り要求の synapsesql メソッド シグネチャを示しています。 3 部構成 の tableName 引数は、レイクハウスの Warehouse と SQL 分析エンドポイントからテーブルまたはビューにアクセスするために必要です。 シナリオに基づいて、引数を次の名前で更新します。

  • パート 1: 倉庫またはレイクハウスの名前。
  • パート 2: スキーマの名前。
  • パート 3: テーブルまたはビューの名前。
synapsesql(tableName:String="<Part 1.Part 2.Part 3>") => org.apache.spark.sql.DataFrame

このコネクタでは、テーブルまたはビューから直接読み取るだけでなく、カスタム クエリまたはパススルー クエリを指定することもできます。このクエリは SQL エンジンに渡され、結果は Spark に返されます。

spark.read.option(Constants.DatabaseName, "<warehouse/lakeshouse name>").synapsesql("<T-SQL Query>") => org.apache.spark.sql.DataFrame

このコネクタは、指定されたウェアハウス/レイクハウスのエンドポイントを自動検出しますが、明示的に指定したい場合は、そのようにできます。

//For warehouse
spark.conf.set("spark.datawarehouse.<warehouse name>.sqlendpoint", "<sql endpoint,port>")
//For lakehouse
spark.conf.set("spark.lakehouse.<lakeshouse name>.sqlendpoint", "<sql endpoint,port>")
//Read from table
spark.read.synapsesql("<warehouse/lakeshouse name>.<schema name>.<table or view name>") 

同じワークスペース内のデータを読み取る

重要

ノートブックの先頭またはコネクタの使用を開始する前に、これらの インポート ステートメントを実行します。

Scala の場合

import com.microsoft.spark.fabric.tds.implicits.read.FabricSparkTDSImplicits._

import com.microsoft.spark.fabric.Constants

PySpark (Python) の場合

import com.microsoft.spark.fabric

from com.microsoft.spark.fabric.Constants import Constants

次のコードは、Spark DataFrame 内のテーブルまたはビューからデータを読み取る例です。

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")

次のコードは、行数制限が 10 の Spark DataFrame 内のテーブルまたはビューからデータを読み取る例です。

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").limit(10)

次のコードは、フィルターを適用した後に Spark DataFrame 内のテーブルまたはビューからデータを読み取る例です。

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").filter("column name == 'value'")

次のコードは、選択した列についてのみ Spark DataFrame 内のテーブルまたはビューからデータを読み取る例です。

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").select("column A", "Column B")

ワークスペース間でデータを読み取る

複数のワークスペースにまたがる 1 つのウェアハウスまたはレイクハウスのデータにアクセスして読み取るには、ウェアハウスまたはレイクハウスが存在するワークスペース ID を指定してから、レイクハウスまたはウェアハウスの項目 ID を指定します。 次の行は、ワークスペース ID とレイクハウスまたはウェアハウス ID を指定して、ウェアハウスまたはレイクハウスから Spark データフレームのテーブルまたはビューのデータを読み取る例を示しています。

# For lakehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.LakehouseId, "<lakehouse item id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")

# For warehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.DatawarehouseId, "<warehouse item id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")

Note

ノートブックを実行している場合、コネクタは既定で、ノートブックに接続されているレイクハウスのワークスペース内の指定された倉庫またはレイクハウスを検索します。 別のワークスペースのウェアハウスまたはレイクハウスを参照するには、上記のようにワークスペース ID とレイクハウスまたはウェアハウスの項目 ID を指定します。

Warehouse からのデータに基づいてレイクハウス テーブルを作成する

これらのコード行は、Spark データフレームのテーブルまたはビューからデータを読み取り、それを使用してレイクハウス テーブルを作成する例を示しています。

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")
df.write.format("delta").saveAsTable("<Lakehouse table name>")

トラブルシューティング

完了すると、読み取り応答スニペットがセルの出力に表示されます。 現在のセルで失敗が発生すると、後続のノートブックの CELL の実行も取り消されます。 詳細なエラー情報は、Spark アプリケーションのログで確認できます。

現在の制限

現時点では、コネクタは次の通りです。

  • ファブリック ウェアハウスからのデータ取得と、レイクハウスアイテムのSQL分析エンドポイントをサポートします。
  • Fabric DW では Time Travel がサポートされるようになりましたが、このコネクタは時間移動構文を持つクエリでは機能しません。
  • 一貫性を保つため、Azure Synapse Analytics の Synapse Spark に付属しているような使用署名を保持します。 ただし、Azure Synapse Analytics で Dedicated SQL プールに接続して操作することは下位互換性がありません。
  • 特殊文字を含む列名は、3 部構成のテーブル/ビュー名に基づいてクエリが送信される前にエスケープ文字を追加することによって処理されます。 カスタムまたはパススルー クエリ ベースの読み取りの場合、ユーザーは特殊文字を含む列名をエスケープする必要があります。