共用方式為


將數據從 Apache Kafka 擷取到 Azure 數據總管

Apache Kafka 是用於建置即時串流資料管線和應用程式的分散式串流平台,可在系統或應用程式之間可靠地移動資料。 Kafka Connect 是一項工具,能夠彈性且可靠地在 Apache Kafka 和其他系統之間串流資料。 Kusto Kafka 接收器可作為 Kafka 的連接器,且不需要使用程式碼。 從 Git 存放庫Confluent 連接器中樞下載接收連接器 jar。

本文說明如何使用 Kafka 擷取資料,運用獨立的 Docker 設定以簡化 Kafka 叢集和 Kafka 連接器叢集設定。

如需詳細資訊,請參閱連接器 Git 儲存庫版本詳細資訊

必要條件

建立 Microsoft Entra 服務主體

Microsoft Entra 服務主體可以透過 Azure 入口網站或以程式設計方式建立,如下列範例所示。

此服務主體是連接器用來在 Kusto 資料表中寫入您的資料的識別。 您會授與此服務主體存取 Kusto 資源的權限。

  1. 透過 Azure CLI 登入您的 Azure 訂用帳戶。 然後在瀏覽器中進行驗證。

    az login
    
  2. 選擇用來託管主體的訂用帳戶。 當您有多個訂用帳戶時,需要此步驟。

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. 建立服務主體。 在此範例中,服務主體稱為 my-service-principal

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. 從傳回的 JSON 資料中,複製 appIdpasswordtenant 供日後使用。

    {
      "appId": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "00001111-aaaa-2222-bbbb-3333cccc4444",
      "tenant": "00001111-aaaa-2222-bbbb-3333cccc4444"
    }
    

您已建立您的 Microsoft Entra 應用程式和服務主體。

建立目標資料表

  1. 從您的查詢環境,使用下列命令建立名為 Storms 的資料表:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. 使用下列命令建立已嵌入資料的對應資料表對應 Storms_CSV_Mapping

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. 在資料表上建立擷取批次原則,用於可設定的佇列擷取延遲。

    提示

    擷取批次原則是效能最佳化程式,並包含三個參數。 滿足第一個條件會觸發擷取至資料表。

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. 使用來自建立 Microsoft Entra 服務主體的服務主體,授與使用資料庫的權限。

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

執行實驗室

下列實驗室旨在讓您體驗開始建立資料、設定 Kafka 連接器,以及串流此資料。 然後,您可以查看擷取的資料。

複製 git 存放庫

複製實驗室的 git 存放庫

  1. 在您的電腦上建立本機目錄。

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. 複製存放庫。

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

複製的存放庫的內容

執行下列命令以列出複製的存放庫的內容:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

此搜尋的結果是:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

檢閱複製的存放庫中的檔案

下列各節說明檔案樹狀結構中檔案的重要部分。

adx-sink-config.json

此檔案包含 Kusto 接收屬性檔案,您可以在其中更新特定的組態詳細資料:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

根據您的設定,取代下列屬性的值:aad.auth.authorityaad.auth.appidaad.auth.appkeykusto.tables.topics.mapping、(資料庫名稱)、kusto.ingestion.url、和 kusto.query.url

連接器 - Dockerfile

此檔案具有用來產生連接器執行個體的 Docker 映像的命令。 它包含從 git 存放庫發行目錄下載的連接器。

Storm-events-producer 目錄

此目錄具有 Go 程式,可讀取本機 "StormEvents.csv" 檔案,並將資料發佈至 Kafka 主題。

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

啟動容器

  1. 在終端機中,啟動容器:

    docker-compose up
    

    產生者應用程式會開始將事件傳送至 storm-events 主題。 您應該會看到如下的記錄:

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. 若要檢查記錄,請在單獨的終端機中執行下列命令:

    docker-compose logs -f | grep kusto-connect
    

啟動連接器

