次の方法で共有


構造化ストリーミングの運用に関する考慮事項

この記事には、Azure Databricks のジョブを使用して構造化ストリーミング ワークロードをスケジュールするための推奨事項が含まれています。

Databricks では、常に次の操作を行うことを推奨しています。

  • ノートブックから結果を返す不要なコード(displaycountなど)を Remove で削除します。
  • All-Purpose Compute を使用して構造化ストリーミング ワークロードを実行しないでください。 Jobs Compute を使用して、常にジョブとしてストリームをスケジュールします。
  • Continuous モードを使用してジョブをスケジュールします。
  • 構造化ストリーミング ジョブのコンピューティングに対して自動スケーリングを有効にしないでください。

一部のワークロードには、次の利点があります。

Azure Databricks では、構造化ストリーミング ワークロードの運用インフラストラクチャ管理の複雑さを軽減するために、Delta Live Tables が導入されました。 Databricks では、新しい構造化ストリーミング パイプラインに Delta Live Tables を使用することをお勧めします。 「デルタライブとは何か」Tablesを参照してください。

Note

コンピューティングの自動スケールには、構造化ストリーミング ワークロードのクラスター サイズのスケールダウンに制限があります。 Databricks では、ストリーミング ワークロードの自動スケーリングが強化された Delta Live Tables を使用することをお勧めします。 自動スケールが強化された Delta Live パイプラインのクラスター使用率 を参照してください。

失敗が予想されるストリーミング ワークロードを設計する

Databricks では、失敗時に自動的に再起動するように、ストリーミング ジョブを設定することを推奨しています。 schema 進化を含む一部の機能では、Structured Streaming ワークロードが自動的に再試行するように構成されていることを前提としています。 「障害時にストリーミング クエリを再起動するように、構造化ストリーミング ジョブを構成する」を参照してください。

foreachBatch のような一部の操作では、1 回限りの保証ではなく、少なくとも 1 回の保証が提供されます。 これらの操作では、処理パイプラインが羃等になるようにする必要があります。 foreachBatch を使用した任意のデータ シンクへの書き込みに関するページを参照してください。

Note

クエリが再起動すると、前の実行プロセス中にマイクロバッチが計画されます。 メモリ不足エラーが原因でジョブが失敗した場合、またはマイクロバッチのサイズが大きいためにジョブを手動で取り消した場合は、マイクロバッチを正常に処理するためにコンピューティングのスケールアップが必要になる場合があります。

実行間で構成を変更した場合、これらの構成は計画された最初の新しいバッチに適用されます。 「構造化ストリーミング クエリの変更後に復旧する」を参照してください。

ジョブはいつ再試行されますか?

Azure Databricks ジョブの一部として、複数のタスクをスケジュールできます。 継続的トリガーを使用してジョブを構成する場合、タスク間の依存関係を set することはできません。

次のいずれかの方法を使用して、1 つのジョブで複数のストリームをスケジュールすることができます。

  • 複数のタスク: 継続的トリガーを使用してストリーミング ワークロードを実行する複数のタスクを含むジョブを定義します。
  • 複数のクエリ: 1 つのタスクのソース コードで複数のストリーミング クエリを定義します。

これらの戦略を組み合わせることもできます。 次の table では、これらの方法を比較します。

複数のタスク 複数のクエリ
コンピューティングはどのように共有されますか? Databricks では、各ストリーミング タスクに適したサイズの Jobs Compute をデプロイすることをお勧めします。 必要に応じて、タスク間でコンピューティングを共有できます。 すべてのクエリで同じコンピューティングが共有されます。 オプションで、スケジューラ プールにクエリを割り当てることができます。
再試行はどのように処理されますか? すべてのタスクが失敗しない限り、ジョブは再試行されません。 クエリが失敗すると、タスクは再試行します。

障害時にストリーミング クエリを再起動するように、構造化ストリーミング ジョブを構成する

Databricks では、継続的トリガーを使用して、すべてのストリーミング ワークロードを構成することをお勧めします。 「ジョブを継続的に実行する」を参照してください。

継続的トリガーは、既定で次の動作を提供します。

  • ジョブの複数の同時実行を阻止します。
  • 前の実行が失敗したときに新しい実行を開始します。
  • 再試行にエクスポネンシャル バックオフを使用します。

Databricks では、ワークフローをスケジュールするときに、All-Purpose Compute ではなく Jobs Compute を常に使用することをお勧めします。 ジョブが失敗して再試行すると、新しいコンピューティング リソースがデプロイされます。

Note

streamingQuery.awaitTermination()spark.streams.awaitAnyTermination() を使用する必要はありません。 ストリーミング クエリがアクティブな場合、ジョブによって、実行が自動的に完了を防止します。

複数のストリーミング クエリにスケジューラ プールを使用する

同じソース コードから複数のストリーミング クエリを実行するときに、クエリにコンピューティング容量を割り当てるスケジュール プールを構成できます。

既定では、ノートブックで開始されたクエリはすべて、同じ公平なスケジュール プールで実行されます。 ノートブック内のすべてのストリーミング クエリからトリガーによって生成された Apache Spark ジョブは、“先入れ先出し”(FIFO) 順に順番に実行されます。 これにより、クラスター リソースを効率的に共有しないので、クエリで不要な遅延が発生する可能性があります。

スケジューラ プールを使用すると、コンピューティング リソースを共有する構造化ストリーミング クエリを宣言できます。

次の例では、query1 が専用プールに割り当てられ、query2query3 でスケジューラ プールが共有されます。

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

Note

ローカルプロパティ構成は、ストリーミングクエリを開始するのと同じ where ノートブックセル内にある必要があります。

詳細については、Apache Fair Scheduler のドキュメントを参照してください。