Partager via


EventHubConsumerClient Classe

La classe EventHubConsumerClient définit une interface de haut niveau pour la réception d’événements du service Azure Event Hubs.

L’objectif main d’EventHubConsumerClient est de recevoir des événements de toutes les partitions d’un EventHub avec équilibrage de charge et point de contrôle.

Lorsque plusieurs instances EventHubConsumerClient s’exécutent sur le même hub d’événements, le même groupe de consommateurs et le même emplacement de point de contrôle, les partitions sont réparties uniformément entre elles.

Pour activer l’équilibrage de charge et les points de contrôle persistants, checkpoint_store devez être défini lors de la création d’EventHubConsumerClient. Si aucun magasin de points de contrôle n’est fourni, le point de contrôle est conservé en interne en mémoire.

Un EventHubConsumerClient peut également recevoir à partir d’une partition spécifique lorsque vous appelez sa méthode receive() ou receive_batch() et spécifiez le partition_id. L’équilibrage de charge ne fonctionne pas en mode à partition unique. Toutefois, les utilisateurs peuvent toujours enregistrer des points de contrôle si le checkpoint_store est défini.

Héritage
azure.eventhub._client_base.ClientBase
EventHubConsumerClient

Constructeur

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Paramètres

fully_qualified_namespace
str
Obligatoire

Nom d’hôte complet pour l’espace de noms Event Hubs. Le format de l’espace de noms est : .servicebus.windows.net.

eventhub_name
str
Obligatoire

Chemin d’accès du hub d’événements spécifique auquel connecter le client.

consumer_group
str
Obligatoire

Recevez des événements du hub d’événements pour ce groupe de consommateurs.

credential
TokenCredential ou AzureSasCredential ou AzureNamedKeyCredential
Obligatoire

Objet d’informations d’identification utilisé pour l’authentification qui implémente une interface particulière pour obtenir des jetons. Il accepte EventHubSharedKeyCredential, ou les objets d’informations d’identification générés par la bibliothèque azure-identity et les objets qui implémentent la méthode *get_token(self, scopes).

logging_enable
bool

Indique si les journaux de trace réseau doivent être générés dans l’enregistreur d’événements. La valeur par défaut est False.

auth_timeout
float

Délai en secondes d’attente pour qu’un jeton soit autorisé par le service. La valeur par défaut est 60 secondes. Si la valeur est 0, aucun délai d’expiration n’est appliqué à partir du client.

user_agent
str

Si elle est spécifiée, elle est ajoutée devant la chaîne de l’agent utilisateur.

retry_total
int

Nombre total de tentatives de restauration d’une opération ayant échoué lorsqu’une erreur se produit. La valeur par défaut est 3. Le contexte de retry_total de réception est spécial : la méthode receive est implémentée par une méthode de réception interne appelée while-loop dans chaque itération. Dans le cas de réception , retry_total spécifie le nombre de nouvelles tentatives après l’erreur déclenchée par la méthode de réception interne dans la boucle while. Si les tentatives de nouvelle tentative sont épuisées, le rappel on_error est appelé (s’il est fourni) avec les informations d’erreur. Le consommateur de partition interne défaillant sera fermé (on_partition_close sera appelé s’il est fourni) et un nouveau consommateur de partition interne sera créé (on_partition_initialize sera appelé s’il est fourni) pour reprendre la réception.

retry_backoff_factor
float

Facteur d’interruption à appliquer entre les tentatives après la deuxième tentative (la plupart des erreurs sont résolues immédiatement par un deuxième essai sans délai). En mode fixe, la stratégie de nouvelle tentative est toujours mise en veille pour {facteur d’interruption}. En mode « exponentiel », la stratégie de nouvelle tentative est mise en veille pendant : {facteur d’interruption} * (2 ** ({nombre de nouvelles tentatives totales} - 1)) secondes. Si la backoff_factor est 0.1, la nouvelle tentative est mise en veille pour [0.0s, 0.2s, 0.4s, ...] entre les nouvelles tentatives. La valeur par défaut est 0,8.

retry_backoff_max
float

