Use Delta tables with streaming data


All of the data we've explored up to this point has been static data in files. However, many data analytics scenarios involve streaming data that must be processed in near real time. For example, you might need to capture readings emitted by internet-of-things (IoT) devices and store them in a table as they occur. Spark processes batch data and streaming data in the same way, enabling streaming data to be processed in real time by using the same API.

Spark Structured Streaming

A typical stream processing solution involves constantly reading a stream of data from a source, optionally processing it to select specific fields, aggregate and group values, or otherwise manipulate the data, and writing the results to a sink.

Spark includes native support for streaming data through Spark Structured Streaming, an API that is based on a boundless dataframe in which streaming data is captured for processing. A Spark Structured Streaming dataframe can read data from many kinds of streaming sources, including:

  • Network ports
  • Real-time message brokering services such as Azure Event Hubs or Kafka
  • File system locations
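
The following minimal sketch illustrates this read-process-write pattern, assuming a folder of JSON files as the streaming source; the folder paths, schema, and output location are hypothetical:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Hypothetical schema for incoming IoT readings
schema = StructType([
    StructField("device", StringType()),
    StructField("reading", IntegerType())
])

# Source: new JSON files added to the folder are read as streaming input
source_df = spark.readStream.schema(schema).json("Files/iot_stream")

# Process: keep only the rows of interest
filtered_df = source_df.filter("reading > 100")

# Sink: append the results to a Delta location, tracking progress with a checkpoint
query = filtered_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "Files/iot_checkpoint") \
    .start("Tables/iot_readings")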

Tip

For more information about Spark Structured Streaming, see Structured Streaming Programming Guide in the Spark documentation.

Streaming with Delta tables

You can use a Delta table as a source or a sink for Spark Structured Streaming. For example, you could capture a stream of real-time data from an IoT device and write the stream directly to a Delta table as a sink. You can then query the table to see the latest streamed data. Or you could read a Delta table as a streaming source, enabling near real-time reporting as new data is added to the table.

Using a Delta table as a streaming source

In the following example, Spark SQL is used to create a Delta table to store details of Internet sales orders:

%%sql
CREATE TABLE orders_in
(
        OrderID INT,
        OrderDate DATE,
        Customer STRING,
        Product STRING,
        Quantity INT,
        Price DECIMAL
)
USING DELTA;

A hypothetical data stream of internet orders is inserted into the orders_in table:

%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
    (3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
    (3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
    (3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
    (3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
    (3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);

To verify, you can read and display data from the input table:

# Read and display the input table
df = spark.read.format("delta").table("orders_in")

display(df)

The data is then loaded into a streaming DataFrame from the Delta table:

# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .table("orders_in")

Note

When using a Delta table as a streaming source, only append operations can be included in the stream. Data modifications will cause an error unless you specify the ignoreChanges or ignoreDeletes option.

You can check that the stream is streaming by using the isStreaming property, which should return True:

# Verify that the stream is streaming
stream_df.isStreaming

Transform the data stream

After reading the data from the Delta table into a streaming DataFrame, you can use the Spark Structured Streaming API to process it. For example, you could count the number of orders placed every minute and send the aggregated results to a downstream process for near-real-time visualization.
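
A per-minute count could look like the following minimal sketch. Because OrderDate in this table is a DATE rather than a timestamp, a processing-time EventTime column is added here purely for illustration (the column and variable names are hypothetical); in a real pipeline you would use an event-time column supplied by the source:

from pyspark.sql.functions import col, current_timestamp, window

# Sketch: count orders in one-minute tumbling windows
orders_per_minute = stream_df \
    .withColumn("EventTime", current_timestamp()) \
    .withWatermark("EventTime", "5 minutes") \
    .groupBy(window(col("EventTime"), "1 minute")) \
    .count()

The aggregated stream could then be written to a sink for visualization or further processing.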

In the orders example, any rows with a NULL in the Price column are filtered out, and new IsBike and Total columns are added.

from pyspark.sql.functions import col, expr

# Filter out rows with a NULL Price, then add the IsBike and Total columns
transformed_df = stream_df.filter(col("Price").isNotNull()) \
    .withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
    .withColumn('Total', expr("Quantity * Price").cast('decimal'))

Using a Delta table as a streaming sink

The data stream is then written to a Delta table:

# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpointpath) \
    .start(output_table_path)

print("Streaming to orders_processed...")

Note

The checkpointLocation option is used to write checkpoint data that tracks the state of the stream processing. The checkpoint enables a failed stream to be restarted and resume processing at the point where it left off.
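
For example, if the query is interrupted, restarting it with the same checkpoint location resumes processing from the last recorded position rather than reprocessing the whole table. A minimal sketch, reusing the variables defined above:

# Restart the stream; the existing checkpoint lets it resume where it left off
deltastream = transformed_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpointpath) \
    .start(output_table_path)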

After the streaming process starts, you can query the output table to see the processed data. There might be a short delay before the first results are available.

%%sql
SELECT *
    FROM orders_processed
    ORDER BY OrderID;

In the results of this query, order 3005 is excluded because it had a NULL in the Price column, and the two columns that were added during the transformation, IsBike and Total, are displayed.

OrderID  OrderDate   Customer  Product               Quantity  Price  IsBike  Total
3001     2024-09-01  Yang      Road Bike Red         1         1200   1       1200
3002     2024-09-01  Carlson   Mountain Bike Silver  1         1500   1       1500
3003     2024-09-02  Wilson    Road Bike Yellow      2         1350   1       2700
3004     2024-09-02  Yang      Road Front Wheel      1         115    0       115

When finished, stop the stream by using the stop method to avoid unnecessary processing costs:

# Stop the streaming data to avoid excessive processing costs
deltastream.stop()

Tip

For more information about using Delta tables for streaming data, see Table streaming reads and writes in the Delta Lake documentation.