次の方法で共有


JavaScript 用Azure Event Hubs クライアント ライブラリ - バージョン 5.12.0

Azure Event Hubsは、1 秒あたり何百万ものイベントを取り込み、複数のコンシューマーにストリーミングできる、拡張性の高いパブリッシュ/サブスクライブ サービスです。 これにより、接続されているデバイスとアプリケーションによって生成される大量のデータを処理および分析できます。 Azure Event Hubsの詳細については、「Event Hubs とは」を参照してください。

Azure Event Hubs クライアント ライブラリを使用すると、Node.js アプリケーションでイベントを送受信できます。

主要リンク:

: バージョン 2.1.0 以降を使用していて、このパッケージの最新バージョンに移行する場合は、EventHubs V2 から EventHubs V5 に移行するための移行ガイドを参照してください

v2 とドキュメントのサンプルについては、こちらを参照してください。

v2.1.0 | のソース コードv2.1.0 用パッケージ (npm) | v2.1.0 のサンプル

作業の開始

パッケージをインストールする

npm を使用してAzure Event Hubs クライアント ライブラリをインストールする

npm install @azure/event-hubs

現在サポートされている環境

詳細については、Microsoft のサポート ポリシーを参照してください。

前提条件

TypeScript の構成

TypeScript ユーザーには、ノードの種類の定義がインストールされている必要があります。

npm install @types/node

また、tsconfig.jsonで を有効にする compilerOptions.allowSyntheticDefaultImports 必要があります。 を有効compilerOptions.esModuleInteropallowSyntheticDefaultImportsにした場合、既定では が有効になっていることに注意してください。 詳細については、「 TypeScript のコンパイラ オプション ハンドブック 」を参照してください。

JavaScript バンドル

ブラウザーでこのクライアント ライブラリを使用するには、まず bundler を使用する必要があります。 これを行う方法の詳細については、 バンドルに関するドキュメントを参照してください。

このライブラリでは、記述されている内容に加えて、次の NodeJS コア組み込みモジュール用の追加のポリフィルも必要です。これは、ブラウザーで正常に動作します。

  • buffer
  • os
  • path
  • process

Webpack を使用したバンドル

Webpack v5 を使用している場合は、次の開発依存関係をインストールできます

  • npm install --save-dev os-browserify path-browserify

次に、以下を webpack.config.js に追加します

 const path = require("path");
+const webpack = require("webpack");

 module.exports = {
   entry: "./src/index.ts",
@@ -12,8 +13,21 @@ module.exports = {
       },
     ],
   },
+  plugins: [
+    new webpack.ProvidePlugin({
+      process: "process/browser",
+    }),
+    new webpack.ProvidePlugin({
+      Buffer: ["buffer", "Buffer"],
+    }),
+  ],
   resolve: {
     extensions: [".ts", ".js"],
+    fallback: {
+      buffer: require.resolve("buffer/"),
+      os: require.resolve("os-browserify"),
+      path: require.resolve("path-browserify"),
+    },
   },

ロールアップを使用したバンドル

ロールアップ バンドルを使用している場合は、次の開発依存関係をインストールします

  • npm install --save-dev @rollup/plugin-commonjs @rollup/plugin-inject @rollup/plugin-node-resolve

次に、次の内容を rollup.config.js に含めます。

+import nodeResolve from "@rollup/plugin-node-resolve";
+import cjs from "@rollup/plugin-commonjs";
+import shim from "rollup-plugin-shim";
+import inject from "@rollup/plugin-inject";

export default {
  // other configs
  plugins: [
+    shim({
+      fs: `export default {}`,
+      net: `export default {}`,
+      tls: `export default {}`,
+      path: `export default {}`,
+      dns: `export function resolve() { }`,
+    }),
+    nodeResolve({
+      mainFields: ["module", "browser"],
+      preferBuiltins: false,
+    }),
+    cjs(),
+    inject({
+      modules: {
+        Buffer: ["buffer", "Buffer"],
+        process: "process",
+      },
+      exclude: ["./**/package.json"],
+    }),
  ]
};

ポリフィルの使用の詳細については、お気に入りの bundler のドキュメントを参照してください。

React Native サポート

ブラウザーと同様に、React Nativeはこの SDK ライブラリで使用される一部の JavaScript API をサポートしていないため、ポリフィルを提供する必要があります。 詳細については、「Messaging React Native sample with Expo」を参照してください。

クライアントを認証する

