Azure Databricks 上的結構化串流模式
這包含在 Azure Databricks 上使用結構化串流的常見模式筆記本和程式碼範例。
開始使用結構化串流
如果您不熟悉結構化串流,請參閱執行您的第一個結構化串流工作負載。
寫入 Cassandra 作為 Python 中結構化串流的接收器
Apache Cassandra 是分散式、低延遲、可調整、高度可用的 OLTP 資料庫。
結構化串流可透過 Spark Cassandra 連接器與 Cassandra 搭配運作。 此連接器同時支援 RDD 和 DataFrame API,而且原生支援寫入串流資料。 重要 您必須使用對應版本的 spark-cassandra-connector-assembly。
下列範例會連線到 Cassandra 資料庫叢集中的一或多個主機。 它也會指定連接組態,例如檢查點位置和特定索引鍵空間,以及 table 名稱:
spark.conf.set("spark.cassandra.connection.host", "host1,host2")
df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()
在 Python 中使用 foreachBatch()
寫入 Azure Synapse Analytics
streamingDF.writeStream.foreachBatch()
可讓您重複使用現有的批次資料寫入器,將串流查詢的輸出寫入 Azure Synapse Analytics。 如需詳細資料,請參閱 foreachBatch 文件 (英文)。
若要執行此範例,您需要 Azure Synapse Analytics 連接器。 如需 Azure Synapse Analytics 連接器的詳細資料,請參閱在 Azure Synapse Analytics 中查詢資料 (英文)。
from pyspark.sql.functions import *
from pyspark.sql import *
def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)
Stream-Stream 聯結
這兩個筆記本示範如何在 Python 和 Scala 中使用 stream-stream 聯結。