Apache Spark における書き込みの最適化の必要性について
Apache Spark のようなビッグデータ処理エンジンの分析ワークロードは、標準化されたより大きなサイズのファイルを使用したときに最も効率的に実行されます。 ファイル サイズ、ファイル数、Spark ワーカーの数、およびその構成の間の関係は、パフォーマンスに大きく影響します。 データ レイク テーブルへのワークロードの取り込みは、常に多数の小さなファイルを書き込むという継承された特性を持つ可能性があります。このシナリオは、一般的に "小さなファイルの問題" と呼ばれます。
書き込みの最適化は、Synapse の Delta Lake 機能で、書き込むファイルの数を減らし、書き込むデータの個々のファイル サイズを大きくすることを目的としています。 既定の 128 MB サイズのファイルを生成するときに、パーティションを動的に最適化します。 ターゲット ファイルのサイズは、 構成を使用してワークロード要件ごとに変更される可能性があります。
この機能では、パーティション間でデータのシャッフル フェーズを余分に行うことでファイル サイズを実現するため、データを書き込む際に余分な処理コストが発生します。 この書き込みによって生じる小さなペナルティは、テーブルの読み取り効率によって相殺されるはずです。
注意
- これは、Apache Spark バージョン 3.1 以降の Synapse プールで使用できます。
書き込みの最適化の利点
- Delta Lake テーブルで、バッチとストリーミングの両方の書き込みパターンで利用できます。
spark.write
コマンドのパターンを変更する必要はありません。 この機能は、構成設定またはテーブル プロパティで有効にできます。- OPTIMIZE コマンドと比較して、書き込みトランザクションの数を減らすことができます。
- OPTIMIZE 操作は、より少ないファイルで動作するので、より高速になります。
- 参照されていない古いファイルを削除するための VACUUM コマンドもより高速に動作するようになります。
- クエリでは、スキャンされるファイルが少なくなり、ファイル サイズもより適切になるた、め、読み取りパフォーマンスまたはリソース使用量が向上します。
書き込みの最適化の使用シナリオ
いつ使用するか
- Delta Lake パーティション テーブルは、最適ではない (128 MB 未満) または非標準のファイル サイズ (それ自体のサイズが異なるファイル) を生成する書き込みパターンの影響を受けます。
- 最適でないファイル サイズでディスクに書き込まれる再パーティションされたデータ フレーム。
- UPDATE、DELETE、MERGE、CREATE TABLE AS SELECT、INSERT INTO などの小規模なバッチ SQL コマンドの対象となる Delta Lake パーティション テーブル。
- Delta Lake パーティション テーブルにデータ パターンを追加するストリーミング インジェスト シナリオで、追加の書き込み待機時間が許容される場合。
使用すべきでない場合
- パーティション分割されていないテーブル。
- 追加の書き込み待機時間が許容されないユース ケース。
- 最適化スケジュールと読み取りパターンが明確に定義された大きなテーブル。
書き込みの最適化機能を有効または無効にする方法
書き込みの最適化機能は既定では無効になっています。 Spark 3.3 プールでは、パーティション テーブルに対して既定で有効になっています。
プールまたはセッションの構成設定が完了すると、すべての Spark 書き込みパターンでこの機能が使用されます。
書き込みの最適化機能を使用するには、次の構成を使用して有効にします。
- Scala と PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
- Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled` = true
現在の構成値を確認するには、次のようにコマンドを使用します。
- Scala と PySpark
spark.conf.get("spark.microsoft.delta.optimizeWrite.enabled")
- Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled`
書き込みの最適化機能を無効にするには、次の構成を次のように変更します。
- Scala と PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "false")
- Spark SQL
SET `spark.microsoft.delta.optimizeWrite.enabled` = false
テーブル プロパティを使用した書き込みの最適化の制御
新しいテーブルの場合
- SQL
CREATE TABLE <table_name> TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
- Scala
DeltaTableBuilder API を使用する場合:
val table = DeltaTable.create()
.tableName("<table_name>")
.addColumn("<colName>", <dataType>)
.location("<table_location>")
.property("delta.autoOptimize.optimizeWrite", "true")
.execute()
既存のテーブルの場合:
- SQL
ALTER TABLE <table_name> SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true)
- Scala
DeltaTableBuilder API を使用する場合:
val table = DeltaTable.replace()
.tableName("<table_name>")
.location("<table_location>")
.property("delta.autoOptimize.optimizeWrite", "true")
.execute()
書き込みの最適化の現在の最大ファイル サイズ構成を取得および変更する方法
現在の構成値を取得するには、次のコマンドを使用します。 既定値は 128 MB です。
- Scala と PySpark
spark.conf.get("spark.microsoft.delta.optimizeWrite.binSize")
- SQL
SET `spark.microsoft.delta.optimizeWrite.binSize`
- 構成値を変更する場合
- Scala と PySpark
spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "134217728")
- SQL
SET `spark.microsoft.delta.optimizeWrite.binSize` = 134217728