Event Hubs との対話は、 EventHubConsumerClient クラスのインスタンスまたは EventHubProducerClient クラスのインスタンスから始まります。 次に示すように、これらのクラスをインスタンス化するさまざまな方法をサポートするコンストラクター オーバーロードがあります。

Event Hubs 名前空間に接続文字列を使用する

コンストラクター のオーバーロードの 1 つは、フォームEndpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;とエンティティ名の接続文字列を Event Hub インスタンスに取り込みます。 コンシューマー グループを作成し、接続文字列とエンティティ名をAzure portalから取得できます。

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string", "my-event-hub");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string",
  "my-event-hub"
);

イベント ハブのポリシーに接続文字列を使用する

別のコンストラクター オーバーロードは、(Event Hubs 名前空間ではなく) Event Hub インスタンスで直接定義した共有アクセス ポリシーに対応する接続文字列を受け取ります。 この接続文字列は、 という形式Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-event-hub-nameになります。 前のコンストラクター オーバーロードと接続文字列形式の主な違いは、 ;EntityPath=my-event-hub-nameです。

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const producerClient = new EventHubProducerClient("my-connection-string-with-entity-path");
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-connection-string-with-entity-path"
);

Event Hubs 名前空間と Azure Identity を使用する

このコンストラクター オーバーロードは、Event Hub インスタンスのホスト名とエンティティ名と、TokenCredential インターフェイスを実装する資格情報を受け取ります。 これにより、Azure Active Directory プリンシパルを使用して認証できます。 @azure/ID パッケージで使用できるインターフェイスの実装TokenCredentialがあります。 ホスト名の形式 <yournamespace>.servicebus.windows.netは です。 Azure Active Directory を使用する場合は、Azure Event Hubs データ所有者ロールなどの Event Hubs へのアクセスを許可するロールがプリンシパルに割り当てられている必要があります。 Event Hubs で Azure Active Directory 承認を使用する方法の詳細については、 関連するドキュメントを参照してください

const { EventHubProducerClient, EventHubConsumerClient } = require("@azure/event-hubs");

const { DefaultAzureCredential } = require("@azure/identity");
const credential = new DefaultAzureCredential();
const producerClient = new EventHubProducerClient("my-host-name", "my-event-hub", credential);
const consumerClient = new EventHubConsumerClient(
  "my-consumer-group",
  "my-host-name",
  "my-event-hub",
  credential
);

主要な概念

  • Event Hub プロデューサーは、組み込みデバイス ソリューション、モバイル デバイス アプリケーション、コンソールまたはその他のデバイスで実行されているゲーム タイトル、一部のクライアントまたはサーバー ベースのビジネス ソリューション、または Web サイトの一部として、テレメトリ データ、診断情報、使用状況ログ、またはその他のログ データのソースです。

  • Event Hub コンシューマーは、イベント ハブからそのような情報を取得して処理します。 処理には、集計、複雑な計算、フィルター処理が含まれる場合があります。 生データまたは変換された形式で情報を配布または保存する処理が含まれる場合もあります。 Event Hub コンシューマーは、多くの場合、Azure Stream Analytics、Apache Spark、Apache Storm などの組み込みの分析機能を備えた、堅牢で大規模なプラットフォーム インフラストラクチャ パーツです。

  • パーティションは、Event Hub に保持される順序付けされた一連のイベントです。 パーティションは、イベント コンシューマーに必要な並列処理に関連付けられたデータ編成の手段です。 Azure Event Hubs では、パーティション分割されたコンシューマー パターンを介してメッセージ ストリーミングを提供し、各コンシューマーがメッセージ ストリームの特定のサブセット (パーティション) のみを読み取ります。 新しいイベントが到着すると、このシーケンスの末尾に追加されます。 パーティションの数は、Event Hub の作成時に指定され、変更することはできません。

  • コンシューマー グループはは、Event Hub 全体のビューです。 コンシューマー グループを使用すると、複数のコンシューマー アプリケーションが個別のイベント ストリーム ビューを持つことができるようになり、それぞれの場所から独自のペースでストリームを個別に読み取ることができます。 コンシューマー グループあたり最大 5 つのリーダーをパーティションに同時に設定できますが、特定のパーティションとコンシューマー グループの組み合わせには、アクティブな 1 つのコンシューマーのみをお勧めします。 アクティブな各リーダーは、そのパーティションからすべてのイベントを受信します。同じパーティションに複数のリーダーがある場合は、重複するイベントを受信します。

詳細な概念と詳細な説明については、「Event Hubs の機能」を参照してください。

再試行に関するガイダンス