Temps d’arrêt maximal. La valeur par défaut est 120 secondes (2 minutes).

retry_mode
str

Comportement de délai entre les tentatives de nouvelle tentative. Les valeurs prises en charge sont « fixes » ou « exponentielles », où la valeur par défaut est « exponentielle ».

idle_timeout
float

Délai d’expiration, en secondes, après lequel ce client ferme la connexion sous-jacente s’il n’y a plus d’activité. Par défaut, la valeur est None, ce qui signifie que le client ne s’arrêtera pas en raison de l’inactivité, sauf si le service l’a initié.

transport_type
TransportType

Type de protocole de transport qui sera utilisé pour communiquer avec le service Event Hubs. La valeur par défaut est TransportType.Amqp , auquel cas le port 5671 est utilisé. Si le port 5671 est indisponible/bloqué dans l’environnement réseau, TransportType.AmqpOverWebsocket peut être utilisé à la place, qui utilise le port 443 pour la communication.

http_proxy
dict[str, str ou int]

Paramètres du proxy HTTP. Il doit s’agir d’un dictionnaire avec les clés suivantes : « proxy_hostname » (valeur str) et « proxy_port » (valeur int). En outre, les clés suivantes peuvent également être présentes : « nom d’utilisateur », « mot de passe ».

checkpoint_store
CheckpointStore ou None

Gestionnaire qui stocke les données d’équilibrage de charge et de point de contrôle de la partition lors de la réception d’événements. Le magasin de points de contrôle sera utilisé dans les deux cas de réception à partir de toutes les partitions ou d’une seule partition. Dans ce dernier cas, l’équilibrage de charge ne s’applique pas. Si aucun magasin de points de contrôle n’est fourni, le point de contrôle est conservé en interne en mémoire et le instance EventHubConsumerClient reçoit des événements sans équilibrage de charge.

load_balancing_interval
float

Lorsque l’équilibrage de charge entre en jeu. Il s’agit de l’intervalle, en secondes, entre deux évaluations d’équilibrage de charge. La valeur par défaut est de 30 secondes.

partition_ownership_expiration_interval
float

Une propriété de partition expire après ce nombre de secondes. Chaque évaluation de l’équilibrage de charge étend automatiquement le délai d’expiration de la propriété. La valeur par défaut est 6 * load_balancing_interval, c’est-à-dire 180 secondes lors de l’utilisation de la load_balancing_interval par défaut de 30 secondes.

load_balancing_strategy
str ou LoadBalancingStrategy

Lorsque l’équilibrage de charge entre en jeu, il utilise cette stratégie pour revendiquer et équilibrer la propriété de la partition. Utilisez « gourmand » ou LoadBalancingStrategy.GREEDY pour la stratégie gourmande, qui, pour chaque évaluation d’équilibrage de charge, récupère autant de partitions non réclamées nécessaires pour équilibrer la charge. Utilisez « balanced » ou LoadBalancingStrategy.BALANCED pour la stratégie équilibrée, qui, pour chaque évaluation d’équilibrage de charge, ne revendique qu’une seule partition qui n’est pas revendiquée par un autre EventHubConsumerClient. Si toutes les partitions d’un EventHub sont revendiquées par un autre EventHubConsumerClient et que ce client a réclamé trop peu de partitions, ce client vole une partition à d’autres clients pour chaque évaluation d’équilibrage de charge, quelle que soit la stratégie d’équilibrage de charge. La stratégie gourmande est utilisée par défaut.

custom_endpoint_address
str ou None

Adresse de point de terminaison personnalisée à utiliser pour établir une connexion au service Event Hubs, ce qui permet aux demandes réseau d’être routées via toutes les passerelles Application Gateway ou autres chemins nécessaires pour l’environnement hôte. La valeur par défaut est None. Le format est « sb://< custom_endpoint_hostname> :<custom_endpoint_port> ». Si le port n’est pas spécifié dans le custom_endpoint_address, le port 443 est utilisé par défaut.

connection_verify
str ou None

Chemin d’accès au fichier de CA_BUNDLE personnalisé du certificat SSL utilisé pour authentifier l’identité du point de terminaison de connexion. La valeur par défaut est None, auquel cas certifi.where() sera utilisé.

