共用方式為


Azure Databricks 上的結構化串流模式

這包含在 Azure Databricks 上使用結構化串流的常見模式筆記本和程式碼範例。

開始使用結構化串流

如果您不熟悉結構化串流,請參閱執行您的第一個結構化串流工作負載

寫入 Cassandra 作為 Python 中結構化串流的接收器

Apache Cassandra 是分散式、低延遲、可調整、高度可用的 OLTP 資料庫。

結構化串流可透過 Spark Cassandra 連接器與 Cassandra 搭配運作。 此連接器同時支援 RDD 和 DataFrame API,而且原生支援寫入串流資料。 重要 您必須使用對應版本的 spark-cassandra-connector-assembly

下列範例會連線到 Cassandra 資料庫叢集中的一或多個主機。 它也會指定連接組態,例如檢查點位置和特定索引鍵空間,以及 table 名稱:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

在 Python 中使用 foreachBatch() 寫入 Azure Synapse Analytics

streamingDF.writeStream.foreachBatch() 可讓您重複使用現有的批次資料寫入器,將串流查詢的輸出寫入 Azure Synapse Analytics。 如需詳細資料,請參閱 foreachBatch 文件 (英文)。

若要執行此範例,您需要 Azure Synapse Analytics 連接器。 如需 Azure Synapse Analytics 連接器的詳細資料,請參閱在 Azure Synapse Analytics 中查詢資料 (英文)。

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Stream-Stream 聯結

這兩個筆記本示範如何在 Python 和 Scala 中使用 stream-stream 聯結。

Stream-Stream 聯結 Python 筆記本

Get 筆記本

Stream-Stream 聯結 Scala 筆記本

Get 筆記本