Compartir a través de


integración de .NET AspireApache Kafka

Incluye:integración de hospedaje y Client

Apache Kafka es una plataforma de streaming de eventos distribuidos de código abierto. Resulta útil para crear canalizaciones de datos en tiempo real y aplicaciones de streaming. La integración de .NET AspireApache Kafka permite conectarse a instancias de Kafka existentes o crear nuevas instancias desde .NET con la imagen de contenedor de docker.io/confluentinc/confluent-local.

Integración de hospedaje

La integración de modelos de Apache Kafka representa a un server de Kafka como el tipo KafkaServerResource. Para acceder a este tipo, instale el 📦Aspire.Hosting.Kafka paquete NuGet en el proyecto host de la aplicación y, a continuación, agréguelo con el generador.

dotnet add package Aspire.Hosting.Kafka

Para obtener más información, consulte dotnet add package o Administrar las dependencias de paquetes en las aplicaciones .NET.

Añadir recurso server de Kafka

En el proyecto host de la aplicación, llame a AddKafka en la instancia de builder para agregar un recurso de server de Kafka:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Cuando .NET.NET Aspire agrega una imagen de contenedor al host de la aplicación, como se muestra en el ejemplo anterior con la imagen de docker.io/confluentinc/confluent-local, crea una nueva instancia de Kafka server en el equipo local. Al serverse agrega una referencia a la kafka de Kafka (la variable ExampleProject). El recurso de server kafka incluye puertos predeterminados

El método WithReference configura una conexión en el ExampleProject denominado "kafka". Para obtener más información, consulte ciclo de vida de los recursos de un contenedor.

Propina

Si prefiere conectarse a un serverde Kafka existente, llame a AddConnectionString en su lugar. Para obtener más información, vea Hacer referencia a los recursos existentes.

Adición de la interfaz de usuario de Kafka

Para agregar el de interfaz de usuario de Kafka al recurso de Kafka, llame al método :

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

La interfaz de usuario de Kafka es una interfaz de usuario web gratuita de código abierto para supervisar y administrar clústeres de Apache Kafka. .NET .NET Aspire agrega otra imagen de contenedor docker.io/provectuslabs/kafka-ui al host de la aplicación que ejecuta la interfaz de usuario de Kafka.

Cambio del puerto de host de la interfaz de usuario de Kafka

Para cambiar el puerto de host de la interfaz de usuario de Kafka, encadene una llamada al método WithHostPort:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

La interfaz de usuario de Kafka es accesible en http://localhost:9100 en el ejemplo anterior.

Agregar el recurso server de Kafka con volumen de datos

Para agregar un volumen de datos al recurso de server de Kafka, llame al método WithDataVolume en el recurso server de Kafka:

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

El volumen de datos se usa para conservar los datos de Kafka server fuera del ciclo de vida de su contenedor. El volumen de datos se monta en la ruta /var/lib/kafka/data del contenedor server de Kafka, y cuando no se proporciona un parámetro name, el nombre se genera aleatoriamente. Para obtener más información sobre los volúmenes de datos y los detalles sobre por qué se prefieren a montajes de enlace, consulte Docker documentación: Volúmenes.

Añadir el recurso Kafka server con montaje enlazado de datos

Para agregar un punto de montaje ligado a datos al recurso server de Kafka, llame al método WithDataBindMount.

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Importante

Los montajes de datos de enlace tienen una funcionalidad limitada en comparación con los volúmenes , que ofrecen un mejor rendimiento, portabilidad y seguridad, lo que los hace más adecuados para entornos de producción. Sin embargo, los montajes de enlaces permiten el acceso directo y la modificación de archivos en el sistema anfitrión, lo cual es ideal para el desarrollo y las pruebas donde se necesitan cambios en tiempo real.

