Ansluta ditt Apache Spark-program med Azure Event Hubs
Den här självstudien vägleder dig genom att ansluta ditt Spark-program till Event Hubs för realtidsströmning. Den här integreringen möjliggör strömning utan att du behöver ändra dina protokollklienter eller köra dina egna Kafka- eller Zookeeper-kluster. Den här självstudien kräver Apache Spark v2.4+ och Apache Kafka v2.0+.
Anteckning
Det här exemplet finns på GitHub
I den här guiden får du lära dig att:
- Skapa ett Event Hubs-namnområde
- Klona exempelprojektet
- Köra Spark
- Läsa från Event Hubs för Kafka
- Skriva till Event Hubs för Kafka
Krav
Kontrollera att du har följande innan du börjar den här självstudien:
- En Azure-prenumeration. Om du inte har en skapar du ett kostnadsfritt konto.
- Apache Spark v2.4
- Apache Kafka v2.0
- Git
Anteckning
Spark-Kafka-adaptern har uppdaterats för att stödja Kafka v2.0 från och med Spark v2.4. I tidigare versioner av Spark hade adaptern stöd för Kafka v0.10 och senare men förlitade sig tidigare särskilt på Kafka v0.10-API:er. Eftersom Event Hubs för Kafka inte har stöd för Kafka v0.10 stöds inte Spark-Kafka-adaptrar från versioner av Spark före v2.4 av Event Hubs för Kafka-ekosystem.
Skapa ett Event Hubs-namnområde
En Event Hubs-namnrymd krävs för att skicka och ta emot från Event Hubs-tjänster. Instruktioner för att skapa ett namnområde och en händelsehubb finns i Skapa en händelsehubb . Hämta Event Hubs-anslutningssträngen och fullständigt domännamn (FQDN) för senare användning. Anvisningar finns i avsnittet om att hämta en Event Hubs-anslutningssträng.
Klona exempelprojektet
Klona Azure Event Hubs-lagringsplatsen och navigera till undermappen tutorials/spark
:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark
Läsa från Event Hubs för Kafka
Med några få konfigurationsändringar kan du börja läsa från Event Hubs för Kafka. Uppdatera BOOTSTRAP_SERVERS och EH_SASL med information från din namnrymd så kan du starta strömning med Event Hubs på samma sätt som med Kafka. Den fullständiga exempelkoden finns i filen sparkConsumer.scala på GitHub.
//Read from your Event Hub!
val df = spark.readStream
.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")
.option("kafka.group.id", GROUP_ID)
.option("failOnDataLoss", "true")
.load()
//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
.outputMode("append")
.format("console")
.start()
Om du får ett fel som liknar följande fel lägger du till .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
i anropet spark.readStream
och försöker igen.
IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
Skriva till Event Hubs för Kafka
Du kan även skriva till Event Hubs på samma sätt som du skriver till Kafka. Glöm inte att uppdatera din konfiguration genom att ändra BOOTSTRAP_SERVERS och EH_SASL med information från din Event Hubs-namnrymd. Den fullständiga exempelkoden finns i filen sparkProducer.scala på GitHub.
df = /**Dataframe**/
//Write to your Event Hub!
df.writeStream
.format("kafka")
.option("topic", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("checkpointLocation", "./checkpoint")
.start()
Nästa steg
Mer information om Event Hubs och Event Hubs för Kafka finns i följande artiklar: