Partilhar via


EventHubConsumerClient Classe

A classe EventHubConsumerClient define uma interface de alto nível para receber eventos do serviço Hubs de Eventos do Azure.

O principal objetivo do EventHubConsumerClient é receber eventos de todas as partições de um EventHub com balanceamento de carga e pontos de verificação.

Quando várias instâncias do EventHubConsumerClient estão em execução no mesmo hub de eventos, grupo de consumidores e localização de ponto de verificação, as partições serão distribuídas uniformemente entre elas.

Para ativar o balanceamento de carga e os pontos de verificação persistentes, checkpoint_store tem de ser definido ao criar o EventHubConsumerClient. Se não for fornecido um arquivo de pontos de verificação, o ponto de verificação será mantido internamente na memória.

Um EventHubConsumerClient também pode receber de uma partição específica quando chama o método receive() ou receive_batch() e especifica o partition_id. O balanceamento de carga não funcionará no modo de partição única. No entanto, os utilizadores ainda podem guardar pontos de verificação se a checkpoint_store estiver definida.

Herança
azure.eventhub.aio._client_base_async.ClientBaseAsync
EventHubConsumerClient

Construtor

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Parâmetros

fully_qualified_namespace
str
Necessário

O nome de anfitrião completamente qualificado para o espaço de nomes dos Hubs de Eventos. O formato do espaço de nomes é: .servicebus.windows.net.

eventhub_name
str
Necessário

O caminho do Hub de Eventos específico ao qual ligar o cliente.

consumer_group
str
Necessário

Receber eventos do hub de eventos para este grupo de consumidores.

credential
AsyncTokenCredential ou AzureSasCredential ou AzureNamedKeyCredential
Necessário

O objeto de credencial utilizado para autenticação que implementa uma interface específica para obter tokens. Aceita objetos EventHubSharedKeyCredentialde credenciais ou gerados pela biblioteca de identidades do azure e objetos que implementam o método *get_token(auto, âmbitos ).

logging_enable
bool

Se pretende exportar registos de rastreio de rede para o logger. A predefinição é Falso.

auth_timeout
float

O tempo em segundos a aguardar que um token seja autorizado pelo serviço. O valor predefinido é 60 segundos. Se estiver definido como 0, não será imposto nenhum tempo limite do cliente.

user_agent
str

Se for especificado, será adicionado à frente da cadeia de agente do utilizador.

retry_total
int

O número total de tentativas de refazer uma operação falhada quando ocorre um erro. O valor predefinido é 3. O contexto de retry_total na receção é especial: o método de receção é implementado por um método de receção interno de chamadas de ciclo de tempo em cada iteração. No caso de receção , retry_total especifica os números de repetição após o erro gerado pelo método de receção interno no ciclo while-loop. Se as tentativas de repetição estiverem esgotadas, a chamada de retorno on_error será chamada (se for fornecida) com as informações de erro. O consumidor da partição interna com falha será fechado (on_partition_close será chamado se for fornecido) e será criado um novo consumidor de partição interna (on_partition_initialize será chamado se for fornecido) para retomar a receção.

retry_backoff_factor
float

Um fator de recuo a aplicar entre tentativas após a segunda tentativa (a maioria dos erros é resolvida imediatamente por uma segunda tentativa sem demora). No modo fixo, a política de repetição irá sempre suspender para {backoff factor}. No modo "exponencial", a política de repetição irá suspender durante: {backoff factor} * (2 ** ({número de repetições totais} - 1)) segundos. Se o backoff_factor for 0,1, a repetição irá suspender para [0,0s, 0,2s, 0,4s, ...] entre repetições. O valor predefinido é 0,8.

retry_backoff_max
float

O tempo máximo de folga. O valor predefinido é 120 segundos (2 minutos).

retry_mode
str

O comportamento de atraso entre tentativas de repetição. Os valores suportados são "fixos" ou "exponenciais", em que a predefinição é "exponencial".

idle_timeout
float