uamqp_transport
bool

Indique s’il faut utiliser la bibliothèque uamqp comme transport sous-jacent. La valeur par défaut est False et la bibliothèque PURE PYTHON AMQP sera utilisée comme transport sous-jacent.

socket_timeout
float

Durée en secondes pendant laquelle le socket sous-jacent sur la connexion doit attendre lors de l’envoi et de la réception de données avant d’expirer. La valeur par défaut est 0,2 pour TransportType.Amqp et 1 pour TransportType.AmqpOverWebsocket. Si des erreurs EventHubsConnectionError se produisent en raison d’un délai d’attente d’écriture, une valeur supérieure à la valeur par défaut peut être passée. Il s’agit de scénarios d’utilisation avancés et la valeur par défaut doit généralement être suffisante.

Exemples

Créez une instance de eventHubConsumerClient.


   import os
   from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
   consumer = EventHubConsumerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       eventhub_name=eventhub_name,
       consumer_group='$Default',
       credential=credential)

Méthodes

close

Arrêtez la récupération des événements à partir d’Event Hub et fermez la connexion ET les liens AMQP sous-jacents.

from_connection_string

Créez un EventHubConsumerClient à partir d’un chaîne de connexion.

get_eventhub_properties

Obtient les propriétés du hub d’événements.

Les clés dans le dictionnaire retourné sont les suivantes :

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Obtenez les ID de partition de l’Event Hub.

get_partition_properties

Obtient les propriétés de la partition spécifiée.

Les clés du dictionnaire de propriétés sont les suivantes :

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

Recevoir des événements de partitions, avec l’équilibrage de charge et les points de contrôle facultatifs.

receive_batch

Recevoir des événements de partitions, avec l’équilibrage de charge et les points de contrôle facultatifs.

close

Arrêtez la récupération des événements à partir d’Event Hub et fermez la connexion ET les liens AMQP sous-jacents.

close() -> None

Type de retour

Exemples

Fermez le client.


   import os
   import threading

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group="$Default",
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, multi-thread will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The 'receive' method is a blocking call, it can be executed in a thread for
   # non-blocking behavior, and combined with the 'close' method.

   worker = threading.Thread(
       target=consumer.receive,
       kwargs={
           "on_event": on_event,
           "starting_position": "-1",  # "-1" is from the beginning of the partition.
       }
   )
   worker.start()
   time.sleep(10)  # Keep receiving for 10s then close.
   # Close down the consumer handler explicitly.
   consumer.close()

from_connection_string

Créez un EventHubConsumerClient à partir d’un chaîne de connexion.

from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient

Paramètres

conn_str
str
Obligatoire

Chaîne de connexion d’un Event Hub.

consumer_group
str
Obligatoire

Recevez des événements du hub d’événements pour ce groupe de consommateurs.

eventhub_name
str

Chemin d’accès du hub d’événements spécifique auquel connecter le client.

logging_enable
bool

Indique si les journaux de trace réseau doivent être générés dans l’enregistreur d’événements. La valeur par défaut est False.

auth_timeout
float

Délai en secondes d’attente pour qu’un jeton soit autorisé par le service. La valeur par défaut est 60 secondes. Si la valeur est 0, aucun délai d’expiration n’est appliqué à partir du client.

user_agent
str

Si elle est spécifiée, elle est ajoutée devant la chaîne de l’agent utilisateur.

retry_total
int

Nombre total de tentatives de restauration d’une opération ayant échoué lorsqu’une erreur se produit. La valeur par défaut est 3. Le contexte de retry_total de réception est spécial : la méthode receive est implémentée par une méthode de réception interne appelée while-loop dans chaque itération. Dans le cas de réception , retry_total spécifie le nombre de nouvelles tentatives après l’erreur déclenchée par la méthode de réception interne dans la boucle while. Si les tentatives de nouvelle tentative sont épuisées, le rappel on_error est appelé (s’il est fourni) avec les informations d’erreur. Le consommateur de partition interne défaillant sera fermé (on_partition_close sera appelé s’il est fourni) et un nouveau consommateur de partition interne sera créé (on_partition_initialize sera appelé s’il est fourni) pour reprendre la réception.

