Kör din första arbetsbelastning för strukturerad direktuppspelning
Den här artikeln innehåller kodexempel och förklaring av grundläggande begrepp som krävs för att köra dina första strukturerade direktuppspelningsfrågor på Azure Databricks. Du kan använda Structured Streaming för arbetsbelastningar för nästan realtidsbearbetning och inkrementell bearbetning.
Structured Streaming är en av flera tekniker som ligger bakom streaming tables i Delta Live Tables. Databricks rekommenderar att du använder Delta Live Tables för alla nya ETL-, inmatnings- och strukturerade strömningsarbetsbelastningar. Se Vad är Delta Live Tables?.
Kommentar
Delta Live-Tables har en något ändrad syntax för att deklarera strömmande tables, men den allmänna syntaxen för att konfigurera läsningar och transformeringar för direktuppspelning gäller för alla användningsfall för strömning i Azure Databricks. Delta Live Tables förenklar också strömning genom att hantera tillståndsinformation, metadata och många konfigurationer.
Använda Automatisk inläsning för att läsa strömmande data från objektlagring
I följande exempel visas hur du läser in JSON-data med Auto Loader, som används cloudFiles
för att ange format och alternativ. Alternativet schemaLocation
möjliggör schema slutsatsdragning och utveckling. Klistra in följande kod i en Databricks Notebook-cell och kör cellen för att skapa en strömmande DataFrame med namnet raw_df
:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Precis som andra läsåtgärder i Azure Databricks läser konfigurationen av en strömmande läsning inte in data. Du måste utlösa en åtgärd på data innan strömmen börjar.
Kommentar
Att anropa display()
på en strömmande DataFrame startar ett direktuppspelningsjobb. För de flesta användningsfall för strukturerad direktuppspelning bör åtgärden som utlöser en ström skriva data till en mottagare. Se Produktionsöverväganden för strukturerad direktuppspelning.
Utföra en direktuppspelningstransformering
Strukturerad direktuppspelning stöder de flesta transformeringar som är tillgängliga i Azure Databricks och Spark SQL. Du kan till och med läsa in MLflow-modeller som UDF:er och göra förutsägelser för strömning som en transformering.
I följande kodexempel slutförs en enkel transformering för att utöka inmatade JSON-data med ytterligare information med hjälp av Spark SQL-funktioner:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
Resultatet transformed_df
innehåller frågeinstruktioner för att läsa in och transformera varje post när den kommer till datakällan.
Kommentar
Strukturerad direktuppspelning behandlar datakällor som obundna eller oändliga datamängder. Därför stöds inte vissa transformeringar i arbetsbelastningar för strukturerad direktuppspelning eftersom de skulle kräva sortering av ett oändligt antal objekt.
De flesta sammansättningar och många kopplingar kräver hantering av tillståndsinformation med vattenstämplar, fönster och utdataläge. Se Tillämpa vattenstämplar för att kontrollera tröskelvärden för databehandling.
Utföra en inkrementell batchskrivning till Delta Lake
I följande exempel skrivs till Delta Lake med en angiven filsökväg och kontrollpunkt.
Viktigt!
Se alltid till att du anger en unik kontrollpunktsplats för varje strömmande skrivare som du konfigurerar. Kontrollpunkten innehåller den unika identiteten för din dataström och spårar alla poster som bearbetas och tillståndsinformation som är associerad med din strömmande fråga.
Inställningen availableNow
för utlösaren instruerar Structured Streaming att bearbeta alla tidigare obearbetade poster från källdatauppsättningen och sedan stänga av, så att du på ett säkert sätt kan köra följande kod utan att behöva oroa dig för att lämna en ström igång:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
I det här exemplet kommer inga nya poster till vår datakälla, så upprepad körning av den här koden matar inte in nya poster.
Varning
Strukturerad strömningskörning kan förhindra automatisk avslutning från att stänga av beräkningsresurser. För att undvika oväntade kostnader måste du avsluta direktuppspelningsfrågor.
Läsa data från Delta Lake, transformera och skriva till Delta Lake
Delta Lake har omfattande stöd för att arbeta med Structured Streaming som både källa och mottagare. Se Delta table strömningsläsningar och skrivningar.
I följande exempel visas exempelsyntax för inkrementell inläsning av alla nya poster från en Delta-table, join dem med en ögonblicksbild av en annan Delta-tableoch skriva dem till en Delta-table:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
Du måste ha rätt behörigheter konfigurerade för att läsa källan tables och skriva till mål tables och den angivna kontrollpunktsplatsen. Fyll i alla parameters som anges med vinkelparenteser (<>
) med hjälp av relevanta values för dina datakällor och mottagare.
Kommentar
Delta Live Tables ger en helt deklarativ syntax för att skapa Delta Lake-pipelines och hanterar egenskaper som utlösare och kontrollpunkter automatiskt. Se Vad är Delta Live Tables?.
Läsa data från Kafka, transformera och skriva till Kafka
Apache Kafka och andra meddelandebussar ger några av de lägsta svarstiderna som är tillgängliga för stora datamängder. Du kan använda Azure Databricks för att tillämpa transformeringar på data som matas in från Kafka och sedan skriva data tillbaka till Kafka.
Kommentar
Om du skriver data till molnobjektlagring läggs ytterligare svarstid till. Om du vill lagra data från en meddelandebuss i Delta Lake men kräver lägsta möjliga svarstid för strömmande arbetsbelastningar rekommenderar Databricks att du konfigurerar separata direktuppspelningsjobb för att mata in data till lakehouse och tillämpa transformeringar i nära realtid för nedströms meddelandebussmottagare.
Följande kodexempel visar ett enkelt mönster för att berika data från Kafka genom att koppla dem till data i en Delta-table och sedan skriva tillbaka till Kafka:
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
Du måste ha rätt behörigheter konfigurerade för åtkomst till Kafka-tjänsten. Fyll i alla parameters som anges med vinkelparenteser (<>
) med hjälp av relevanta values för dina datakällor och mottagare. Se Stream-bearbetning med Apache Kafka och Azure Databricks.