Łączenie aplikacji platformy Apache Spark z Azure Event Hubs
Ten samouczek przeprowadzi Cię przez proces łączenia aplikacji Spark z usługą Event Hubs na potrzeby przesyłania strumieniowego w czasie rzeczywistym. Ta integracja umożliwia przesyłanie strumieniowe bez konieczności zmieniania klientów protokołu lub uruchamiania własnych klastrów platformy Kafka lub zookeeper. Ten samouczek wymaga platformy Apache Spark w wersji 2.4 lub nowszej oraz platformy Apache Kafka w wersji 2.0 lub nowszej.
Uwaga
Ten przykład jest dostępny w witrynie GitHub
Ten samouczek zawiera informacje na temat wykonywania następujących czynności:
- Tworzenie przestrzeni nazw usługi Event Hubs
- Klonowanie projektu przykładowego
- Uruchamianie platformy Spark
- Odczyt z usługi Event Hubs dla platformy Kafka
- Zapis do usługi Event Hubs dla platformy Kafka
Wymagania wstępne
Przed rozpoczęciem tego samouczka upewnij się, że masz następujące elementy:
- Subskrypcja platformy Azure. Jeśli jej nie masz, utwórz bezpłatne konto.
- Apache Spark w wersji 2.4
- Apache Kafka w wersji 2.0
- Usługa Git
Uwaga
Adapter Spark-Kafka został zaktualizowany do obsługi platformy Kafka w wersji 2.0 i platformy Spark od wersji 2.4. W poprzednich wersjach platformy Spark adapter obsługiwał platformę Kafka w wersji 0.10 i nowszych, ale bazował na interfejsach API platformy Kafka w wersji 0.10. Ponieważ usługa Event Hubs dla platformy Kafka nie obsługuje platformy Kafka w wersji 0.10, adaptery Spark-Kafka w wersjach platformy Spark wcześniejszych niż 2.4 nie są obsługiwane przez usługę Event Hubs dla ekosystemów platformy Kafka.
Tworzenie przestrzeni nazw usługi Event Hubs
Przestrzeń nazw usługi Event Hubs jest wymagana do wysyłania i odbierania zdarzeń z dowolnej usługi Event Hubs. Zobacz Tworzenie centrum zdarzeń, aby uzyskać instrukcje dotyczące tworzenia przestrzeni nazw i centrum zdarzeń. Pobierz parametry połączenia usługi Event Hubs i w pełni kwalifikowaną nazwę domeny (FQDN) w celu późniejszego użycia. Aby uzyskać instrukcje, zobacz Get an Event Hubs connection string (Pobieranie parametrów połączenia usługi Event Hubs).
Klonowanie projektu przykładowego
Sklonuj repozytorium usługi Azure Event Hubs i przejdź do podfolderu tutorials/spark
:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark
Odczyt z usługi Event Hubs dla platformy Kafka
Po dokonaniu kilku zmian w konfiguracji możesz rozpocząć odczyt z usługi Event Hubs dla platformy Kafka. Po zaktualizowaniu zmiennych BOOTSTRAP_SERVERS i EH_SASL za pomocą szczegółowych informacji dotyczących przestrzeni nazw możesz rozpocząć przesyłanie strumieniowe z usługi Event Hubs tak samo jak z platformy Kafka. Kompletny kod przykładowy znajduje się w pliku sparkConsumer.scala w witrynie GitHub.
//Read from your Event Hub!
val df = spark.readStream
.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")
.option("kafka.group.id", GROUP_ID)
.option("failOnDataLoss", "true")
.load()
//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
.outputMode("append")
.format("console")
.start()
Jeśli wystąpi błąd podobny do następującego błędu, dodaj .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
do spark.readStream
wywołania i spróbuj ponownie.
IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
Zapis do usługi Event Hubs dla platformy Kafka
Zapis do usługi Event Hubs może również wyglądać tak samo jak zapis do platformy Kafka. Nie zapomnij zaktualizować konfiguracji, wstawiając dla zmiennych BOOTSTRAP_SERVERS i EH_SASL informacje z przestrzeni nazw usługi Event Hubs. Kompletny kod przykładowy znajduje się w pliku sparkProducer.scala w witrynie GitHub.
df = /**Dataframe**/
//Write to your Event Hub!
df.writeStream
.format("kafka")
.option("topic", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("checkpointLocation", "./checkpoint")
.start()
Następne kroki
Aby dowiedzieć się więcej o usłudze Event Hubs i usłudze Event Hubs dla platformy Kafka, zobacz następujące artykuły:
- Dublowanie brokera platformy Kafka w centrum zdarzeń
- Łączenie platformy Apache Flink z centrum zdarzeń
- Integrowanie platformy Kafka Connect z centrum zdarzeń
- Eksplorowanie przykładów w witrynie GitHub
- Łączenie usługi Akka Streams z centrum zdarzeń
- Przewodnik dla deweloperów platformy Apache Kafka dotyczący Azure Event Hubs