SQL Server Big Data Clusters Spark Streaming guide
Applies to: SQL Server 2019 (15.x)
Important
The Microsoft SQL Server 2019 Big Data Clusters add-on will be retired. Support for SQL Server 2019 Big Data Clusters will end on February 28, 2025. All existing users of SQL Server 2019 with Software Assurance will be fully supported on the platform and the software will continue to be maintained through SQL Server cumulative updates until that time. For more information, see the announcement blog post and Big data options on the Microsoft SQL Server platform.
This guide covers streaming use cases and how to implement them by using SQL Server Big Data Clusters Spark.
In this guide, you'll learn how to:
- Load streaming libraries to use with PySpark and Scala Spark.
- Implement three common streaming patterns by using SQL Server Big Data Clusters.
Prerequisites
- A SQL Server Big Data Clusters deployment
- One of these options:
- Apache Kafka cluster 2.0 or later
- An Azure Event Hubs namespace and event hub
This guide assumes a solid understanding of streaming concepts and architectures. The following articles provide excellent conceptual baselines:
- Data architecture guide - Real-time processing
- Use Azure Event Hubs from Apache Kafka applications
- Data architecture guide - Choose a real-time message ingestion technology in Azure
Apache Kafka and Azure Event Hubs conceptual mapping
Apache Kafka concept | Event Hubs concept |
---|---|
Cluster | Namespace |
Topic | Event hub |
Partition | Partition |
Consumer group | Consumer group |
Offset | Offset |
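In practice, this mapping means a standard Kafka client can talk to Event Hubs by pointing at the namespace's Kafka endpoint. The following minimal sketch shows what the client settings typically look like for a hypothetical namespace named my-eh-namespace; the real values come from your own namespace and its connection string.
# Minimal sketch: Kafka client settings for an Azure Event Hubs namespace.
# "my-eh-namespace" and the placeholder connection string are hypothetical values.
event_hubs_kafka_conf = {
    'bootstrap.servers': 'my-eh-namespace.servicebus.windows.net:9093',  # the namespace plays the role of the Kafka cluster
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': '$ConnectionString',                                # literal value required by Event Hubs
    'sasl.password': '<Event Hubs namespace connection string>',         # replace with your connection string
}
# The Kafka topic name is simply the event hub name; consumer groups and partitions map one to one.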
Reproducibility
This guide uses the producer application provided in Quickstart: Data streaming with Event Hubs by using the Kafka protocol. You can find sample applications in many programming languages at Azure Event Hubs for Apache Kafka on GitHub. Use these applications to jump-start streaming scenarios.
Note
The quickstart enables the Kafka streaming option when it creates the event hub. Confirm that the Kafka endpoint for the Azure Event Hubs namespace is enabled.
The following modified `producer.py` code streams simulated sensor JSON data into the streaming engine by using a Kafka-compatible client. Notice that Azure Event Hubs is compatible with the Kafka protocol. Follow the setup instructions in GitHub to make the sample work for you.
All connection information is in the `conf` dictionary. Your setup might differ depending on your environment. Replace at least `bootstrap.servers` and `sasl.password`. These settings are the most relevant in the following code sample.
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation. All rights reserved.
# Copyright 2016 Confluent Inc.
# Licensed under the MIT License.
# Licensed under the Apache License, version 2.0.
#
# Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka ecosystems.
from confluent_kafka import Producer
import sys
import random
import time
import json
sensors = ["Sensor 1", "Sensor 2", "Sensor 3"]
if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.stderr.write('Usage: %s <topic>\n' % sys.argv[0])
        sys.exit(1)

    topic = sys.argv[1]

    # Producer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    # See https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka#prerequisites for SSL issues
    conf = {
        'bootstrap.servers': '',    #replace!
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '/usr/lib/ssl/certs/ca-certificates.crt',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '$ConnectionString',
        'sasl.password': '<password>',    #replace!
        'client.id': 'python-sample-producer'
    }

    # Create Producer instance
    p = Producer(**conf)

    def delivery_callback(err, msg):
        if err:
            sys.stderr.write('%% Message failed delivery: %s\n' % err)
        else:
            sys.stderr.write('%% Message delivered to %s [%d] @ %o\n' % (msg.topic(), msg.partition(), msg.offset()))

    # Simulate stream
    for i in range(0, 10000):
        try:
            payload = {
                'sensor': random.choice(sensors),
                'measure1': random.gauss(37, 7),
                'measure2': random.random(),
            }
            p.produce(topic, json.dumps(payload).encode('utf-8'), callback=delivery_callback)
            #p.produce(topic, str(i), callback=delivery_callback)
        except BufferError as e:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))
        p.poll(0)
        time.sleep(2)

    # Wait until all messages have been delivered
    sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
    p.flush()
Run the sample producer application by using the following command. Replace `<my-sample-topic>` with your environment information.
python producer.py <my-sample-topic>
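Optionally, confirm that events are arriving before you wire up Spark. The following minimal sketch reads a few messages back with the same confluent_kafka package and the same connection values as the producer; the consumer group name sample-verify-group is a hypothetical placeholder.
# Minimal sketch: read a few events back from the topic to verify the producer.
# Reuses the producer's connection values; "sample-verify-group" is a hypothetical group id.
from confluent_kafka import Consumer

consumer_conf = {
    'bootstrap.servers': '',                                      # replace, same as producer.py
    'security.protocol': 'SASL_SSL',
    'ssl.ca.location': '/usr/lib/ssl/certs/ca-certificates.crt',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': '$ConnectionString',
    'sasl.password': '<password>',                                # replace, same as producer.py
    'group.id': 'sample-verify-group',
    'auto.offset.reset': 'earliest'
}

c = Consumer(consumer_conf)
c.subscribe(['<my-sample-topic>'])                                # replace with your topic
try:
    for _ in range(10):
        msg = c.poll(5.0)                                         # wait up to 5 seconds per poll
        if msg is None or msg.error():
            continue
        print(msg.value().decode('utf-8'))
finally:
    c.close()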
Streaming scenarios
Streaming pattern | Scenario description and implementation |
---|---|
Pull from Kafka or Event Hubs | Create a Spark Streaming job that pulls data continuously from the streaming engine, performing optional transformations and analytics logic. |
Sink streaming data into Apache Hadoop Distributed File System (HDFS) | In general, this pattern correlates with the previous pattern. After the streaming pull and transformation logic, data can be written to many locations to achieve the desired data persistence requirement. |
Push from Spark into Kafka or Event Hubs | After processing by Spark, data can be pushed back into the external streaming engine. This pattern is desirable in many scenarios, such as real-time product recommendations and micro-batch fraud and anomaly detection. |
Sample Spark Streaming application
This sample application implements the three streaming patterns described in the previous section. The application:
- Sets up configuration variables to connect to the streaming service.
- Creates a Spark Streaming data frame to pull data.
- Writes aggregated data locally to HDFS.
- Writes aggregated data to a different topic in the streaming service.
Here's the complete `sample-spark-streaming-python.py` code:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Obtain the Spark context and session. Structured Streaming doesn't use the
# legacy DStream StreamingContext, so none is created here.
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
# Connection information
bootstrap_servers = ""  # Replace! For Event Hubs, typically "<namespace>.servicebus.windows.net:9093"
sasl = ""               # Replace! The full kafka.sasl.jaas.config value (the PlainLoginModule line with "$ConnectionString" and your connection string)
# Topic we will consume from
topic = "sample-topic"
# Topic we will write to
topic_to = "sample-topic-processed"
# Define the schema to speed up processing
jsonSchema = StructType([StructField("sensor", StringType(), True), StructField("measure1", DoubleType(), True), StructField("measure2", DoubleType(), True)])
streaming_input_df = (
    spark.readStream
    .format("kafka")
    .option("subscribe", topic)
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", sasl)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("failOnDataLoss", "true")
    .option("startingOffsets", "latest")
    .load()
)
def foreach_batch_function(df, epoch_id):
    # Transform and write the micro-batch DataFrame
    if df.count() <= 0:
        return

    # Create a data frame to be written to HDFS
    sensor_df = df.selectExpr('CAST(value AS STRING)').select(from_json("value", jsonSchema).alias("value")).select("value.*")
    # root
    # |-- sensor: string (nullable = true)
    # |-- measure1: double (nullable = true)
    # |-- measure2: double (nullable = true)
    sensor_df.persist()

    # Write to HDFS
    sensor_df.write.format('parquet').mode('append').saveAsTable('sensor_data')

    # Create a summarization data frame
    sensor_stats_df = (sensor_df.groupBy('sensor')
        .agg({'measure1': 'avg', 'measure2': 'avg', 'sensor': 'count'})
        .withColumn('ts', current_timestamp())
        .withColumnRenamed('avg(measure1)', 'measure1_avg')
        .withColumnRenamed('avg(measure2)', 'measure2_avg')
        .withColumnRenamed('count(sensor)', 'count_sensor'))
    # root
    # |-- sensor: string (nullable = true)
    # |-- measure2_avg: double (nullable = true)
    # |-- measure1_avg: double (nullable = true)
    # |-- count_sensor: long (nullable = false)
    # |-- ts: timestamp (nullable = false)
    sensor_stats_df.write.format('parquet').mode('append').saveAsTable('sensor_data_stats')

    # Send the aggregated metrics to an output Kafka topic.
    # The Kafka sink expects a string or binary "value" column, and the micro-batch
    # DataFrame is static, so use write (batch) rather than writeStream.
    sensor_stats_df.selectExpr("to_json(struct(*)) AS value").write \
        .format("kafka") \
        .option("topic", topic_to) \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("kafka.sasl.mechanism", "PLAIN") \
        .option("kafka.security.protocol", "SASL_SSL") \
        .option("kafka.sasl.jaas.config", sasl) \
        .save()

    # For example, you could write to SQL Server
    # df.write.format('com.microsoft.sqlserver.jdbc.spark').mode('append').option('url', url).option('dbtable', datapool_table).save()

    sensor_df.unpersist()

writer = streaming_input_df.writeStream.foreachBatch(foreach_batch_function).start()
writer.awaitTermination()
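The query above runs with Spark's defaults: micro-batches execute back to back and progress isn't checkpointed. If you want a fixed cadence and restart resilience, the last two lines can be extended as in the following minimal sketch; the HDFS checkpoint path shown is a hypothetical placeholder.
# Minimal sketch: the same query with an explicit trigger and a checkpoint location.
# The checkpoint path is hypothetical; choose a directory the job can write to.
writer = (
    streaming_input_df.writeStream
    .trigger(processingTime="15 seconds")
    .option("checkpointLocation", "hdfs:/apps/ETL-Pipelines/checkpoints/sample-spark-streaming")
    .foreachBatch(foreach_batch_function)
    .start()
)
writer.awaitTermination()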
Create the following tables by using Spark SQL. The PySpark kernel in an Azure Data Studio notebook is one way to run Spark SQL interactively. In a new notebook in Azure Data Studio, connect to the Spark pool of your big data cluster. Choose the PySpark kernel, and execute the following:
%%sql
CREATE TABLE IF NOT EXISTS sensor_data (sensor string, measure1 double, measure2 double)
USING PARQUET;
CREATE TABLE IF NOT EXISTS sensor_data_stats (sensor string, measure2_avg double, measure1_avg double, count_sensor long, ts timestamp)
USING PARQUET;
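After the streaming job has processed a few batches, you can spot-check the sink tables from the same PySpark notebook. A minimal sketch:
# Minimal sketch: inspect the most recent aggregates written by the streaming job.
spark.sql("""
    SELECT sensor, measure1_avg, measure2_avg, count_sensor, ts
    FROM sensor_data_stats
    ORDER BY ts DESC
""").show(10, truncate=False)

# The row count of the raw sink table should grow as new micro-batches land.
print(spark.table("sensor_data").count())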
Copy the application to HDFS
azdata bdc hdfs cp --from-path sample-spark-streaming-python.py --to-path "hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py"
Configure Kafka libraries
Set up your Kafka client libraries with your application before you submit the jobs. Two libraries are required:
- kafka-clients - This core library enables Kafka protocol support and connectivity.
- spark-sql-kafka - This library enables the Spark SQL data frame functionality on Kafka streams.
Both libraries must:
- Target Scala 2.12 and Spark 3.1.2. This is the SQL Server Big Data Clusters requirement as of Cumulative Update 13 (CU13).
- Be compatible with your streaming server.
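To confirm which Spark and Scala versions your cluster actually runs before you choose library versions, you can check from a PySpark notebook. The following is a minimal sketch; the Scala lookup goes through Spark's py4j JVM gateway, which is an internal rather than a public API.
# Minimal sketch: report the Spark and Scala versions of the attached cluster.
# The Scala call uses Spark's internal _jvm gateway; treat it as a convenience, not a contract.
print("Spark version:", spark.version)
print("Scala version:", spark.sparkContext._jvm.scala.util.Properties.versionString())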
Caution
As a general rule, use the most recent compatible library. The code in this guide was tested by using Apache Kafka for Azure Event Hubs. The code is provided as-is, not as a statement of supportability.
Apache Kafka offers bidirectional client compatibility by design. But library implementations vary across programming languages. Always refer to your Kafka platform documentation to correctly map compatibility.
Share library locations for jobs on HDFS
If multiple applications connect to the same Kafka cluster, or if your organization has a single versioned Kafka cluster, copy the appropriate library JAR files to a shared location on HDFS. Then all jobs should reference the same library files.
Copy the libraries to the common location:
azdata bdc hdfs cp --from-path kafka-clients-3.0.0.jar --to-path "hdfs:/apps/jars/kafka-clients-3.0.0.jar"
azdata bdc hdfs cp --from-path spark-sql-kafka-0-10_2.12-3.1.2.jar --to-path "hdfs:/apps/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar"
Dynamically install the libraries
You can dynamically install packages when you submit a job by using the package management features of SQL Server Big Data Clusters. There's a job startup time penalty because the library files are downloaded on each job submission.
Submit the Spark Streaming job by using azdata
The following example uses the shared library JAR files on HDFS:
azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py \
-j '["/apps/jars/kafka-clients-3.0.0.jar","/apps/jars/spark-sql-kafka-0-10_2.12-3.1.2.jar"]' \
--config '{"spark.streaming.concurrentJobs":"3","spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyStreamingETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m
This example uses dynamic package management to install the dependencies:
azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/sample-spark-streaming-python.py \
--config '{"spark.jars.packages": "org.apache.kafka:kafka-clients:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2","spark.streaming.concurrentJobs":"3","spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyStreamingETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m
Next steps
To submit Spark jobs to SQL Server Big Data Clusters by using `azdata` or Livy endpoints, see Submit Spark jobs by using command-line tools.
For more information about SQL Server Big Data Clusters and related scenarios, see SQL Server Big Data Clusters.