と はEventHubConsumerClient、SDK が一時的なエラーを処理する方法を調整できる を設定retryOptionsできる場所を受け入れますoptionsEventHubProducerClient 一時的なエラーの例としては、一時的なネットワークまたはサービスの問題があります。

イベントを使用するときの再試行

SDK がイベントを受信しているときに一時的なエラー (一時的なネットワークの問題など) が発生した場合は、 に渡された再試行オプションに基づいてイベントの受信が EventHubConsumerClient再試行されます。 最大再試行回数が不足している場合は、関数が processError 呼び出されます。

再試行設定を使用して、ネットワーク接続の問題などの一時的な問題に関する通知を迅速に行う方法を制御できます。 たとえば、ネットワークの問題が発生したタイミングを知る必要がある場合は、 と retryDelayInMsの値maxRetriesを小さくできます。

関数の実行後、エラーが processError 再試行可能なイベントである限り、クライアントはパーティションからイベントを受け取り続けます。 それ以外の場合、クライアントはユーザー指定 processClose の関数を呼び出します。 この関数は、サブスクリプションを停止したとき、または負荷分散の一環としてアプリケーションの別のインスタンスによって取得されたために、クライアントが現在のパーティションからのイベントの読み取りを停止したときにも呼び出されます。

関数は processClose 、必要に応じてチェックポイントを更新する機会を提供します。 を processClose実行すると、クライアント (または負荷分散の場合は、アプリケーションの別のインスタンスのクライアント) がユーザー指定 processInitialize の関数を呼び出して、同じパーティションの最後に更新されたチェックポイントからイベントの読み取りを再開します。

イベントの読み取りを停止する場合は、 メソッドによって返された に対して をsubscribesubscription呼び出close()す必要があります。

次のセクションでは、Azure Event Hubsを使用した一般的なタスクの一部を説明するコード スニペットを示します。

Event Hub を検査する

多くの Event Hub 操作は、特定のパーティションのスコープ内で実行されます。 パーティションは Event Hub によって所有されているため、その名前は作成時に割り当てられます。 使用できるパーティションを理解するには、使用可能 EventHubProducerClient な 2 つのクライアントのいずれかを使用して Event Hub に対してクエリを実行します。 EventHubConsumerClient

次の例では、 を使用しています EventHubProducerClient

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubProducerClient("connectionString", "eventHubName");

  const partitionIds = await client.getPartitionIds();

  await client.close();
}

main();

イベントを Event Hub に発行する

イベントを発行するには、EventHubProducerClient を作成する必要があります。 次の例はクライアントを作成する 1 つの方法を示していますが、クライアントをインスタンス化する他の方法については、「 クライアントの認証 」セクションを参照してください。

イベントを特定のパーティションに発行することも、Event Hubs サービスが発行先のパーティション イベントを決定できるようにすることもできます。 イベントの発行を高可用性にする必要がある場合、またはイベント データをパーティション間で均等に分散する必要がある場合は、自動ルーティングを使用することをお勧めします。 次の例では、自動ルーティングを利用します。

  • createBatchEventDataBatch使用してオブジェクトをCreateする
  • tryAdd メソッドを使用して、バッチにイベントを追加します。 これは、バッチ サイズの上限に達するまで、または気に入ったイベントの数の追加が完了するまで、どちらか早い方に行うことができます。 このメソッドは、 に戻り false 、最大バッチ サイズに達したためにバッチにイベントを追加できないことを示します。
  • sendBatch メソッドを使用して、イベントのバッチを送信します。

次の例では、10 個のイベントをAzure Event Hubsに送信しようとしています。

const { EventHubProducerClient } = require("@azure/event-hubs");

async function main() {
  const producerClient = new EventHubProducerClient("connectionString", "eventHubName");

  const eventDataBatch = await producerClient.createBatch();
  let numberOfEventsToSend = 10;

  while (numberOfEventsToSend > 0) {
    let wasAdded = eventDataBatch.tryAdd({ body: "my-event-body" });
    if (!wasAdded) {
      break;
    }
    numberOfEventsToSend--;
  }

  await producerClient.sendBatch(eventDataBatch);
  await producerClient.close();
}

main();

