Hubs de Eventos do Azure biblioteca da Loja de Pontos de Verificação para Javascript com Blobs de Armazenamento
Uma solução baseada no armazenamento de Blobs do Azure para armazenar pontos de verificação e ajudar no balanceamento de carga ao utilizar EventHubConsumerClient
a partir da biblioteca @azure/event-hubs
Código fonte | Pacote (npm) | Documentação | de Referência da APIExemplos
Introdução
Instalar o pacote
Instalar a biblioteca de Blobs do Hubs de Eventos do Azure Checkpoint Store com o npm
npm install @azure/eventhubs-checkpointstore-blob
Pré-requisitos: tem de ter uma subscrição do Azure, um Espaço de Nomes dos Hubs de Eventos para utilizar este pacote e uma Conta de armazenamento
Se estiver a utilizar este pacote numa aplicação Node.js, utilize Node.js 8.x ou superior.
Configurar Typescript
Os utilizadores do TypeScript precisam de ter definições de tipo de Nó instaladas:
npm install @types/node
Também tem de ativar compilerOptions.allowSyntheticDefaultImports
no tsconfig.json. Tenha em atenção que, se tiver ativado compilerOptions.esModuleInterop
, allowSyntheticDefaultImports
está ativado por predefinição. Veja o manual de opções do compilador do TypeScript para obter mais informações.
Conceitos-chave
Dimensionar: Crie vários consumidores, com cada consumidor a assumir a propriedade da leitura a partir de algumas partições dos Hubs de Eventos.
Balanceamento de carga: As aplicações que suportam o balanceamento de carga consistem numa ou mais instâncias das
EventHubConsumerClient
quais foram configuradas para consumir eventos do mesmo Hub de Eventos e grupo de consumidores e o mesmoCheckpointStore
. Equilibram a carga de trabalho em diferentes instâncias ao distribuir as partições a serem processadas entre si.Pontos de verificação: É um processo pelo qual os leitores marcam ou consolidam a sua posição numa sequência de eventos de partição. O ponto de verificação é da responsabilidade do consumidor e ocorre numa base por partição dentro de um grupo de consumidores. Esta responsabilidade significa que para cada grupo de consumidores, cada leitor da partição tem de manter um controlo da respetiva posição atual no fluxo de eventos e pode informar o serviço quando considera o fluxo de dados completo.
Se um leitor for desligado de uma partição, quando voltar a ser ligado, começa a leitura no ponto de verificação que foi previamente submetido pelo último leitor dessa partição nesse grupo de consumidores. Quando o leitor se liga, transmite o desvio para o hub de eventos para especificar a localização em que pretende começar a ler. Desta forma, pode utilizar o ponto de verificação para marcar os eventos como “concluídos” pelas aplicações a jusante e para fornecer resiliência se ocorrer uma ativação pós-falha entre os leitores em execução em computadores diferentes. É possível devolver dados mais antigos ao especificar um desvio inferior a partir deste processo de ponto de verificação. Através deste mecanismo, o ponto de verificação ativa a resiliência pós-falha e a repetição do fluxo de eventos.
Um BlobCheckpointStore é uma classe que implementa os principais métodos exigidos pelo EventHubConsumerClient para equilibrar a carga e atualizar pontos de verificação.
Exemplos
- Criar um CheckpointStore com o Armazenamento de Blobs do Azure
- Eventos de ponto de verificação com o armazenamento de Blobs do Azure
Criar um CheckpointStore
com Armazenamento de Blobs do Azure
Utilize o fragmento de código abaixo para criar um CheckpointStore
. Terá de fornecer a cadeia de ligação à sua conta de armazenamento.
import { ContainerClient } from "@azure/storage-blob",
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob"
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!containerClient.exists()) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
Eventos de ponto de verificação com o armazenamento de Blobs do Azure
Para os eventos de ponto de verificação recebidos com Armazenamento de Blobs do Azure, terá de transmitir um objeto compatível com a interface SubscriptionEventHandlers juntamente com o código para chamar o updateCheckpoint()
método .
Neste exemplo, SubscriptionHandlers
implementa SubscriptionEventHandlers e também processa pontos de verificação.
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
import { EventHubConsumerClient } from "@azure/event-hubs";
const consumerGroup = "consumer-group-name";
const connectionString = "event-hub-connectionstring";
const containerClient = new ContainerClient("storage-connection-string", "container-name");
if (!(await containerClient.exists())) {
await containerClient.create(); // This can be skipped if the container already exists
}
const checkpointStore = new BlobCheckpointStore(containerClient);
class SubscriptionHandlers {
async processEvents(event, context) {
// custom logic for processing events goes here
// Checkpointing will allow your service to restart and pick
// up from where it left off.
//
// You'll want to balance how often you checkpoint with the
// performance of your underlying checkpoint store.
await context.updateCheckpoint(event);
}
async processError(err, context) {
// handle any errors that occur during the course of
// this subscription
console.log(`Errors in subscription: ${err}`);
}
}
const consumerClient = new EventHubConsumerClient(consumerGroup, connectionString, checkpointStore);
const subscription = consumerClient.subscribe(new SubscriptionHandlers());
// events will now flow into the handlers defined above
// to stop the subscription:
subscription.close();
Resolução de problemas
Ativar registos
Pode definir a AZURE_LOG_LEVEL
variável de ambiente para um dos seguintes valores para ativar o registo como stderr
:
- verbose
- informações
- aviso
- erro
Também pode definir o nível de registo programaticamente ao importar o pacote @azure/logger e chamar a setLogLevel
função com um dos valores de nível de registo.
Ao definir um nível de registo programaticamente ou através da AZURE_LOG_LEVEL
variável de ambiente, todos os registos escritos com um nível de registo igual ou inferior ao que escolher serão emitidos.
Por exemplo, quando define o nível de registo como info
, os registos escritos para níveis warning
e error
também são emitidos.
Este SDK segue as diretrizes do SDK do Azure para TypeScript ao determinar em que nível iniciar sessão.
Em alternativa, pode definir a DEBUG
variável de ambiente para obter registos ao utilizar esta biblioteca.
Isto pode ser útil se também pretender emitir registos das dependências rhea-promise
e rhea
também.
Nota: AZURE_LOG_LEVEL, se definido, tem precedência sobre DEBUG.
Não especifique quaisquer azure
bibliotecas através de DEBUG ao especificar também AZURE_LOG_LEVEL ou chamar setLogLevel.
Pode definir a seguinte variável de ambiente para obter os registos de depuração ao utilizar esta biblioteca.
- Obter apenas registos de depuração de nível de informações a partir do Blob do Ponto de Verificação do Eventhubs.
export DEBUG=azure:eventhubs-checkpointstore-blob:info
Iniciar sessão num ficheiro
Ative o registo conforme mostrado acima e, em seguida, execute o script de teste da seguinte forma:
As instruções de registo do script de teste vão para
out.log
e as instruções de registo do sdk vão paradebug.log
.node your-test-script.js > out.log 2>debug.log
As instruções de registo do script de teste e do sdk vão para o mesmo ficheiro
out.log
ao redirecionar o stderr para stdout (&1) e, em seguida, redirecionar stdout para um ficheiro:node your-test-script.js >out.log 2>&1
As instruções de registo do script de teste e do sdk vão para o mesmo ficheiro
out.log
.node your-test-script.js &> out.log
Passos Seguintes
Veja o diretório de exemplos para obter um exemplo detalhado.
Contribuir
Se quiser contribuir para esta biblioteca, leia o guia de contribuição para saber mais sobre como criar e testar o código.
Azure SDK for JavaScript