retry_backoff_factor
float

Facteur d’interruption à appliquer entre les tentatives après la deuxième tentative (la plupart des erreurs sont résolues immédiatement par un deuxième essai sans délai). En mode fixe, la stratégie de nouvelle tentative est toujours mise en veille pour {facteur d’interruption}. En mode « exponentiel », la stratégie de nouvelle tentative est mise en veille pendant : {facteur d’interruption} * (2 ** ({nombre de nouvelles tentatives totales} - 1)) secondes. Si la backoff_factor est 0.1, la nouvelle tentative est mise en veille pour [0.0s, 0.2s, 0.4s, ...] entre les nouvelles tentatives. La valeur par défaut est 0,8.

retry_backoff_max
float

Temps d’arrêt maximal. La valeur par défaut est 120 secondes (2 minutes).

retry_mode
str

Comportement de délai entre les tentatives de nouvelle tentative. Les valeurs prises en charge sont « fixes » ou « exponentielles », où la valeur par défaut est « exponentielle ».

idle_timeout
float

Délai d’expiration, en secondes, après lequel ce client ferme la connexion sous-jacente en l’absence d’activité furthur. Par défaut, la valeur est None, ce qui signifie que le client ne s’arrêtera pas en raison de l’inactivité, sauf si le service l’a initié.

transport_type
TransportType

Type de protocole de transport qui sera utilisé pour communiquer avec le service Event Hubs. La valeur par défaut est TransportType.Amqp , auquel cas le port 5671 est utilisé. Si le port 5671 est indisponible/bloqué dans l’environnement réseau, TransportType.AmqpOverWebsocket peut être utilisé à la place, qui utilise le port 443 pour la communication.

http_proxy
dict

Paramètres du proxy HTTP. Il doit s’agir d’un dictionnaire avec les clés suivantes : « proxy_hostname » (valeur str) et « proxy_port » (valeur int). En outre, les clés suivantes peuvent également être présentes : « nom d’utilisateur », « mot de passe ».

checkpoint_store
CheckpointStore ou None

Gestionnaire qui stocke les données d’équilibrage de charge et de point de contrôle de la partition lors de la réception d’événements. Le magasin de points de contrôle sera utilisé dans les deux cas de réception à partir de toutes les partitions ou d’une seule partition. Dans ce dernier cas, l’équilibrage de charge ne s’applique pas. Si aucun magasin de points de contrôle n’est fourni, le point de contrôle est conservé en interne en mémoire et le instance EventHubConsumerClient reçoit des événements sans équilibrage de charge.

load_balancing_interval
float

Lorsque l’équilibrage de charge entre en jeu. Il s’agit de l’intervalle, en secondes, entre deux évaluations d’équilibrage de charge. La valeur par défaut est de 10 secondes.

partition_ownership_expiration_interval
float

Une propriété de partition expire après ce nombre de secondes. Chaque évaluation de l’équilibrage de charge étend automatiquement le délai d’expiration de la propriété. La valeur par défaut est 6 * load_balancing_interval, c’est-à-dire 60 secondes lors de l’utilisation de la load_balancing_interval par défaut de 30 secondes.

load_balancing_strategy
str ou LoadBalancingStrategy

Lorsque l’équilibrage de charge entre en jeu, il utilise cette stratégie pour revendiquer et équilibrer la propriété de la partition. Utilisez « gourmand » ou LoadBalancingStrategy.GREEDY pour la stratégie gourmande, qui, pour chaque évaluation d’équilibrage de charge, récupère autant de partitions non réclamées nécessaires pour équilibrer la charge. Utilisez « balanced » ou LoadBalancingStrategy.BALANCED pour la stratégie équilibrée, qui, pour chaque évaluation d’équilibrage de charge, ne revendique qu’une seule partition qui n’est pas revendiquée par un autre EventHubConsumerClient. Si toutes les partitions d’un EventHub sont revendiquées par un autre EventHubConsumerClient et que ce client a réclamé trop peu de partitions, ce client vole une partition à d’autres clients pour chaque évaluation d’équilibrage de charge, quelle que soit la stratégie d’équilibrage de charge. La stratégie gourmande est utilisée par défaut.