イベントをAzure Event Hubsに送信するプロセスを制御するために、さまざまな段階で渡すことができるオプションがあります。

  • コンストラクターは EventHubProducerClient 、再試行回数などのオプションを指定するために使用できる、型 EventHubClientOptions の省略可能なパラメーターを受け取ります。
  • メソッドは createBatch 、 型 CreateBatchOptions の省略可能なパラメーターを受け取ります。このパラメーターを使用して、作成されるバッチでサポートされる最大バッチ サイズを指定できます。
  • メソッドはsendBatch、現在の操作を取り消すために指定abortSignalできる型SendBatchOptionsの省略可能なパラメーターを受け取ります。
  • 特定のパーティションに送信する場合は、 メソッドの sendBatch オーバーロードを使用して、イベントを送信するパーティションの ID を渡すことができます。 上記の イベント ハブの検査 の例は、使用可能なパーティション ID をフェッチする方法を示しています。

: Azure Stream Analytics を使用する場合、送信されるイベントの本文も JSON オブジェクトである必要があります。 例: body: { "message": "Hello World" }

イベント ハブからイベントを使用する

Event Hub インスタンスからイベントを使用するには、ターゲットとするコンシューマー グループも把握する必要があります。 これがわかったら、 EventHubConsumerClient を作成する準備ができました。 次の例はクライアントを作成する 1 つの方法を示していますが、クライアントをインスタンス化する他の方法については、「 クライアントの認証 」セクションを参照してください。

subscribeクライアントの メソッドには、コンストラクターと組み合わせたオーバーロードがあり、イベントを使用するいくつかの方法に対応できます。

メソッドは subscribe 、maxBatchSize (待機するイベントの数) や maxWaitTimeInSeconds (maxBatchSize イベントの到着を待機する時間) などのオプションを指定するために使用できる型 SubscriptionOptions の省略可能なパラメーターを受け取ります。

1 つのプロセスでイベントを使用する

まず、 の EventHubConsumerClientインスタンスを作成し、そのインスタンスで メソッドを subscribe() 呼び出してイベントの使用を開始します。

メソッドはsubscribe、Azure Event Hubsから受信したイベントを処理するためのコールバックを受け取ります。 イベントの受信を停止するには、 メソッドによって返された オブジェクトに対して をsubscribe()呼び出close()します。

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

複数のプロセス間で負荷分散されたイベントを使用する

Azure Event Hubsは、1 秒あたり何百万ものイベントを処理できます。 処理アプリケーションをスケーリングするには、アプリケーションの複数のインスタンスを実行し、負荷のバランスを取ることができます。

まず、 を受け取るCheckpointStoreコンストラクター オーバーロードのいずれかを使用して のEventHubConsumerClientインスタンスを作成し、 メソッドをsubscribe()呼び出してイベントの使用を開始します。 チェックポイント ストアを使用すると、コンシューマー グループ内のサブスクライバーは、アプリケーションの複数のインスタンス間で処理を調整できます。

この例では、Azure Blob StorageをBlobCheckpointStore@azure/eventhubs-checkpointstore-blob使用して永続ストアに対して必要な読み取り/書き込みを実装する パッケージの を使用します。

メソッドはsubscribe、Azure Event Hubsから受信したイベントを処理するためのコールバックを受け取ります。 イベントの受信を停止するには、 メソッドによって返された オブジェクトに対して をsubscribe()呼び出close()します。

const { EventHubConsumerClient } = require("@azure/event-hubs");
const { ContainerClient } = require("@azure/storage-blob");
const { BlobCheckpointStore } = require("@azure/eventhubs-checkpointstore-blob");

const storageAccountConnectionString = "storage-account-connection-string";
const containerName = "container-name";
const eventHubConnectionString = "eventhub-connection-string";
const consumerGroup = "my-consumer-group";
const eventHubName = "eventHubName";

async function main() {
  const blobContainerClient = new ContainerClient(storageAccountConnectionString, containerName);

  if (!(await blobContainerClient.exists())) {
    await blobContainerClient.create();
  }

  const checkpointStore = new BlobCheckpointStore(blobContainerClient);
  const consumerClient = new EventHubConsumerClient(
    consumerGroup,
    eventHubConnectionString,
    eventHubName,
    checkpointStore
  );

  const subscription = consumerClient.subscribe({
    processEvents: async (events, context) => {
      // event processing code goes here
      if (events.length === 0) {
        // If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
        // will pass you an empty array.
        return;
      }

      // Checkpointing will allow your service to pick up from
      // where it left off when restarting.
      //
      // You'll want to balance how often you checkpoint with the
      // performance of your underlying checkpoint store.
      await context.updateCheckpoint(events[events.length - 1]);
    },
    processError: async (err, context) => {
      // handle any errors that occur during the course of
      // this subscription
      console.log(`Errors in subscription to partition ${context.partitionId}: ${err}`);
    }
  });

  // Wait for a few seconds to receive events before closing
  await new Promise((resolve) => setTimeout(resolve, 10 * 1000));

  await subscription.close();
  await consumerClient.close();
  console.log(`Exiting sample`);
}