Tempo limite, em segundos, após o qual este cliente fechará a ligação subjacente se não houver mais atividade. Por predefinição, o valor é Nenhum, o que significa que o cliente não será encerrado devido a inatividade, a menos que seja iniciado pelo serviço.

transport_type
TransportType

O tipo de protocolo de transporte que será utilizado para comunicar com o serviço Hubs de Eventos. A predefinição é TransportType.Amqp , caso em que é utilizada a porta 5671. Se a porta 5671 estiver indisponível/bloqueada no ambiente de rede, pode utilizar TransportType.AmqpOverWebsocket , que utiliza a porta 443 para comunicação.

http_proxy

Definições de proxy HTTP. Tem de ser um dicionário com as seguintes chaves: "proxy_hostname" (valor str) e "proxy_port" (valor int).

checkpoint_store
Optional[CheckpointStore]

Um gestor que armazena os dados de balanceamento de carga e ponto de verificação da partição ao receber eventos. O arquivo de pontos de verificação será utilizado em ambos os casos de receção de todas as partições ou de uma única partição. Neste último caso, o balanceamento de carga não se aplica. Se não for fornecido um arquivo de ponto de verificação, o ponto de verificação será mantido internamente na memória e a instância EventHubConsumerClient receberá eventos sem balanceamento de carga.

load_balancing_interval
float

Quando o balanceamento de carga é ativado. Este é o intervalo, em segundos, entre duas avaliações de balanceamento de carga. A predefinição é 30 segundos.

partition_ownership_expiration_interval
float

Uma propriedade de partição expirará após este número de segundos. Cada avaliação de balanceamento de carga prolongará automaticamente o tempo de expiração da propriedade. A predefinição é 6 * load_balancing_interval, ou seja, 180 segundos ao utilizar o load_balancing_interval predefinido de 30 segundos.

load_balancing_strategy
str ou LoadBalancingStrategy

Quando o balanceamento de carga é ativado, utilizará esta estratégia para reivindicar e equilibrar a propriedade da partição. Utilize "ganancioso" ou LoadBalancingStrategy.GREEDY para a estratégia gananciosa, que, para cada avaliação de balanceamento de carga, obterá o número de partições não reclamadas necessárias para equilibrar a carga. Utilize "balanceado" ou LoadBalancingStrategy.BALANCED para a estratégia equilibrada que, para cada avaliação de balanceamento de carga, afirma apenas uma partição que não é reivindicada por outro EventHubConsumerClient. Se todas as partições de um EventHub forem reivindicadas por outros EventHubConsumerClient e este cliente tiver afirmado poucas partições, este cliente roubará uma partição de outros clientes para cada avaliação de balanceamento de carga, independentemente da estratégia de balanceamento de carga. A estratégia gananciosa é utilizada por predefinição.

custom_endpoint_address
Optional[str]

O endereço de ponto final personalizado a utilizar para estabelecer uma ligação ao serviço dos Hubs de Eventos, permitindo que os pedidos de rede sejam encaminhados através de quaisquer gateways de aplicação ou outros caminhos necessários para o ambiente anfitrião. A predefinição é Nenhuma. O formato seria como "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Se a porta não for especificada no custom_endpoint_address, será utilizada por predefinição a porta 443.

connection_verify
Optional[str]

Caminho para o ficheiro de CA_BUNDLE personalizado do certificado SSL que é utilizado para autenticar a identidade do ponto final de ligação. A predefinição é Nenhum, caso em que certifi.where() será utilizado.

uamqp_transport
bool

Se pretende utilizar a biblioteca uamqp como o transporte subjacente. O valor predefinido é Falso e a biblioteca AMQP de Python Puro será utilizada como o transporte subjacente.

socket_timeout
float

O tempo em segundos que o socket subjacente na ligação deve aguardar ao enviar e receber dados antes de exceder o tempo limite. O valor predefinido é 0,2 para TransportType.Amqp e 1 para TransportType.AmqpOverWebsocket. Se os erros eventHubsConnectionError estiverem a ocorrer devido ao tempo limite de escrita, poderá ter de ser transmitido um valor maior do que o predefinido. Isto destina-se a cenários de utilização avançada e, normalmente, o valor predefinido deve ser suficiente.