Las uniones de datos dependen del sistema de archivos de la máquina anfitriona para persistir los datos de Kafka server durante los reinicios del contenedor. El montaje del enlace de datos está montado en el C:\Kafka\Data en Windows (o /Kafka/Data en Unix) en la ruta de acceso en el equipo host en el contenedor de Kafka server. Para obtener más información sobre los montajes de enlace de datos, consulte Docker documento: Montajes de enlace de datos.

Hospedaje de comprobaciones de estado de integración

La integración de hospedaje de Kafka agrega automáticamente una comprobación de estado para el recurso de server de Kafka. La comprobación de estado verifica que un productor de Kafka con el nombre de conexión especificado puede conectarse y almacenar un tópico en el clúster serverde Kafka.

La integración de hospedaje se basa en el paquete NuGet 📦 AspNetCore.HealthChecks.Kafka.

integración de Client

Para empezar a trabajar con la integración de .NET AspireApache Kafka, instale el 📦Aspire. Confluent.Kafka paquete NuGet en el proyecto client-consuming, es decir, el proyecto de la aplicación que usa el Apache Kafkaclient.

dotnet add package Aspire.Confluent.Kafka

Agregar productor de Kafka

En el archivo Program.cs del proyecto que consume client, llame al método de extensión AddKafkaProducer para registrar un IProducer<TKey, TValue> para su uso a través del contenedor de inyección de dependencias. El método toma dos parámetros genéricos correspondientes al tipo de la clave y el tipo del mensaje que se va a enviar al agente. Estos parámetros genéricos se usan en AddKafkaProducer para crear una instancia de ProducerBuilder<TKey, TValue>. Este método también toma el parámetro de nombre de conexión.

builder.AddKafkaProducer<string, string>("messaging");

A continuación, puede obtener la instancia IProducer<TKey, TValue> mediante la inserción de dependencias. Por ejemplo, para recuperar el productor de un IHostedService:

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

Para obtener más información sobre los trabajadores, consulte Servicios de trabajo en .NET.

Agregar un consumidor de Kafka

Para registrar un IConsumer<TKey, TValue> para su uso a través del contenedor de inyección de dependencias, llame al método de extensión AddKafkaConsumer en el archivo Program.cs del proyecto que consume client. El método toma dos parámetros genéricos correspondientes al tipo de la clave y el tipo del mensaje que se va a recibir del agente. Estos parámetros genéricos se usan en AddKafkaConsumer para crear una instancia de ConsumerBuilder<TKey, TValue>. Este método también toma el parámetro de nombre de conexión.

builder.AddKafkaConsumer<string, string>("messaging");

A continuación, puede obtener la instancia IConsumer<TKey, TValue> mediante la inserción de dependencias. Por ejemplo, para recuperar al consumidor de un IHostedService:

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

Adición de productores o consumidores clave de Kafka

Puede haber situaciones en las que quiera registrar varias instancias de productor o consumidor con nombres de conexión diferentes. Para registrar productores o consumidores clave de Kafka, llame a la API adecuada.

Para obtener más información sobre los servicios con claves, consulte .NET inserción de dependencias: Servicios con claves.

Configuración

La integración de .NET AspireApache Kafka proporciona varias opciones para configurar la conexión en función de los requisitos y convenciones del proyecto.

Uso de una cadena de conexión

Al usar una cadena de conexión de la sección de configuración de ConnectionStrings, puede proporcionar el nombre de la cadena de conexión al llamar a builder.AddKafkaProducer() o builder.AddKafkaProducer():

builder.AddKafkaProducer<string, string>("kafka-producer");

A continuación, la cadena de conexión se obtiene de la sección de configuración de ConnectionStrings:

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

El valor de la cadena de conexión se establece en la propiedad BootstrapServers de la instancia de IProducer<TKey, TValue> o IConsumer<TKey, TValue> generada. Para obtener más información, vea BootstrapServers.

Uso de proveedores de configuración