custom_endpoint_address
str ou None

Adresse de point de terminaison personnalisée à utiliser pour établir une connexion au service Event Hubs, ce qui permet aux demandes réseau d’être routées via toutes les passerelles Application Gateway ou autres chemins nécessaires pour l’environnement hôte. La valeur par défaut est None. Le format est « sb://< custom_endpoint_hostname> :<custom_endpoint_port> ». Si le port n’est pas spécifié dans le custom_endpoint_address, le port 443 est utilisé par défaut.

connection_verify
str ou None

Chemin d’accès au fichier de CA_BUNDLE personnalisé du certificat SSL utilisé pour authentifier l’identité du point de terminaison de connexion. La valeur par défaut est None, auquel cas certifi.where() sera utilisé.

uamqp_transport
bool

Indique s’il faut utiliser la bibliothèque uamqp comme transport sous-jacent. La valeur par défaut est False et la bibliothèque PURE PYTHON AMQP sera utilisée comme transport sous-jacent.

Type de retour

Exemples

Créez une instance de EventHubConsumerClient à partir de chaîne de connexion.


   import os
   from azure.eventhub import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Obtient les propriétés du hub d’événements.

Les clés dans le dictionnaire retourné sont les suivantes :

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_eventhub_properties() -> Dict[str, Any]

Retours

Dictionnaire contenant des informations sur le hub d’événements.

Type de retour

Exceptions

get_partition_ids

Obtenez les ID de partition de l’Event Hub.

get_partition_ids() -> List[str]

Retours

Liste des ID de partition.

Type de retour

Exceptions

get_partition_properties

Obtient les propriétés de la partition spécifiée.

Les clés du dictionnaire de propriétés sont les suivantes :

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

get_partition_properties(partition_id: str) -> Dict[str, Any]

Paramètres

partition_id
str
Obligatoire

ID de partition cible.

Retours

Dictionnaire contenant des propriétés de partition.

Type de retour

Exceptions

receive

Recevoir des événements de partitions, avec l’équilibrage de charge et les points de contrôle facultatifs.

receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None

Paramètres

on_event
callable[PartitionContext, EventData ou None]
Obligatoire

Fonction de rappel pour la gestion d’un événement reçu. Le rappel prend deux paramètres : partition_context qui contient le contexte de partition et l’événement qui est l’événement reçu. La fonction de rappel doit être définie comme suit : on_event(partition_context, événement) . Pour obtenir des informations détaillées sur le contexte de partition, reportez-vous à PartitionContext.

max_wait_time
float

Intervalle maximal en secondes que le processeur d’événements attend avant d’appeler le rappel. Si aucun événement n’est reçu dans cet intervalle, le rappel on_event est appelé avec Aucun. Si cette valeur est définie sur None ou 0 (valeur par défaut), le rappel n’est pas appelé tant qu’un événement n’est pas reçu.

partition_id
str

S’il est spécifié, le client reçoit uniquement de cette partition. Sinon, le client reçoit de toutes les partitions.

owner_level
int

Priorité pour un consommateur exclusif. Un consommateur exclusif est créé si owner_level est défini. Un consommateur avec une owner_level plus élevée a une priorité exclusive plus élevée. Le niveau propriétaire est également connu sous le nom de « valeur d’époque » du consommateur.

prefetch
int

Nombre d’événements à prérécupérer à partir du service pour traitement. La valeur par défaut est 300.

track_last_enqueued_event_properties
bool

Indique si le consommateur doit demander des informations sur l’événement en dernier file d’attente sur sa partition associée et suivre ces informations à mesure que les événements sont reçus. Lorsque des informations sur l’événement de dernière file d’attente des partitions sont suivies, chaque événement reçu du service Event Hubs transporte les métadonnées relatives à la partition. Il en résulte une petite quantité de bande passante réseau supplémentaire qui est généralement un compromis favorable lorsqu’il est considéré par rapport à l’envoi périodique de demandes de propriétés de partition à l’aide du client Event Hub. Elle est définie sur False par défaut.