Exemplos

Crie uma nova instância do EventHubConsumerClient.


   import os
   from azure.eventhub.aio import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   consumer = EventHubConsumerClient(fully_qualified_namespace=fully_qualified_namespace,
                                     consumer_group='$Default',
                                     eventhub_name=eventhub_name,
                                     credential=EventHubSharedKeyCredential(shared_access_policy, shared_access_key))

Métodos

close

Pare de obter eventos do Hub de Eventos e feche as ligações e a ligação AMQP subjacentes.

from_connection_string

Crie um EventHubConsumerClient a partir de um cadeia de ligação.

get_eventhub_properties

Obtenha as propriedades do Hub de Eventos.

As chaves no dicionário devolvido incluem:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Obter IDs de partição do Hub de Eventos.

get_partition_properties

Obtenha as propriedades da partição especificada.

As chaves no dicionário de propriedades incluem:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

Receber eventos de partições, com balanceamento de carga e pontos de verificação opcionais.

receive_batch

Receba eventos de partições em lotes, com balanceamento de carga e pontos de verificação opcionais.

close

Pare de obter eventos do Hub de Eventos e feche as ligações e a ligação AMQP subjacentes.

async close() -> None

Tipo de retorno

Exemplos

Feche o cliente.


   import os

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub.aio import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   async def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, async will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The receive method is a coroutine which will be blocking when awaited.
   # It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.

   recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
   await asyncio.sleep(3)  # keep receiving for 3 seconds
   recv_task.cancel()  # stop receiving

   # Close down the consumer handler explicitly.
   await consumer.close()

from_connection_string

Crie um EventHubConsumerClient a partir de um cadeia de ligação.

from_connection_string(conn_str: str, consumer_group: str, *, eventhub_name: str | None = None, logging_enable: bool = False, http_proxy: Dict[str, str | int] | None = None, auth_timeout: float = 60, user_agent: str | None = None, retry_total: int = 3, transport_type: TransportType = TransportType.Amqp, checkpoint_store: 'CheckpointStore' | None = None, load_balancing_interval: float = 30, **kwargs: Any) -> EventHubConsumerClient

Parâmetros

conn_str
str
Necessário

A cadeia de ligação de um Hub de Eventos.

consumer_group
str
Necessário

Receber eventos do Hub de Eventos para este grupo de consumidores.

eventhub_name
str

O caminho do Hub de Eventos específico ao qual ligar o cliente.

logging_enable
bool

Se pretende exportar registos de rastreio de rede para o logger. A predefinição é Falso.

http_proxy
dict

Definições de proxy HTTP. Tem de ser um dicionário com as seguintes chaves: "proxy_hostname" (valor str) e "proxy_port" (valor int). Além disso, também podem estar presentes as seguintes chaves: "nome de utilizador", "palavra-passe".

auth_timeout
float

O tempo em segundos a aguardar que um token seja autorizado pelo serviço. O valor predefinido é 60 segundos. Se estiver definido como 0, não será imposto nenhum tempo limite do cliente.

user_agent
str

Se for especificado, será adicionado à frente da cadeia de agente do utilizador.

retry_total
int

O número total de tentativas de refazer uma operação falhada quando ocorre um erro. O valor predefinido é 3. O contexto de retry_total na receção é especial: o método de receção é implementado por um método de receção interno de chamadas de ciclo de tempo em cada iteração. No caso de receção , retry_total especifica os números de repetição após o erro gerado pelo método de receção interno no ciclo while-loop. Se as tentativas de repetição estiverem esgotadas, a chamada de retorno on_error será chamada (se for fornecida) com as informações de erro. O consumidor da partição interna com falha será fechado (on_partition_close será chamado se for fornecido) e será criado um novo consumidor de partição interna (on_partition_initialize será chamado se for fornecido) para retomar a receção.

retry_backoff_factor
float