La integración de .NET AspireApache Kafka admite Microsoft.Extensions.Configuration. Carga el KafkaProducerSettings o KafkaConsumerSettings desde la configuración usando respectivamente las claves Aspire:Confluent:Kafka:Producer y Aspire.Confluent:Kafka:Consumer. El fragmento de código siguiente es un ejemplo de un archivo appsettings.json que configura algunas de las opciones:

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

Las propiedades Config de las secciones de configuración de Aspire:Confluent:Kafka:Producer y Aspire.Confluent:Kafka:Consumer, respectivamente, se enlazan a instancias de ProducerConfig y ConsumerConfig.

Confluent.Kafka.Consumer<TKey, TValue> requiere que se establezca la propiedad ClientId para permitir que el agente realice un seguimiento de los desplazamientos de mensajes consumidos.

Para el esquema completo de integración de Kafka clientJSON, consulte Aspire.Confluent.Kafka/ConfigurationSchema.json.

Usa delegados en línea

Hay varios delegados en línea disponibles para configurar diferentes opciones.

Configuración deKafkaProducerSettings y KafkaConsumerSettings

Puede pasar el delegado Action<KafkaProducerSettings> configureSettings para configurar algunas o todas las opciones en línea, por ejemplo, para deshabilitar las verificaciones de salud desde el código.

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

Puede configurar un consumidor en línea directamente en el código:

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);
Configuración de ProducerBuilder<TKey, TValue> y ConsumerBuilder<TKey, TValue>

Para configurar los constructores de Confluent.Kafka, pase un Action<ProducerBuilder<TKey, TValue>> (o un Action<ConsumerBuilder<TKey, TValue>>):

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Al registrar productores y consumidores, si necesita acceder a un servicio registrado en el contenedor DI, puede pasar un Action<IServiceProvider, ProducerBuilder<TKey, TValue>> o un Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>, respectivamente.

Considere el siguiente ejemplo de registro de productor:

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

Para más información, consulte la documentación de la API de ProducerBuilder<TKey, TValue> y la API de ConsumerBuilder<TKey, TValue>.

verificaciones de estado de integración Client

De forma predeterminada, las integraciones de .NET.NET Aspire habilitan las comprobaciones de estado de para todos los servicios. Para obtener más información, consulte .NET.NET Aspire integrations overview.

La integración de .NET AspireApache Kafka maneja los siguientes escenarios de verificación de salud:

  • Agrega la comprobación de estado Aspire.Confluent.Kafka.Producer cuando KafkaProducerSettings.DisableHealthChecks es false.
  • Agrega la comprobación de estado Aspire.Confluent.Kafka.Consumer cuando KafkaConsumerSettings.DisableHealthChecks es false.
  • Se integra con el punto de conexión HTTP de /health, que especifica que todas las comprobaciones de estado registradas deben pasar para que la aplicación se considere lista para aceptar el tráfico.

Observabilidad y telemetría

.NET .NET Aspire integraciones configuran automáticamente las configuraciones de registro, seguimiento y métricas, que a veces se conocen como los pilares de la observabilidad. Para obtener más información sobre la observabilidad de integración y la telemetría, consulte información general sobre las integraciones de .NET.NET Aspire. En función del servicio de respaldo, algunas integraciones solo pueden admitir algunas de estas características. Por ejemplo, algunas integraciones admiten el registro y el seguimiento, pero no las métricas. Las funciones de telemetría también se pueden deshabilitar mediante las técnicas presentadas en la sección de Configuración .

Registro

La integración de .NET AspireApache Kafka usa las siguientes categorías de registro:

  • Aspire.Confluent.Kafka

Rastreo

La integración de .NET AspireApache Kafka no emite trazas distribuidas.

Métricas

La integración de .NET AspireApache Kafka emite las métricas siguientes mediante OpenTelemetry:

  • messaging.kafka.network.tx
  • messaging.kafka.network.transmitted
  • messaging.kafka.network.rx
  • messaging.kafka.network.received
  • messaging.publish.messages
  • messaging.kafka.message.transmitted
  • messaging.receive.messages
  • messaging.kafka.message.received

Consulte también