Partilhar via


Hubs de Eventos do Azure biblioteca da Loja de Pontos de Verificação para Javascript com Blobs de Armazenamento

Uma solução baseada no armazenamento de Blobs do Azure para armazenar pontos de verificação e ajudar no balanceamento de carga ao utilizar EventHubConsumerClient a partir da biblioteca @azure/event-hubs

Código fonte | Pacote (npm) | Documentação | de Referência da APIExemplos

Introdução

Instalar o pacote

Instalar a biblioteca de Blobs do Hubs de Eventos do Azure Checkpoint Store com o npm

npm install @azure/eventhubs-checkpointstore-blob

Pré-requisitos: tem de ter uma subscrição do Azure, um Espaço de Nomes dos Hubs de Eventos para utilizar este pacote e uma Conta de armazenamento

Se estiver a utilizar este pacote numa aplicação Node.js, utilize Node.js 8.x ou superior.

Configurar Typescript

Os utilizadores do TypeScript precisam de ter definições de tipo de Nó instaladas:

npm install @types/node

Também tem de ativar compilerOptions.allowSyntheticDefaultImports no tsconfig.json. Tenha em atenção que, se tiver ativado compilerOptions.esModuleInterop, allowSyntheticDefaultImports está ativado por predefinição. Veja o manual de opções do compilador do TypeScript para obter mais informações.

Conceitos-chave

  • Dimensionar: Crie vários consumidores, com cada consumidor a assumir a propriedade da leitura a partir de algumas partições dos Hubs de Eventos.

  • Balanceamento de carga: As aplicações que suportam o balanceamento de carga consistem numa ou mais instâncias das EventHubConsumerClient quais foram configuradas para consumir eventos do mesmo Hub de Eventos e grupo de consumidores e o mesmo CheckpointStore. Equilibram a carga de trabalho em diferentes instâncias ao distribuir as partições a serem processadas entre si.

  • Pontos de verificação: É um processo pelo qual os leitores marcam ou consolidam a sua posição numa sequência de eventos de partição. O ponto de verificação é da responsabilidade do consumidor e ocorre numa base por partição dentro de um grupo de consumidores. Esta responsabilidade significa que para cada grupo de consumidores, cada leitor da partição tem de manter um controlo da respetiva posição atual no fluxo de eventos e pode informar o serviço quando considera o fluxo de dados completo.

    Se um leitor for desligado de uma partição, quando voltar a ser ligado, começa a leitura no ponto de verificação que foi previamente submetido pelo último leitor dessa partição nesse grupo de consumidores. Quando o leitor se liga, transmite o desvio para o hub de eventos para especificar a localização em que pretende começar a ler. Desta forma, pode utilizar o ponto de verificação para marcar os eventos como “concluídos” pelas aplicações a jusante e para fornecer resiliência se ocorrer uma ativação pós-falha entre os leitores em execução em computadores diferentes. É possível devolver dados mais antigos ao especificar um desvio inferior a partir deste processo de ponto de verificação. Através deste mecanismo, o ponto de verificação ativa a resiliência pós-falha e a repetição do fluxo de eventos.

    Um BlobCheckpointStore é uma classe que implementa os principais métodos exigidos pelo EventHubConsumerClient para equilibrar a carga e atualizar pontos de verificação.

Exemplos

Criar um CheckpointStore com Armazenamento de Blobs do Azure

Utilize o fragmento de código abaixo para criar um CheckpointStore. Terá de fornecer a cadeia de ligação à sua conta de armazenamento.

import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!containerClient.exists()) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore =  new BlobCheckpointStore(containerClient);

Eventos de ponto de verificação com o armazenamento de Blobs do Azure

Para os eventos de ponto de verificação recebidos com Armazenamento de Blobs do Azure, terá de transmitir um objeto compatível com a interface SubscriptionEventHandlers juntamente com o código para chamar o updateCheckpoint() método .

Neste exemplo, SubscriptionHandlers implementa SubscriptionEventHandlers e também processa pontos de verificação.

import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";

const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";

const containerClient = new ContainerClient("storage-connection-string", "container-name");

if (!(await containerClient.exists())) {
  await containerClient.create(); // This can be skipped if the container already exists
}

const checkpointStore = new BlobCheckpointStore(containerClient);

class SubscriptionHandlers {
  async processEvents(event, context) {
    // custom logic for processing events goes here

    // Checkpointing will allow your service to restart and pick
    // up from where it left off.
    //
    // You'll want to balance how often you checkpoint with the
    // performance of your underlying checkpoint store.
    await context.updateCheckpoint(event);
  }

  async processError(err, context) {
    // handle any errors that occur during the course of
    // this subscription
    console.log(`Errors in subscription: ${err}`);
  }
}

const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);

const subscription = consumerClient.subscribe(new SubscriptionHandlers());

// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();

Resolução de problemas

Ativar registos

Pode definir a AZURE_LOG_LEVEL variável de ambiente para um dos seguintes valores para ativar o registo como stderr:

  • verbose
  • informações
  • aviso
  • erro

Também pode definir o nível de registo programaticamente ao importar o pacote @azure/logger e chamar a setLogLevel função com um dos valores de nível de registo.

Ao definir um nível de registo programaticamente ou através da AZURE_LOG_LEVEL variável de ambiente, todos os registos escritos com um nível de registo igual ou inferior ao que escolher serão emitidos. Por exemplo, quando define o nível de registo como info, os registos escritos para níveis warning e error também são emitidos. Este SDK segue as diretrizes do SDK do Azure para TypeScript ao determinar em que nível iniciar sessão.

Em alternativa, pode definir a DEBUG variável de ambiente para obter registos ao utilizar esta biblioteca. Isto pode ser útil se também pretender emitir registos das dependências rhea-promise e rhea também.

Nota: AZURE_LOG_LEVEL, se definido, tem precedência sobre DEBUG. Não especifique quaisquer azure bibliotecas através de DEBUG ao especificar também AZURE_LOG_LEVEL ou chamar setLogLevel.

Pode definir a seguinte variável de ambiente para obter os registos de depuração ao utilizar esta biblioteca.

  • Obter apenas registos de depuração de nível de informações a partir do Blob do Ponto de Verificação do Eventhubs.
export DEBUG=azure:eventhubs-checkpointstore-blob:info

Iniciar sessão num ficheiro

  • Ative o registo conforme mostrado acima e, em seguida, execute o script de teste da seguinte forma:

    • As instruções de registo do script de teste vão para out.log e as instruções de registo do sdk vão para debug.log.

      node your-test-script.js > out.log 2>debug.log
      
    • As instruções de registo do script de teste e do sdk vão para o mesmo ficheiro out.log ao redirecionar o stderr para stdout (&1) e, em seguida, redirecionar stdout para um ficheiro:

      node your-test-script.js >out.log 2>&1
      
    • As instruções de registo do script de teste e do sdk vão para o mesmo ficheiro out.log.

      node your-test-script.js &> out.log
      

Passos Seguintes

Veja o diretório de exemplos para obter um exemplo detalhado.

Contribuir

Se quiser contribuir para esta biblioteca, leia o guia de contribuição para saber mais sobre como criar e testar o código.

Impressões