starting_position
str, int, datetime ou dict[str,any]

Commencez à recevoir à partir de cette position d’événement s’il n’existe aucune donnée de point de contrôle pour une partition. Les données de point de contrôle seront utilisées si elles sont disponibles. Il peut s’agir d’un dicté avec l’ID de partition comme clé et la position comme valeur pour les partitions individuelles, ou une valeur unique pour toutes les partitions. Le type de valeur peut être str, int ou datetime.datetime. Les valeurs « -1 » pour la réception à partir du début du flux et « @latest » sont également prises en charge pour recevoir uniquement de nouveaux événements. La valeur par défaut est « @latest ».

starting_position_inclusive
bool ou dict[str,bool]

Déterminez si le starting_position donné est inclusif(>=) ou non (>). True pour inclusive et False pour exclusive. Il peut s’agir d’un dict avec l’ID de partition comme clé et bool comme valeur indiquant si la starting_position d’une partition spécifique est inclusive ou non. Il peut également s’agir d’une valeur bool unique pour tous les starting_position. La valeur par défaut est False.

on_error
callable[[PartitionContext, Exception]]

Fonction de rappel qui sera appelée lorsqu’une erreur est déclenchée lors de la réception après l’épuisement des tentatives ou pendant le processus d’équilibrage de charge. Le rappel prend deux paramètres : partition_context qui contient des informations de partition et l’erreur étant l’exception. partition_context peut être None si l’erreur est générée pendant le processus d’équilibrage de charge. Le rappel doit être défini comme suit : on_error(partition_context, erreur) . Le rappel on_error est également appelé si une exception non gérée est levée pendant le rappel on_event .

on_partition_initialize
callable[[PartitionContext]]

Fonction de rappel qui sera appelée après l’initialisation d’un consommateur pour une certaine partition. Elle est également appelée lorsqu’un nouveau consommateur de partition interne est créé pour prendre en charge le processus de réception d’un consommateur de partition interne ayant échoué et fermé. Le rappel prend un seul paramètre : partition_context qui contient les informations de partition. Le rappel doit être défini comme suit : on_partition_initialize(partition_context) .

on_partition_close
callable[[PartitionContext, CloseReason]]

Fonction de rappel qui sera appelée après la fermeture d’un consommateur pour une certaine partition. Elle est également appelée lorsque l’erreur est générée lors de la réception après l’épuisement des tentatives. Le rappel prend deux paramètres : partition_context qui contient des informations de partition et la raison de la fermeture. Le rappel doit être défini comme suit : on_partition_close(partition_context, raison) . Reportez-vous à CloseReason pour les différentes raisons de clôture.

Type de retour

Exemples

Recevez des événements à partir de l’EventHub.


       logger = logging.getLogger("azure.eventhub")

       def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive(on_event=on_event)

receive_batch

Recevoir des événements de partitions, avec l’équilibrage de charge et les points de contrôle facultatifs.

receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None

Paramètres

on_event_batch
callable[PartitionContext, list[EventData]]
Obligatoire

Fonction de rappel pour gérer un lot d’événements reçus. Le rappel prend deux paramètres : partition_context qui contient le contexte de partition et event_batch, qui est les événements reçus. La fonction de rappel doit être définie comme suit : on_event_batch(partition_context, event_batch) . event_batch peut être une liste vide si max_wait_time n’a pas la valeur None ni 0 et qu’aucun événement n’est reçu après max_wait_time. Pour obtenir des informations détaillées sur le contexte de partition, reportez-vous à PartitionContext.

max_batch_size
int

Nombre maximal d’événements dans un lot passé au rappel on_event_batch. Si le nombre réel d’événements reçus est supérieur à max_batch_size, les événements reçus sont divisés en lots et appellent le rappel pour chaque lot avec jusqu’à max_batch_size événements.

max_wait_time
float