Um fator de recuo a aplicar entre tentativas após a segunda tentativa (a maioria dos erros é resolvida imediatamente por uma segunda tentativa sem demora). No modo fixo, a política de repetição irá sempre suspender para {backoff factor}. No modo "exponencial", a política de repetição irá suspender durante: {backoff factor} * (2 ** ({número de repetições totais} - 1)) segundos. Se o backoff_factor for 0,1, a repetição irá suspender para [0,0s, 0,2s, 0,4s, ...] entre repetições. O valor predefinido é 0,8.

retry_backoff_max
float

O tempo máximo de folga. O valor predefinido é 120 segundos (2 minutos).

retry_mode
str

O comportamento de atraso entre tentativas de repetição. Os valores suportados são "fixos" ou "exponenciais", em que a predefinição é "exponencial".

idle_timeout
float

Tempo limite, em segundos, após o qual este cliente fechará a ligação subjacente se não houver mais atividade. Por predefinição, o valor é Nenhum, o que significa que o cliente não será encerrado devido a inatividade, a menos que seja iniciado pelo serviço.

transport_type
TransportType

O tipo de protocolo de transporte que será utilizado para comunicar com o serviço Hubs de Eventos. A predefinição é TransportType.Amqp , caso em que é utilizada a porta 5671. Se a porta 5671 estiver indisponível/bloqueada no ambiente de rede, pode utilizar TransportType.AmqpOverWebsocket , que utiliza a porta 443 para comunicação.

checkpoint_store
Optional[CheckpointStore]

Um gestor que armazena os dados de balanceamento de carga e ponto de verificação da partição ao receber eventos. O arquivo de pontos de verificação será utilizado em ambos os casos de receção de todas as partições ou de uma única partição. Neste último caso, o balanceamento de carga não se aplica. Se não for fornecido um arquivo de ponto de verificação, o ponto de verificação será mantido internamente na memória e a instância EventHubConsumerClient receberá eventos sem balanceamento de carga.

load_balancing_interval
float

Quando o balanceamento de carga é ativado. Este é o intervalo, em segundos, entre duas avaliações de balanceamento de carga. A predefinição é 30 segundos.

partition_ownership_expiration_interval
float

Uma propriedade de partição expirará após este número de segundos. Cada avaliação de balanceamento de carga prolongará automaticamente o tempo de expiração da propriedade. A predefinição é 6 * load_balancing_interval, ou seja, 180 segundos ao utilizar o load_balancing_interval predefinido de 30 segundos.

load_balancing_strategy
str ou LoadBalancingStrategy

Quando o balanceamento de carga é ativado, utilizará esta estratégia para reivindicar e equilibrar a propriedade da partição. Utilize "ganancioso" ou LoadBalancingStrategy.GREEDY para a estratégia gananciosa, que, para cada avaliação de balanceamento de carga, obterá o número de partições não reclamadas necessárias para equilibrar a carga. Utilize "balanceado" ou LoadBalancingStrategy.BALANCED para a estratégia equilibrada que, para cada avaliação de balanceamento de carga, afirma apenas uma partição que não é reivindicada por outro EventHubConsumerClient. Se todas as partições de um EventHub forem reivindicadas por outros EventHubConsumerClient e este cliente tiver afirmado poucas partições, este cliente roubará uma partição de outros clientes para cada avaliação de balanceamento de carga, independentemente da estratégia de balanceamento de carga. A estratégia gananciosa é utilizada por predefinição.

custom_endpoint_address
Optional[str]

O endereço de ponto final personalizado a utilizar para estabelecer uma ligação ao serviço dos Hubs de Eventos, permitindo que os pedidos de rede sejam encaminhados através de quaisquer gateways de aplicação ou outros caminhos necessários para o ambiente anfitrião. A predefinição é Nenhuma. O formato seria como "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Se a porta não for especificada no custom_endpoint_address, será utilizada por predefinição a porta 443.

connection_verify
Optional[str]

Caminho para o ficheiro de CA_BUNDLE personalizado do certificado SSL que é utilizado para autenticar a identidade do ponto final de ligação. A predefinição é Nenhum, caso em que certifi.where() será utilizado.

