搭配適用於 Kafka API 的 Azure 事件中樞 使用 Spring Kafka
本教學課程說明如何設定 Java 型 Spring Cloud Stream Binder,以針對 Kafka 使用 Azure 事件中樞 來傳送和接收具有 Azure 事件中樞 的訊息。 如需詳細資訊,請參閱從 Apache Kafka 應用程式使用 Azure 事件中樞
在本教學課程中,我們將包含兩個驗證方法: Microsoft Entra 驗證 和 共用存取簽章 (SAS) 驗證。 [無密碼] 索引標籤會顯示 Microsoft Entra 驗證,而 [連接字串] 索引標籤會顯示 SAS 驗證。
Microsoft Entra 驗證是一種機制,可使用 Microsoft Entra ID 中定義的身分識別,連線到 Kafka Azure 事件中樞。 透過 Microsoft Entra 驗證,您可以在集中的位置管理資料庫使用者的身分識別和其他 Microsoft 服務,從而簡化權限管理。
SAS 驗證會使用 Azure 事件中樞 命名空間的 連接字串,來委派 Kafka 事件中樞的存取權。 如果您選擇使用共用存取簽章作為認證,則必須自行管理 連接字串。
必要條件
Azure 訂用帳戶 - 建立免費帳戶。
Java Development Kit (JDK) 第 8 版或更高版本。
Apache Maven 3.2 版或更高版本。
cURL 或類似的 HTTP 公用程式來測試功能。
Azure Cloud Shell 或 Azure CLI 2.37.0 或更高版本。
Azure 事件中樞。 如果您沒有事件中樞,請使用 Azure 入口網站 建立事件中樞。
Spring Boot 應用程式。 如果您沒有這個應用程式,請使用 Spring Initializr 來建立 Maven 專案。 請務必選取 Maven 專案,然後在 [相依性] 底下新增 Spring Web、適用於 Apache Kafka 的 Spring 和 Cloud Stream 相依性,然後選取 [Java 第 8 版或更高版本]。
重要
需要 Spring Boot 2.5 版或更高版本,才能完成本教學課程中的步驟。
準備認證
Azure 事件中樞支援使用 Microsoft Entra ID 來授權對事件中樞資源的要求。 使用 Microsoft Entra 識別符,您可以使用 Azure 角色型存取控制 (Azure RBAC) 將權限 授與安全性主體,這可能是使用者或應用程式服務主體。
如果您想要使用 Microsoft Entra 驗證在本機執行此範例,請確定您的使用者帳戶已透過適用於 IntelliJ 的 Azure 工具組、Visual Studio Code Azure 帳戶外掛程式或 Azure CLI 進行驗證。 此外,請確定帳戶已獲得足夠的許可權。
注意
使用無密碼連線時,您必須將資源的存取權授與您的帳戶。 在 Azure 事件中樞 中,將 和 Azure Event Hubs Data Sender
角色指派Azure Event Hubs Data Receiver
給您目前使用的 Microsoft Entra 帳戶。 如需授與存取角色的詳細資訊,請參閱使用 Azure 入口網站 指派 Azure 角色,並使用 Microsoft Entra ID 授權事件中樞資源的存取權。
從 Azure 事件中樞 傳送和接收訊息
透過 Azure 事件中樞,您可以使用 Spring Cloud Azure 來傳送和接收訊息。
若要安裝 Spring Cloud Azure Starter 模組,請將下列相依性新增至您的 pom.xml 檔案:
Spring Cloud Azure 材料帳單(BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>5.19.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
注意
如果您使用 Spring Boot 2.x,請務必將
spring-cloud-azure-dependencies
版本設定為4.19.0
。 此材料帳單 (BOM) 應該在<dependencyManagement>
pom.xml檔案的 區段中設定。 這可確保所有 Spring Cloud Azure 相依性都使用相同的版本。 如需此 BOM 所用版本的詳細資訊,請參閱 應該使用哪個版本的 Spring Cloud Azure。Spring Cloud Azure 入門成品:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter</artifactId> </dependency>
編碼應用程式
使用下列步驟來設定應用程式,以使用 Azure 事件中樞 產生及取用訊息。
將下列屬性新增至 application.properties 檔案,以設定事件中樞認證。
spring.cloud.stream.kafka.binder.brokers=${AZ_EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net:9093 spring.cloud.function.definition=consume;supply spring.cloud.stream.bindings.consume-in-0.destination=${AZ_EVENTHUB_NAME} spring.cloud.stream.bindings.consume-in-0.group=$Default spring.cloud.stream.bindings.supply-out-0.destination=${AZ_EVENTHUB_NAME}
提示
如果您使用 版本
spring-cloud-azure-dependencies:4.3.0
,則應該使用 值com.azure.spring.cloud.autoconfigure.kafka.AzureKafkaSpringCloudStreamConfiguration
來新增 屬性spring.cloud.stream.binders.<kafka-binder-name>.environment.spring.main.sources
。由於
4.4.0
會自動新增這個屬性,因此不需要手動新增它。下表描述群組態中的欄位:
欄位 描述 spring.cloud.stream.kafka.binder.brokers
指定 Azure 事件中樞 端點。 spring.cloud.stream.bindings.consume-in-0.destination
指定輸入目的地事件中樞,本教學課程是您稍早建立的中樞。 spring.cloud.stream.bindings.consume-in-0.group
指定 Azure 事件中樞 的取用者群組,您可以將其設定 $Default
為 ,以使用您在建立 Azure 事件中樞 實例時建立的基本取用者群組。spring.cloud.stream.bindings.supply-out-0.destination
指定輸出目的地事件中樞,本教學課程與輸入目的地相同。 注意
如果您啟用自動主題建立,請務必新增組態專案
spring.cloud.stream.kafka.binder.replicationFactor
,並將值設定為至少 1。 如需詳細資訊,請參閱 Spring Cloud Stream Kafka Binder 參考指南。編輯啟動類別檔案以顯示下列內容。
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; @SpringBootApplication public class EventHubKafkaBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(EventHubKafkaBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(EventHubKafkaBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->LOGGER.info("New message received: '{}'", message.getPayload()); } @Override public void run(String... args) { many.emitNext(new GenericMessage<>("Hello World"), Sinks.EmitFailureHandler.FAIL_FAST); } }
提示
在本教學課程中,組態或程式代碼中沒有任何驗證作業。 不過,連線到 Azure 服務需要驗證。 若要完成驗證,您需要使用 Azure Identity。 Spring Cloud Azure 使用
DefaultAzureCredential
,Azure 身分識別連結庫會提供它來協助您取得認證,而不需要變更任何程序代碼。DefaultAzureCredential
支援多種驗證方法,並在執行階段判斷應使用的方法。 這種方法可讓您的應用程式在不同的環境中使用不同的驗證方法(例如本機和生產環境),而不需要實作環境特定的程序代碼。 如需詳細資訊,請參閱 DefaultAzureCredential。若要在本機開發環境中完成驗證,您可以使用 Azure CLI、Visual Studio Code、PowerShell 或其他方法。 如需詳細資訊,請參閱 Java 開發環境中的 Azure 驗證。 若要在 Azure 裝載環境中完成驗證,建議您使用使用者指派的受控識別。 如需詳細資訊,請參閱什麼是 Azure 資源受控識別?
啟動應用程式。 如下列範例的訊息將會張貼在應用程式記錄檔中:
Kafka version: 3.0.1 Kafka commitId: 62abe01bee039651 Kafka startTimeMs: 1622616433956 New message received: 'Hello World'
部署至 Azure Spring Apps
現在您已在本機執行 Spring Boot 應用程式,現在可以將其移至生產環境。 Azure Spring Apps 可讓您輕鬆地將 Spring Boot 應用程式部署至 Azure,而不需要變更任何程式代碼。 服務會管理 Spring 應用程式的基礎結構,讓開發人員可以專注於處理程式碼。 Azure Spring 應用程式提供生命週期管理,使用全方位的監視和診斷、組態管理、服務探索、持續整合與持續傳遞的整合、藍綠部署等等。 若要將應用程式部署至 Azure Spring Apps,請參閱 將第一個應用程式部署至 Azure Spring Apps。