integración de .NET AspireApache Kafka
Incluye:integración de hospedaje y Client
Apache Kafka es una plataforma de streaming de eventos distribuidos de código abierto. Resulta útil para crear canalizaciones de datos en tiempo real y aplicaciones de streaming. La integración de .NET AspireApache Kafka permite conectarse a instancias de Kafka existentes o crear nuevas instancias desde .NET con la imagen de contenedor de docker.io/confluentinc/confluent-local
.
Integración de hospedaje
La integración de modelos de Apache Kafka representa a un server de Kafka como el tipo KafkaServerResource. Para acceder a este tipo, instale el 📦Aspire.Hosting.Kafka paquete NuGet en el proyecto host de la aplicación y, a continuación, agréguelo con el generador.
dotnet add package Aspire.Hosting.Kafka
Para obtener más información, consulte dotnet add package o Administrar las dependencias de paquetes en las aplicaciones .NET.
Añadir recurso server de Kafka
En el proyecto host de la aplicación, llame a AddKafka en la instancia de builder
para agregar un recurso de server de Kafka:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Cuando .NET.NET Aspire agrega una imagen de contenedor al host de la aplicación, como se muestra en el ejemplo anterior con la imagen de docker.io/confluentinc/confluent-local
, crea una nueva instancia de Kafka server en el equipo local. Al serverse agrega una referencia a la kafka
de Kafka (la variable ExampleProject
). El recurso de server kafka incluye puertos predeterminados
El método WithReference configura una conexión en el ExampleProject
denominado "kafka"
. Para obtener más información, consulte ciclo de vida de los recursos de un contenedor.
Propina
Si prefiere conectarse a un serverde Kafka existente, llame a AddConnectionString en su lugar. Para obtener más información, vea Hacer referencia a los recursos existentes.
Adición de la interfaz de usuario de Kafka
Para agregar el de interfaz de usuario de Kafka
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
La interfaz de usuario de Kafka es una interfaz de usuario web gratuita de código abierto para supervisar y administrar clústeres de Apache Kafka.
.NET
.NET Aspire agrega otra imagen de contenedor docker.io/provectuslabs/kafka-ui
al host de la aplicación que ejecuta la interfaz de usuario de Kafka.
Cambio del puerto de host de la interfaz de usuario de Kafka
Para cambiar el puerto de host de la interfaz de usuario de Kafka, encadene una llamada al método WithHostPort:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
La interfaz de usuario de Kafka es accesible en http://localhost:9100
en el ejemplo anterior.
Agregar el recurso server de Kafka con volumen de datos
Para agregar un volumen de datos al recurso de server de Kafka, llame al método WithDataVolume en el recurso server de Kafka:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
El volumen de datos se usa para conservar los datos de Kafka server fuera del ciclo de vida de su contenedor. El volumen de datos se monta en la ruta /var/lib/kafka/data
del contenedor server de Kafka, y cuando no se proporciona un parámetro name
, el nombre se genera aleatoriamente. Para obtener más información sobre los volúmenes de datos y los detalles sobre por qué se prefieren a montajes de enlace, consulte Docker documentación: Volúmenes.
Añadir el recurso Kafka server con montaje enlazado de datos
Para agregar un punto de montaje ligado a datos al recurso server de Kafka, llame al método WithDataBindMount.
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Importante
Los montajes de datos de enlace tienen una funcionalidad limitada en comparación con los volúmenes , que ofrecen un mejor rendimiento, portabilidad y seguridad, lo que los hace más adecuados para entornos de producción. Sin embargo, los montajes de enlaces permiten el acceso directo y la modificación de archivos en el sistema anfitrión, lo cual es ideal para el desarrollo y las pruebas donde se necesitan cambios en tiempo real.
Las uniones de datos dependen del sistema de archivos de la máquina anfitriona para persistir los datos de Kafka server durante los reinicios del contenedor. El montaje del enlace de datos está montado en el C:\Kafka\Data
en Windows (o /Kafka/Data
en Unix) en la ruta de acceso en el equipo host en el contenedor de Kafka server. Para obtener más información sobre los montajes de enlace de datos, consulte Docker documento: Montajes de enlace de datos.
Hospedaje de comprobaciones de estado de integración
La integración de hospedaje de Kafka agrega automáticamente una comprobación de estado para el recurso de server de Kafka. La comprobación de estado verifica que un productor de Kafka con el nombre de conexión especificado puede conectarse y almacenar un tópico en el clúster serverde Kafka.
La integración de hospedaje se basa en el paquete NuGet 📦 AspNetCore.HealthChecks.Kafka.
integración de Client
Para empezar a trabajar con la integración de .NET AspireApache Kafka, instale el 📦Aspire. Confluent.Kafka paquete NuGet en el proyecto client-consuming, es decir, el proyecto de la aplicación que usa el Apache Kafkaclient.
dotnet add package Aspire.Confluent.Kafka
Agregar productor de Kafka
En el archivo Program.cs del proyecto que consume client, llame al método de extensión AddKafkaProducer para registrar un IProducer<TKey, TValue>
para su uso a través del contenedor de inyección de dependencias. El método toma dos parámetros genéricos correspondientes al tipo de la clave y el tipo del mensaje que se va a enviar al agente. Estos parámetros genéricos se usan en AddKafkaProducer
para crear una instancia de ProducerBuilder<TKey, TValue>
. Este método también toma el parámetro de nombre de conexión.
builder.AddKafkaProducer<string, string>("messaging");
A continuación, puede obtener la instancia IProducer<TKey, TValue>
mediante la inserción de dependencias. Por ejemplo, para recuperar el productor de un IHostedService
:
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
Para obtener más información sobre los trabajadores, consulte Servicios de trabajo en .NET.
Agregar un consumidor de Kafka
Para registrar un IConsumer<TKey, TValue>
para su uso a través del contenedor de inyección de dependencias, llame al método de extensión AddKafkaConsumer en el archivo Program.cs del proyecto que consume client. El método toma dos parámetros genéricos correspondientes al tipo de la clave y el tipo del mensaje que se va a recibir del agente. Estos parámetros genéricos se usan en AddKafkaConsumer
para crear una instancia de ConsumerBuilder<TKey, TValue>
. Este método también toma el parámetro de nombre de conexión.
builder.AddKafkaConsumer<string, string>("messaging");
A continuación, puede obtener la instancia IConsumer<TKey, TValue>
mediante la inserción de dependencias. Por ejemplo, para recuperar al consumidor de un IHostedService
:
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
Adición de productores o consumidores clave de Kafka
Puede haber situaciones en las que quiera registrar varias instancias de productor o consumidor con nombres de conexión diferentes. Para registrar productores o consumidores clave de Kafka, llame a la API adecuada.
- AddKeyedKafkaProducer: registra un productor de Kafka con una clave.
- AddKeyedKafkaConsumer: registra un consumidor de Kafka con clave.
Para obtener más información sobre los servicios con claves, consulte .NET inserción de dependencias: Servicios con claves.
Configuración
La integración de .NET AspireApache Kafka proporciona varias opciones para configurar la conexión en función de los requisitos y convenciones del proyecto.
Uso de una cadena de conexión
Al usar una cadena de conexión de la sección de configuración de ConnectionStrings
, puede proporcionar el nombre de la cadena de conexión al llamar a builder.AddKafkaProducer()
o builder.AddKafkaProducer()
:
builder.AddKafkaProducer<string, string>("kafka-producer");
A continuación, la cadena de conexión se obtiene de la sección de configuración de ConnectionStrings
:
{
"ConnectionStrings": {
"kafka-producer": "broker:9092"
}
}
El valor de la cadena de conexión se establece en la propiedad BootstrapServers
de la instancia de IProducer<TKey, TValue>
o IConsumer<TKey, TValue>
generada. Para obtener más información, vea BootstrapServers.
Uso de proveedores de configuración
La integración de .NET AspireApache Kafka admite Microsoft.Extensions.Configuration. Carga el KafkaProducerSettings o KafkaConsumerSettings desde la configuración usando respectivamente las claves Aspire:Confluent:Kafka:Producer
y Aspire.Confluent:Kafka:Consumer
. El fragmento de código siguiente es un ejemplo de un archivo appsettings.json que configura algunas de las opciones:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
Las propiedades Config
de las secciones de configuración de Aspire:Confluent:Kafka:Producer
y Aspire.Confluent:Kafka:Consumer
, respectivamente, se enlazan a instancias de ProducerConfig
y ConsumerConfig
.
Confluent.Kafka.Consumer<TKey, TValue>
requiere que se establezca la propiedad ClientId
para permitir que el agente realice un seguimiento de los desplazamientos de mensajes consumidos.
Para el esquema completo de integración de Kafka clientJSON, consulte Aspire.Confluent.Kafka/ConfigurationSchema.json.
Usa delegados en línea
Hay varios delegados en línea disponibles para configurar diferentes opciones.
Configuración deKafkaProducerSettings
y KafkaConsumerSettings
Puede pasar el delegado Action<KafkaProducerSettings> configureSettings
para configurar algunas o todas las opciones en línea, por ejemplo, para deshabilitar las verificaciones de salud desde el código.
builder.AddKafkaProducer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Puede configurar un consumidor en línea directamente en el código:
builder.AddKafkaConsumer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Configuración de ProducerBuilder<TKey, TValue>
y ConsumerBuilder<TKey, TValue>
Para configurar los constructores de Confluent.Kafka
, pase un Action<ProducerBuilder<TKey, TValue>>
(o un Action<ConsumerBuilder<TKey, TValue>>
):
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
Al registrar productores y consumidores, si necesita acceder a un servicio registrado en el contenedor DI, puede pasar un Action<IServiceProvider, ProducerBuilder<TKey, TValue>>
o un Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>
, respectivamente.
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
Considere el siguiente ejemplo de registro de productor:
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
Para más información, consulte la documentación de la API de ProducerBuilder<TKey, TValue>
y la API de ConsumerBuilder<TKey, TValue>
.
verificaciones de estado de integración Client
De forma predeterminada, las integraciones de .NET.NET Aspire habilitan las comprobaciones de estado de para todos los servicios. Para obtener más información, consulte .NET.NET Aspire integrations overview.
La integración de .NET AspireApache Kafka maneja los siguientes escenarios de verificación de salud:
- Agrega la comprobación de estado
Aspire.Confluent.Kafka.Producer
cuando KafkaProducerSettings.DisableHealthChecks esfalse
. - Agrega la comprobación de estado
Aspire.Confluent.Kafka.Consumer
cuando KafkaConsumerSettings.DisableHealthChecks esfalse
. - Se integra con el punto de conexión HTTP de
/health
, que especifica que todas las comprobaciones de estado registradas deben pasar para que la aplicación se considere lista para aceptar el tráfico.
Observabilidad y telemetría
.NET .NET Aspire integraciones configuran automáticamente las configuraciones de registro, seguimiento y métricas, que a veces se conocen como los pilares de la observabilidad. Para obtener más información sobre la observabilidad de integración y la telemetría, consulte información general sobre las integraciones de .NET.NET Aspire. En función del servicio de respaldo, algunas integraciones solo pueden admitir algunas de estas características. Por ejemplo, algunas integraciones admiten el registro y el seguimiento, pero no las métricas. Las funciones de telemetría también se pueden deshabilitar mediante las técnicas presentadas en la sección de Configuración .
Registro
La integración de .NET AspireApache Kafka usa las siguientes categorías de registro:
Aspire.Confluent.Kafka
Rastreo
La integración de .NET AspireApache Kafka no emite trazas distribuidas.
Métricas
La integración de .NET AspireApache Kafka emite las métricas siguientes mediante OpenTelemetry:
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received
Consulte también
- Apache Kafka
- Confluent
- Documentación de Confluent Kafka .NETclient
- integraciones .NET.NET Aspire
- del repositorio de