uamqp_transport
bool

Se pretende utilizar a biblioteca uamqp como o transporte subjacente. O valor predefinido é Falso e a biblioteca AMQP de Python Puro será utilizada como o transporte subjacente.

Tipo de retorno

Exemplos

Crie uma nova instância do EventHubConsumerClient a partir de cadeia de ligação.


   import os
   from azure.eventhub.aio import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Obtenha as propriedades do Hub de Eventos.

As chaves no dicionário devolvido incluem:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

async get_eventhub_properties() -> Dict[str, Any]

Devoluções

Um dicionário que contém informações sobre o Hub de Eventos.

Tipo de retorno

Exceções

get_partition_ids

Obter IDs de partição do Hub de Eventos.

async get_partition_ids() -> List[str]

Devoluções

Uma lista de IDs de partição.

Tipo de retorno

Exceções

get_partition_properties

Obtenha as propriedades da partição especificada.

As chaves no dicionário de propriedades incluem:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

async get_partition_properties(partition_id: str) -> Dict[str, Any]

Parâmetros

partition_id
str
Necessário

O ID da partição de destino.

Devoluções

Um dicionário que contém propriedades de partição.

Tipo de retorno

Exceções

receive

Receber eventos de partições, com balanceamento de carga e pontos de verificação opcionais.

