Databricks Connect for Python の高度な使用方法
Note
この記事では、Databricks Runtime 14.0 以降用の Databricks Connect について説明します。
この記事では、Databricks Connect の基本的なセットアップ以上のトピックについて説明します。
Spark Connect 接続文字列を構成する
「クラスターへの接続を構成する」で説明されているオプションを使用してクラスターに接続するよりも詳細なオプションが、Spark Connect 接続文字列を使用した接続です。 remote
関数で文字列を渡すか、SPARK_REMOTE
環境変数を設定できます。
Note
Spark Connect 接続文字列を使用して接続するには、Databricks 個人用アクセス トークン認証のみを使用できます。
remote
関数を使用して接続文字列を設定するには、以下のようにします。
# Set the Spark Connect connection string in DatabricksSession.builder.remote.
from databricks.connect import DatabricksSession
workspace_instance_name = retrieve_workspace_instance_name()
token = retrieve_token()
cluster_id = retrieve_cluster_id()
spark = DatabricksSession.builder.remote(
f"sc://{workspace_instance_name}:443/;token={token};x-databricks-cluster-id={cluster_id}"
).getOrCreate()
または、SPARK_REMOTE
環境変数を設定します。
sc://<workspace-instance-name>:443/;token=<access-token-value>;x-databricks-cluster-id=<cluster-id>
次に、DatabricksSession
クラスを次のように初期化します。
from databricks.connect import DatabricksSession
spark = DatabricksSession.builder.getOrCreate()
Pyspark シェル
Databricks Connect for Python には、Databricks Connect を使用するように構成された PySpark REPL (Spark シェル) である pyspark
バイナリが付属しています。
追加のパラメーターなしで開始すると、シェルは、Azure Databricks クラスターに接続するために、環境 ( DATABRICKS_
環境変数や DEFAULT
構成プロファイルなど) から既定の資格情報を取得します。 接続の構成の詳細については、「 Compute configuration for Databricks Connect」を参照してください。
Spark シェルを起動し、実行中のクラスターに接続するには、アクティブな Python 仮想環境から次のいずれかのコマンドを実行します。
pyspark
Spark シェルが表示されます。次に例を示します。
Python 3.10 ... [Clang ...] on darwin Type "help", "copyright", "credits" or "license" for more information. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 13.x.dev0 /_/ Using Python version 3.10 ... Client connected to the Spark Connect server at sc://...:.../;token=...;x-databricks-cluster-id=... SparkSession available as 'spark'. >>>
シェルが起動すると、
spark
オブジェクトを使用して、Databricks クラスターで Apache Spark コマンドを実行できます。spark.range(1,10).show()
などの単純な PySpark コマンドを実行します。 エラーがない場合は、正常に接続されています。Spark シェルと Python を使用してお使いのコンピューティングでコマンドを実行する方法については、「Interactive Analysis with the Spark Shell (Spark Shell による対話型分析) を参照してください。
組み込みの
spark
変数を使用して、実行中のクラスターのSparkSession
を表します。次に例を示します。>>> df = spark.read.table("samples.nyctaxi.trips") >>> df.show(5) +--------------------+---------------------+-------------+-----------+----------+-----------+ |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip| +--------------------+---------------------+-------------+-----------+----------+-----------+ | 2016-02-14 16:52:13| 2016-02-14 17:16:04| 4.94| 19.0| 10282| 10171| | 2016-02-04 18:44:19| 2016-02-04 18:46:00| 0.28| 3.5| 10110| 10110| | 2016-02-17 17:13:57| 2016-02-17 17:17:55| 0.7| 5.0| 10103| 10023| | 2016-02-18 10:36:07| 2016-02-18 10:41:45| 0.8| 6.0| 10022| 10017| | 2016-02-22 14:14:41| 2016-02-22 14:31:52| 4.51| 17.0| 10110| 10282| +--------------------+---------------------+-------------+-----------+----------+-----------+ only showing top 5 rows
すべての Python コードはローカルで実行されますが、DataFrame 操作を含むすべての PySpark コードは、リモートの Azure Databricks ワークスペース内のクラスターで実行され、実行応答がローカル呼び出し元に返送されます。
Spark シェルを停止するには、
Ctrl + d
またはCtrl + z
を押すか、コマンドquit()
またはexit()
を実行します。
追加の HTTP ヘッダー
Databricks Connect は、gRPC over HTTP/2 経由で Databricks クラスターと通信します。
一部の上級ユーザーは、クライアントから送信される要求をより適切に制御するために、クライアントと Azure Databricks クラスターの間にプロキシ サービスをインストールすることを選択できます。
場合によっては、プロキシで HTTP 要求にカスタム ヘッダーが必要になる場合があります。
headers()
メソッドを使用して、カスタム ヘッダーを HTTP 要求に追加できます。
spark = DatabricksSession.builder.header('x-custom-header', 'value').getOrCreate()
証明書
クラスターが Azure Databricks ワークスペースの完全修飾ドメイン名 (FQDN) を解決するためにカスタム SSL/TLS 証明書を必要とする場合は、お使いのローカル開発マシンに環境変数 GRPC_DEFAULT_SSL_ROOTS_FILE_PATH
を設定する必要があります。 この環境変数は、クラスターにインストールされている証明書への完全なパスに設定する必要があります。
たとえば、Python コードではこの環境変数を次のように設定します。
import os
os.environ["GRPC_DEFAULT_SSL_ROOTS_FILE_PATH"] = "/etc/ssl/certs/ca-bundle.crt"
環境変数を設定するその他の方法については、お使いのオペレーティング システムのドキュメントを参照してください。
ログとデバッグ ログ
Databricks Connect for Python では、標準の Python ログを使用してログが生成されます。
ログは標準エラー ストリーム (stderr) に出力され、既定では WARN レベル以上のログのみが出力されます。
環境変数 SPARK_CONNECT_LOG_LEVEL=debug
を設定すると、この既定値が変更され、DEBUG
レベル以上のすべてのログ メッセージが出力されます。