Condividi tramite


Criteri di Structured Streaming su Azure Databricks

Contiene notebook ed esempi di codice per criteri comuni per l'uso di Structured Streaming in Azure Databricks.

Introduzione a Structured Streaming

Se non si ha esperienza con Structured Streaming, vedere Eseguire il primo carico di lavoro Structured Streaming.

Scrivere in Cassandra come sink per Structured Streaming in Python

Apache Cassandra è un database OLTP distribuito, a bassa latenza, scalabile e a disponibilità elevata.

Structured Streaming funziona con Cassandra tramite il connettore Spark Cassandra. Questo connettore supporta le API RDD e DataFrame e offre il supporto nativo per la scrittura di dati in streaming. Importante È necessario usare la versione corrispondente di spark-cassandra-connector-assembly.

L'esempio seguente si connette a uno o più host in un cluster di database Cassandra. Specifica anche le configurazioni di connessione, ad esempio la posizione del checkpoint e il keyspace specifico e i nomi di table:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Scrivere in Azure Synapse Analytics usando foreachBatch() in Python

streamingDF.writeStream.foreachBatch() consente di riutilizzare i writer di dati batch esistenti per scrivere l'output di una query di streaming in Azure Synapse Analytics. Per altri dettagli, vedere la documentazione foreachBatch.

Per eseguire questo esempio, è necessario il connettore Azure Synapse Analytics. Per informazioni dettagliate sul connettore Azure Synapse Analytics, vedere Eseguire query sui dati in Azure Synapse Analytics.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Join stream-stream

Questi due notebook illustrano come usare i join stream-stream in Python e Scala.

Notebook Python join stream-stream

Get portatile

Notebook Scala join stream-stream

Get portatile