Partager via


Azure Event Hubs bibliothèque de magasins de points de contrôle pour Javascript à l’aide d’objets blob de stockage

Une solution basée sur le stockage Blob Azure pour stocker les points de contrôle et faciliter l’équilibrage de charge lors de l’utilisation EventHubConsumerClient à partir de la bibliothèque @azure/event-hubs

| Code sourcePackage (npm) | Documentation de référence sur l’API | Échantillons

Prise en main

Installer le package

Installer la bibliothèque d’objets blob du magasin de points de contrôle Azure Event Hubs à l’aide de npm

npm install @azure/eventhubs-checkpointstore-blob

Conditions préalables : vous devez disposer d’un abonnement Azure, d’un espace de noms Event Hubs pour utiliser ce package et d’un compte de stockage

Si vous utilisez ce package dans une application Node.js, utilisez Node.js 8.x ou version ultérieure.

Configurer Typescript

Les utilisateurs TypeScript doivent avoir des définitions de type node installées :

npm install @types/node

Vous devez également activer compilerOptions.allowSyntheticDefaultImports dans votre fichier tsconfig.json. Notez que si vous avez activé compilerOptions.esModuleInterop, allowSyntheticDefaultImports est activé par défaut. Pour plus d’informations, consultez le manuel des options du compilateur de TypeScript .

Concepts clés

  • Mise à l’échelle : créez plusieurs consommateurs, chacun d’entre eux assurant la lecture à partir de quelques partitions Event Hubs.

  • Équilibrage de charge : Les applications qui prennent en charge l’équilibrage de charge se composent d’une ou plusieurs EventHubConsumerClient instances qui ont été configurées pour consommer des événements à partir du même Event Hub et du même groupe de consommateurs et du même CheckpointStore. Ils équilibrent la charge de travail entre différentes instances en répartissant les partitions à traiter entre elles.

  • Point de contrôle : Il s’agit d’un processus par lequel les lecteurs marquent ou valident leur position dans une séquence d’événements de partition. La réalisation des points de contrôle est la responsabilité du consommateur et se produit sur une base par partition dans un groupe de consommateurs. Cette responsabilité signifie que pour chaque groupe de consommateurs, chaque lecteur de partition doit conserver une trace de sa position actuelle dans le flux d’événements. Il peut informer le service lorsqu’il considère que le flux de données est complet.

    Si un lecteur se déconnecte d'une partition, lorsqu'il se reconnecte il commence la lecture au point de contrôle qui a été précédemment soumis par le dernier lecteur de cette partition dans ce groupe de consommateurs. Lorsque le lecteur se connecte, il transmet le décalage à l’Event Hub pour spécifier l’emplacement où commencer la lecture. De cette façon, vous pouvez utiliser les points de contrôle pour marquer les événements comme « terminés » par les applications en aval et pour assurer la résilience si un basculement se produit entre des lecteurs en cours d’exécution sur des ordinateurs différents. Il est possible de revenir à des données plus anciennes en spécifiant un décalage inférieur à partir de ce processus de vérification. Grâce à ce mécanisme, les points de contrôle permettent une résilience au basculement renforcée, mais également la relecture du flux d’événements.

    Un BlobCheckpointStore est une classe qui implémente les méthodes clés requises par EventHubConsumerClient pour équilibrer la charge et mettre à jour les points de contrôle.

Exemples

Créer un à l’aide d’un CheckpointStore Stockage Blob Azure

Utilisez l’extrait de code ci-dessous pour créer un CheckpointStore. Vous devez fournir les chaîne de connexion à votre compte de stockage.

import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!containerClient.exists()) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore =  new BlobCheckpointStore(containerClient);

Événements de point de contrôle à l’aide du stockage Blob Azure

Pour contrôler les événements reçus à l’aide de Stockage Blob Azure, vous devez passer un objet compatible avec l’interface SubscriptionEventHandlers avec du code pour appeler la updateCheckpoint() méthode .

Dans cet exemple, SubscriptionHandlers implémente SubscriptionEventHandlers et gère également les points de contrôle.

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!(await containerClient.exists())) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore = new BlobCheckpointStore(containerClient);

class SubscriptionHandlers {
  async processEvents(event, context) {
    // custom logic for processing events goes here

    // Checkpointing will allow your service to restart and pick
    // up from where it left off.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(event);
  }

  async processError(err, context) {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription: ${err}`);
  }
}

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);

const subscription = consumerClient.subscribe(new SubscriptionHandlers());

// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();

Dépannage

Activer les journaux d’activité

Vous pouvez définir la AZURE_LOG_LEVEL variable d’environnement sur l’une des valeurs suivantes pour activer la journalisation dans stderr:

  • verbose
  • info
  • warning
  • error

Vous pouvez également définir le niveau de journalisation par programme en important le package @azure/enregistreur d’événements et en appelant la fonction avec l’une setLogLevel des valeurs de niveau de journalisation.

Lors de la définition d’un niveau de journalisation par programme ou via la AZURE_LOG_LEVEL variable d’environnement, tous les journaux écrits à l’aide d’un niveau de journalisation égal ou inférieur à celui que vous choisissez sont émis. Par exemple, lorsque vous définissez le niveau infode journalisation sur , les journaux qui sont écrits pour les niveaux warning et error qui sont également émis. Ce Kit de développement logiciel (SDK) suit les instructions du Kit de développement logiciel (SDK) Azure pour TypeScript lors de la détermination du niveau auquel se connecter.

Vous pouvez également définir la variable d’environnement DEBUG pour obtenir les journaux lors de l’utilisation de cette bibliothèque. Cela peut être utile si vous souhaitez également émettre des journaux à partir des dépendancesrhea-promise.rhea

Note: AZURE_LOG_LEVEL, s’il est défini, est prioritaire sur DEBUG. Ne spécifiez aucune azure bibliothèque via DEBUG lorsque vous spécifiez également AZURE_LOG_LEVEL ou appelez setLogLevel.

Vous pouvez définir la variable d’environnement suivante pour afficher les journaux de débogage quand vous utilisez cette bibliothèque.

  • Obtention uniquement des journaux de débogage au niveau des informations à partir de l’objet blob Eventhubs Checkpointstore.
export DEBUG=azure:eventhubs-checkpointstore-blob:info

Journalisation dans un fichier

  • Activez la journalisation comme indiqué ci-dessus, puis exécutez votre script de test comme suit :

    • Les instructions de journalisation de votre script de test vont à out.log et les instructions de journalisation du kit de développement logiciel (SDK) vont à debug.log.

      node your-test-script.js > out.log 2>debug.log
      
    • Les instructions de journalisation de votre script de test et du sdk vont au même fichier out.log en redirigeant stderr vers stdout (&1), puis en redirigeant stdout vers un fichier :

      node your-test-script.js >out.log 2>&1
      
    • Les instructions de journalisation de votre script de test et du Kit de développement logiciel (SDK) vont dans le même fichier out.log.

      node your-test-script.js &> out.log
      

Étapes suivantes

Consultez le répertoire d’exemples pour obtenir un exemple détaillé.

Contribution

Si vous souhaitez contribuer à cette bibliothèque, lisez le guide de contribution pour en savoir plus sur la génération et le test du code.

Impressions