使用 Kafka Connect REST 呼叫來啟動連接器。

  1. 在不同的終端機中,使用下列命令來啟動接收器工作:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. 若要檢查狀態,請在單獨的終端機中執行下列命令:

    curl http://localhost:8083/connectors/storm/status
    

連接器會啟動佇列擷取程序。

注意

如果您有記錄連接器的問題,請建立問題

受控識別

根據預設,Kafka 連接器會在擷取期間使用應用程式方法來進行驗證。 若要使用受控識別進行驗證:

  1. 將受控識別指派給叢集,並授與記憶體帳戶讀取許可權。 如需詳細資訊,請參閱 使用受控識別驗證擷取數據。

  2. 在您的 adx-sink-config.json 檔案中,將 設定 aad.auth.strategymanaged_identity ,並確定 aad.auth.appid 已設定為受控識別用戶端 (應用程式) 識別碼。

  3. 使用私用 實例元數據服務令牌 ,而不是 Microsoft Entra 服務主體

注意

使用受控識別時, appIdtenant 從呼叫月臺的內容推斷, password 而且不需要。

查詢和檢閱資料

確認資料擷取

  1. 一旦資料抵達 Storms 資料表,請檢查資料列計數來確認資料傳輸:

    Storms 
    | count
    
  2. 確認擷取程序中沒有發生失敗:

    .show ingestion failures
    

    一旦您看到資料,請嘗試一些查詢。

查詢資料

  1. 若要查看所有記錄,請執行下列查詢

    Storms
    | take 10
    
  2. 使用 whereproject 來篩選特定資料:

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. 使用 summarize 運算子:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    已連線 Kafka 查詢直條圖結果的螢幕擷取畫面。

如需更多查詢範例和指導,請參閱在 KQL 中寫入查詢Kusto 查詢語言文件

Reset

若要重設,請執行下列步驟:

  1. 停止容器 (docker-compose down -v)
  2. 刪除 (drop table Storms)
  3. 重新建立 Storms 資料表
  4. 重新建立資料表對應
  5. 重新啟動容器 (docker-compose up)

清除資源

若要刪除 Azure 數據總管資源,請使用 az kusto cluster delete (kusto extension)az kusto database delete (kusto extension)

az kusto cluster delete --name "<cluster name>" --resource-group "<resource group name>"
az kusto database delete --cluster-name "<cluster name>" --database-name "<database name>" --resource-group "<resource group name>"

您也可以透過 Azure 入口網站 刪除叢集和資料庫。 如需詳細資訊,請參閱 刪除 Azure 資料總管叢集刪除 Azure 數據總管中的資料庫。

調整 Kafka 接收器連接器

調整 Kafka Sink 連接器以搭配擷取批次原則

  • 調整 Kafka 接收器 flush.size.bytes 大小限制,從 1 MB 開始,以 10 MB 或 100 MB 遞增。
  • 使用 Kafka 接收器時,資料會彙總兩次。 連接器端資料會根據排清設定進行彙總,並根據批次原則在服務端彙總。 如果批次時間太短,因此連接器和服務無法嵌入資料,則必須增加批次時間。 將批次大小設定為 1 GB,並視需要增加或減少 100 MB 的增量。 例如,如果排清大小為 1 MB,且批次原則大小為 100 MB,則 Kafka 接收器連接器會將資料彙總成 100 MB 的批次。 服務接著會擷取該批次。 如果批次原則時間是 20 秒,而 Kafka 接收器連接器會在 20 秒的期間排清 50 MB,則服務會擷取 50 MB 的批次。
  • 您可以透過新增執行個體和 Kafka 分割區來調整。 對分割區數目增加 tasks.max。 如果您有足夠資料可產生 Blob 的 flush.size.bytes 設定大小,請建立分割區。 如果 Blob 較小,則批次會在達到時間限制時處理,因此分割區無法接收足夠的輸送量。 大量的分割區表示有更多的處理額外負荷。