async receive(on_event: Callable[['PartitionContext', 'EventData' | None], Awaitable[None]], *, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parâmetros

on_event
Callable[PartitionContext, Optional[EventData]]
Necessário

A função de chamada de retorno para processar um evento recebido. A chamada de retorno utiliza dois parâmetros: partition_context que contém o contexto de partição e o evento que é o evento recebido. A função de chamada de retorno deve ser definida como: on_event(partition_context, evento). Para obter informações detalhadas sobre o contexto de partição, veja PartitionContext.

max_wait_time
float

O intervalo máximo em segundos que o processador de eventos aguardará antes de chamar a chamada de retorno. Se não forem recebidos eventos dentro deste intervalo, a chamada de retorno on_event será chamada com Nenhum. Se este valor estiver definido como Nenhum ou 0 (a predefinição), a chamada de retorno não será chamada até que um evento seja recebido.

partition_id
str

Se for especificado, o cliente só receberá desta partição. Caso contrário, o cliente receberá de todas as partições.

owner_level
int

A prioridade para um consumidor exclusivo. Será criado um consumidor exclusivo se owner_level estiver definido. Um consumidor com um owner_level superior tem uma prioridade exclusiva mais elevada. O nível de proprietário também é conhecido como o "valor de época" do consumidor.

prefetch
int

O número de eventos a pré-obter do serviço para processamento. A predefinição é 300.

track_last_enqueued_event_properties
bool

Indica se o consumidor deve pedir informações sobre o último evento em fila na partição associada e controlar essas informações à medida que os eventos são recebidos. Quando as informações sobre o último evento em fila de espera das partições estão a ser controladas, cada evento recebido do serviço Hubs de Eventos irá transportar metadados sobre a partição. Isto resulta numa pequena quantidade de consumo adicional de largura de banda de rede que é geralmente um compromisso favorável quando considerado contra a realização periódica de pedidos de propriedades de partição com o cliente do Hub de Eventos. Está definido como Falso por predefinição.

starting_position
str, int, datetime ou dict[str,any]

Comece a receber a partir desta posição de evento se não existirem dados de ponto de verificação para uma partição. Os dados do ponto de verificação serão utilizados, se disponíveis. Isto pode ser um ditado com o ID da partição como a chave e a posição como o valor para partições individuais ou um valor único para todas as partições. O tipo de valor pode ser str, int ou datetime.datetime. Também são suportados os valores "-1" para receber desde o início do fluxo e "@latest" para receber apenas novos eventos.

starting_position_inclusive
bool ou dict[str,bool]

Determine se o starting_position especificado é inclusivo(>=) ou não (>). Verdadeiro para inclusive e Falso para exclusivo. Isto pode ser um ditado com o ID da partição como chave e bool como o valor que indica se a starting_position para uma partição específica é inclusiva ou não. Também pode ser um valor bool único para todos os starting_position. O valor predefinido é Falso.

on_error
Callable[[PartitionContext, Exception]]

A função de chamada de retorno que será chamada quando um erro é gerado durante a receção após as tentativas de repetição serem esgotadas ou durante o processo de balanceamento de carga. A chamada de retorno utiliza dois parâmetros: partition_context que contém informações de partição e o erro é a exceção. partition_context pode ser Nenhum se o erro for gerado durante o processo de balanceamento de carga. A chamada de retorno deve ser definida como: on_error(partition_context, erro). A chamada de retorno on_error também será chamada se for gerada uma exceção não processada durante a chamada de retorno do on_event .

on_partition_initialize
Callable[[PartitionContext]]

A função de chamada de retorno que será chamada depois de um consumidor para uma determinada partição concluir a inicialização. Também seria chamado quando um novo consumidor interno de partições for criado para assumir o processo de receção de um consumidor de partições interno com falhas e fechado. A chamada de retorno utiliza um único parâmetro: partition_context que contém as informações da partição. A chamada de retorno deve ser definida como: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

A função de chamada de retorno que será chamada depois de um consumidor para uma determinada partição ser fechada. Também seria chamado quando o erro é gerado durante a receção após as tentativas de repetição estarem esgotadas. A chamada de retorno utiliza dois parâmetros: partition_context que contém informações de partição e motivo para fechar. A chamada de retorno deve ser definida como: on_partition_close(partition_context, razão). CloseReason Veja os vários motivos de fecho.

Tipo de retorno

Exemplos

Receber eventos do EventHub.


       logger = logging.getLogger("azure.eventhub")

       async def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       async with consumer:
           await consumer.receive(
               on_event=on_event,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )

receive_batch

Receba eventos de partições em lotes, com balanceamento de carga e pontos de verificação opcionais.

async receive_batch(on_event_batch: Callable[['PartitionContext', List['EventData']], Awaitable[None]], *, max_batch_size: int = 300, max_wait_time: float | None = None, partition_id: str | None = None, owner_level: int | None = None, prefetch: int = 300, track_last_enqueued_event_properties: bool = False, starting_position: str | int | datetime.datetime | Dict[str, Any] | None = None, starting_position_inclusive: bool | Dict[str, bool] = False, on_error: Callable[['PartitionContext', Exception], Awaitable[None]] | None = None, on_partition_initialize: Callable[['PartitionContext'], Awaitable[None]] | None = None, on_partition_close: Callable[['PartitionContext', 'CloseReason'], Awaitable[None]] | None = None) -> None

Parâmetros

on_event_batch
Callable[PartitionContext, List[EventData]]
Necessário

A função de chamada de retorno para processar um lote de eventos recebidos. A chamada de retorno utiliza dois parâmetros: partition_context que contém o contexto de partição e event_batch, que são os eventos recebidos. A função de chamada de retorno deve ser definida como: on_event_batch(partition_context, event_batch). event_batch pode ser uma lista vazia se max_wait_time não for Nenhum nem 0 e nenhum evento for recebido após max_wait_time. Para obter informações detalhadas sobre o contexto de partição, veja PartitionContext.

max_batch_size
int

O número máximo de eventos num lote transmitido para chamada de retorno on_event_batch. Se o número real de eventos recebidos for maior do que max_batch_size, os eventos recebidos serão divididos em lotes e chamarão a chamada de retorno para cada lote com até max_batch_size eventos.

max_wait_time
float

O intervalo máximo em segundos que o processador de eventos aguardará antes de chamar a chamada de retorno. Se não forem recebidos eventos dentro deste intervalo, a chamada de retorno on_event_batch será chamada com uma lista vazia. Se este valor estiver definido como Nenhum ou 0 (a predefinição), a chamada de retorno não será chamada até que os eventos sejam recebidos.

partition_id
str

Se for especificado, o cliente só receberá desta partição. Caso contrário, o cliente receberá de todas as partições.

owner_level
int

A prioridade para um consumidor exclusivo. Será criado um consumidor exclusivo se owner_level estiver definido. Um consumidor com um owner_level superior tem uma prioridade exclusiva mais elevada. O nível de proprietário também é conhecido como o "valor de época" do consumidor.

prefetch
int

O número de eventos a pré-obter do serviço para processamento. A predefinição é 300.

track_last_enqueued_event_properties
bool

Indica se o consumidor deve pedir informações sobre o último evento em fila na partição associada e controlar essas informações à medida que os eventos são recebidos. Quando as informações sobre o último evento em fila de espera das partições estão a ser controladas, cada evento recebido do serviço Hubs de Eventos irá transportar metadados sobre a partição. Isto resulta numa pequena quantidade de consumo adicional de largura de banda de rede que é geralmente um compromisso favorável quando considerado contra a realização periódica de pedidos de propriedades de partição com o cliente do Hub de Eventos. Está definido como Falso por predefinição.

starting_position
str, int, datetime ou dict[str,any]

Comece a receber a partir desta posição de evento se não existirem dados de ponto de verificação para uma partição. Os dados do ponto de verificação serão utilizados, se disponíveis. Isto pode ser um ditado com o ID da partição como a chave e a posição como o valor para partições individuais ou um valor único para todas as partições. O tipo de valor pode ser str, int ou datetime.datetime. Também são suportados os valores "-1" para receber desde o início do fluxo e "@latest" para receber apenas novos eventos.

starting_position_inclusive
bool ou dict[str,bool]

Determine se o starting_position especificado é inclusivo(>=) ou não (>). Verdadeiro para inclusive e Falso para exclusivo. Isto pode ser um ditado com o ID da partição como chave e bool como o valor que indica se a starting_position para uma partição específica é inclusiva ou não. Também pode ser um valor bool único para todos os starting_position. O valor predefinido é Falso.

on_error
Callable[[PartitionContext, Exception]]

A função de chamada de retorno que será chamada quando um erro é gerado durante a receção após as tentativas de repetição serem esgotadas ou durante o processo de balanceamento de carga. A chamada de retorno utiliza dois parâmetros: partition_context que contém informações de partição e o erro é a exceção. partition_context pode ser Nenhum se o erro for gerado durante o processo de balanceamento de carga. A chamada de retorno deve ser definida como: on_error(partition_context, erro). A chamada de retorno on_error também será chamada se for gerada uma exceção não processada durante a chamada de retorno do on_event .

on_partition_initialize
Callable[[PartitionContext]]

A função de chamada de retorno que será chamada depois de um consumidor para uma determinada partição concluir a inicialização. Também seria chamado quando um novo consumidor interno de partições for criado para assumir o processo de receção de um consumidor de partições interno com falhas e fechado. A chamada de retorno utiliza um único parâmetro: partition_context que contém as informações da partição. A chamada de retorno deve ser definida como: on_partition_initialize(partition_context).

on_partition_close
Callable[[PartitionContext, CloseReason]]

A função de chamada de retorno que será chamada depois de um consumidor para uma determinada partição ser fechada. Também seria chamado quando o erro é gerado durante a receção após as tentativas de repetição estarem esgotadas. A chamada de retorno utiliza dois parâmetros: partition_context que contém informações de partição e motivo para fechar. A chamada de retorno deve ser definida como: on_partition_close(partition_context, razão). CloseReason Veja os vários motivos de fecho.

Tipo de retorno

Exemplos

Receber eventos em lotes do EventHub.


       logger = logging.getLogger("azure.eventhub")

       async def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, async will have better performance.
           logger.info(
               "{} events received from partition: {}".format(len(event_batch), partition_context.partition_id)
           )

       async with consumer:
           await consumer.receive_batch(
               on_event_batch=on_event_batch,
               starting_position="-1",  # "-1" is from the beginning of the partition.
           )