Intervalle maximal en secondes que le processeur d’événements attend avant d’appeler le rappel. Si aucun événement n’est reçu dans cet intervalle, le rappel on_event_batch est appelé avec une liste vide.

partition_id
str

S’il est spécifié, le client reçoit uniquement de cette partition. Sinon, le client reçoit de toutes les partitions.

owner_level
int

Priorité pour un consommateur exclusif. Un consommateur exclusif est créé si owner_level est défini. Un consommateur avec une owner_level plus élevée a une priorité exclusive plus élevée. Le niveau propriétaire est également connu sous le nom de « valeur d’époque » du consommateur.

prefetch
int

Nombre d’événements à prérécupérer à partir du service pour traitement. La valeur par défaut est 300.

track_last_enqueued_event_properties
bool

Indique si le consommateur doit demander des informations sur l’événement en dernier file d’attente sur sa partition associée et suivre ces informations à mesure que les événements sont reçus. Lorsque des informations sur l’événement de dernière file d’attente des partitions sont suivies, chaque événement reçu du service Event Hubs transporte les métadonnées relatives à la partition. Il en résulte une petite quantité de bande passante réseau supplémentaire qui est généralement un compromis favorable lorsqu’il est considéré par rapport à l’envoi périodique de demandes de propriétés de partition à l’aide du client Event Hub. Elle est définie sur False par défaut.

starting_position
str, int, datetime ou dict[str,any]

Commencez à recevoir à partir de cette position d’événement s’il n’existe aucune donnée de point de contrôle pour une partition. Les données de point de contrôle seront utilisées si elles sont disponibles. Il peut s’agir d’un dicté avec l’ID de partition comme clé et la position comme valeur pour les partitions individuelles, ou une valeur unique pour toutes les partitions. Le type de valeur peut être str, int ou datetime.datetime. Les valeurs « -1 » pour la réception à partir du début du flux et « @latest » sont également prises en charge pour recevoir uniquement de nouveaux événements. La valeur par défaut est « @latest ».

starting_position_inclusive
bool ou dict[str,bool]

Déterminez si le starting_position donné est inclusif(>=) ou non (>). True pour inclusive et False pour exclusive. Il peut s’agir d’un dict avec l’ID de partition comme clé et bool comme valeur indiquant si la starting_position d’une partition spécifique est inclusive ou non. Il peut également s’agir d’une valeur bool unique pour tous les starting_position. La valeur par défaut est False.

on_error
callable[[PartitionContext, Exception]]

Fonction de rappel qui sera appelée lorsqu’une erreur est déclenchée lors de la réception après l’épuisement des tentatives ou pendant le processus d’équilibrage de charge. Le rappel prend deux paramètres : partition_context qui contient des informations de partition et l’erreur étant l’exception. partition_context peut être None si l’erreur est générée pendant le processus d’équilibrage de charge. Le rappel doit être défini comme suit : on_error(partition_context, erreur) . Le rappel on_error est également appelé si une exception non gérée est levée pendant le rappel on_event .

on_partition_initialize
callable[[PartitionContext]]

Fonction de rappel qui sera appelée après l’initialisation d’un consommateur pour une certaine partition. Elle est également appelée lorsqu’un nouveau consommateur de partition interne est créé pour prendre en charge le processus de réception d’un consommateur de partition interne ayant échoué et fermé. Le rappel prend un seul paramètre : partition_context qui contient les informations de partition. Le rappel doit être défini comme suit : on_partition_initialize(partition_context) .

on_partition_close
callable[[PartitionContext, CloseReason]]

Fonction de rappel qui sera appelée après la fermeture d’un consommateur pour une certaine partition. Elle est également appelée lorsque l’erreur est générée lors de la réception après l’épuisement des tentatives. Le rappel prend deux paramètres : partition_context qui contient des informations de partition et la raison de la fermeture. Le rappel doit être défini comme suit : on_partition_close(partition_context, raison) . Reportez-vous à CloseReason pour les différentes raisons de clôture.

Type de retour

Exemples

Recevez des événements par lots à partir de l’EventHub.


       logger = logging.getLogger("azure.eventhub")

       def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received events from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive_batch(on_event_batch=on_event_batch)