main();

詳細については、「 アプリケーションの複数のインスタンス間でパーティションの負荷を分散 する」を参照してください。

1 つのパーティションからイベントを使用する

まず、 の EventHubConsumerClientインスタンスを作成し、そのインスタンスで メソッドを subscribe() 呼び出してイベントの使用を開始します。 ターゲットにするパーティションの ID を、 メソッドに渡して、 subscribe() そのパーティションからのみ使用します。

次の例では、最初のパーティションを使用しています。

メソッドはsubscribe、Azure Event Hubsから受信したイベントを処理するためのコールバックを受け取ります。 イベントの受信を停止するには、 メソッドによって返された オブジェクトに対して をsubscribe()呼び出close()します。

const { EventHubConsumerClient, earliestEventPosition } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "connectionString",
    "eventHubName"
  );
  const partitionIds = await client.getPartitionIds();

  // In this sample, we use the position of earliest available event to start from
  // Other common options to configure would be `maxBatchSize` and `maxWaitTimeInSeconds`
  const subscriptionOptions = {
    startPosition: earliestEventPosition
  };

  const subscription = client.subscribe(
    partitionIds[0],
    {
      processEvents: async (events, context) => {
        // event processing code goes here
      },
      processError: async (err, context) => {
        // error reporting/handling code here
      }
    },
    subscriptionOptions
  );

  // Wait for a few seconds to receive events before closing
  setTimeout(async () => {
    await subscription.close();
    await client.close();
    console.log(`Exiting sample`);
  }, 3 * 1000);
}

main();

EventHubConsumerClient を使用して IotHub を操作する

を使用 EventHubConsumerClient して IotHub を操作することもできます。 これは、リンクされた EventHub から IotHub のテレメトリ データを受信する場合に便利です。 関連付けられている接続文字列には送信要求がないため、イベントを送信することはできません。

  • 接続文字列は、Event Hub と互換性のあるエンドポイント用である必要があることに注意してください (例: "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name")
const { EventHubConsumerClient } = require("@azure/event-hubs");

async function main() {
  const client = new EventHubConsumerClient(
    "my-consumer-group",
    "Endpoint=sb://my-iothub-namespace-[uid].servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key;EntityPath=my-iot-hub-name"
  );
  await client.getEventHubProperties();
  // retrieve partitionIds from client.getEventHubProperties() or client.getPartitionIds()
  const partitionId = "0";
  await client.getPartitionProperties(partitionId);

  await client.close();
}

main();

トラブルシューティング

AMQP 依存関係

Event Hubs ライブラリは、AMQP プロトコルを介した接続の管理、イベントの送受信を行う rhea-promise ライブラリに依存します。

ログ記録

環境変数を設定して、 AZURE_LOG_LEVEL ログを stderrに設定できます。

export AZURE_LOG_LEVEL=verbose

ログを有効にする方法の詳細については、@azure/logger パッケージに関するドキュメントを参照してください。

または、このライブラリを使用するときにログを DEBUG 取得するように環境変数を設定することもできます。 これは、依存関係rhea-promiserheaからもログを出力する場合にも役立ちます。

メモ: AZURE_LOG_LEVEL設定されている場合は、DEBUG よりも優先されます。 AZURE_LOG_LEVELを指定 azure する場合や setLogLevel を呼び出す場合は、DEBUG を使用してライブラリを指定しないでください。

  • Event Hubs SDK から情報レベルのデバッグ ログのみを取得します。
export DEBUG=azure:*:info
  • Event Hubs SDK とプロトコル レベル ライブラリからデバッグ ログを取得する。
export DEBUG=azure*,rhea*
  • (大量のコンソール/ディスク領域を消費する) 生イベント データを表示することに関心がない 場合は、環境変数を DEBUG 次のように設定できます。
export DEBUG=azure*,rhea*,-rhea:raw,-rhea:message
  • エラーと SDK の警告にのみ関心がある場合は、環境変数をDEBUG次のように設定できます。
export DEBUG=azure:*:(error|warning),rhea-promise:error,rhea:events,rhea:frames,rhea:io,rhea:flow

次のステップ

その他のサンプル コード

このライブラリを使用して Event Hubs との間でイベントを送受信する方法の詳細な例については、サンプル ディレクトリを参照してください。

共同作成

このライブラリに投稿する場合、コードをビルドしてテストする方法の詳細については、投稿ガイドを参照してください。

インプレッション数