integrazione di .NET AspireApache Kafka
Include:integrazione dell'hosting e Client integrazione
Apache Kafka è una piattaforma di streaming di eventi distribuiti open source. È utile per la creazione di pipeline di dati in tempo reale e applicazioni di streaming. L'integrazione .NET AspireApache Kafka consente di connettersi alle istanze Kafka esistenti o di creare nuove istanze da .NET con l'immagine del contenitore docker.io/confluentinc/confluent-local
.
Integrazione dell'hosting
Il Apache Kafka che ospita l'integrazione modella un server Kafka come tipo di KafkaServerResource. Per accedere a questo tipo, installare il pacchetto NuGet 📦Aspire.Hosting.Kafka nel progetto host dell'app , quindi aggiungerlo con il builder.
- .NET interfaccia della riga di comando
- PackageReference
dotnet add package Aspire.Hosting.Kafka
Per altre informazioni, vedere dotnet add package o Manage package dependencies in .NET applications.
Aggiungere una risorsa di server Kafka
Chiama AddKafka nell'istanza di builder
nel progetto host dell'app, per aggiungere una risorsa Kafka server:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Quando .NET.NET Aspire aggiunge un'immagine contenitore all'host dell'app, come illustrato nell'esempio precedente con l'immagine docker.io/confluentinc/confluent-local
, crea una nuova istanza di Kafka server nel computer locale. Un riferimento al tuo Kafka server (la variabile kafka
) è stato aggiunto in ExampleProject
. La risorsa Kafka server include le porte predefinite.
Il metodo WithReference configura una connessione nel ExampleProject
denominato "kafka"
. Per ulteriori informazioni, consultare ciclo di vita delle risorse container.
Consiglio
Se preferisci connetterti a un Kafka esistente server, chiama invece AddConnectionString. Per altre informazioni, vedere Fare riferimento alle risorse esistenti.
Aggiungere l'interfaccia utente di Kafka
Per aggiungere la dell'interfaccia utente kafka
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
L'interfaccia utente kafka è un'interfaccia utente Web gratuita e open source per monitorare e gestire i cluster Apache Kafka.
.NET
.NET Aspire aggiunge un'altra immagine del contenitore docker.io/provectuslabs/kafka-ui
all'host dell'app che esegue l'interfaccia utente Kafka.
Modificare la porta host dell'interfaccia utente Kafka
Per modificare la porta host dell'interfaccia utente Kafka, concatenare una chiamata al metodo WithHostPort:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
L'interfaccia utente di Kafka è accessibile in http://localhost:9100
nell'esempio precedente.
Aggiungi la risorsa Kafka server con volume di dati
Per aggiungere un volume di dati alla risorsa server Kafka, chiamare il metodo WithDataVolume nella risorsa server Kafka:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Il volume di dati viene usato per mantenere i dati Kafka server fuori dal ciclo di vita del proprio contenitore. Il volume di dati viene montato nel percorso /var/lib/kafka/data
all'interno del contenitore server di Kafka e, quando non viene specificato il parametro name
, il nome viene generato in modo casuale. Per ulteriori informazioni sui volumi di dati e sui motivi per cui sono preferiti rispetto ai bind mount , consultare la documentazione Docker: Volumi.
Aggiungere una risorsa Kafka server con bind mount dei dati
Per aggiungere un montaggio di associazione dati alla risorsa di server Kafka, chiamare il metodo WithDataBindMount:
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Importante
I di binding dei dati hanno funzionalità limitate rispetto ai volumi , che offrono prestazioni, portabilità e sicurezza migliori, rendendole più adatte per gli ambienti di produzione. Tuttavia, i bind mounts consentono l'accesso e la modifica diretti dei file nel sistema host, ideale per lo sviluppo e il test quando sono necessarie modifiche in tempo reale.
I montaggi di bind dei dati si basano sul file system della macchina host per garantire la persistenza dei dati di Kafka server tra i riavvii del contenitore. Il mount di dati è montato nel percorso C:\Kafka\Data
su Windows (o /Kafka/Data
su Unix) sul computer host nel contenitore Kafka server. Per altre informazioni sui montaggi di associazione dati, vedere Docker docs: Bind mounts.
Hosting dei controlli di integrità dell'integrazione
L'integrazione dell'hosting Kafka aggiunge automaticamente un controllo di integrità per la risorsa Kafka server. Il controllo di integrità verifica che un producer Kafka con il nome di connessione specificato sia in grado di connettersi e memorizzare un topic su Kafka server.
L'integrazione dell'hosting si basa sul pacchetto NuGet 📦 AspNetCore.HealthChecks.Kafka.
Client integrazione
Per iniziare a usare l'integrazione di .NET AspireApache Kafka, installare il pacchetto NuGet 📦Aspire.Confluent.Kafka nel progetto consumer client, cioè il progetto per l'applicazione che usa il Apache Kafkaclient.
- .NET interfaccia della riga di comando
- PackageReference
dotnet add package Aspire.Confluent.Kafka
Aggiungere il producer Kafka
Nel file Program.cs del tuo progetto clienta consumo, chiama il metodo di estensione AddKafkaProducer per registrare un IProducer<TKey, TValue>
da utilizzare tramite il contenitore di iniezione delle dipendenze. Il metodo accetta due parametri generici corrispondenti al tipo della chiave e al tipo del messaggio da inviare al broker. Questi parametri generici vengono usati da AddKafkaProducer
per creare un'istanza di ProducerBuilder<TKey, TValue>
. Questo metodo accetta anche il parametro del nome della connessione.
builder.AddKafkaProducer<string, string>("messaging");
È quindi possibile recuperare l'istanza di IProducer<TKey, TValue>
usando l'iniezione delle dipendenze. Ad esempio, per recuperare il producer da un IHostedService
:
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
Per ulteriori informazioni sui lavoratori, vedere Servizi per i lavoratori in .NET.
Aggiungere un consumer Kafka
Per registrare un IConsumer<TKey, TValue>
da utilizzare tramite il contenitore per l'inserimento delle dipendenze, è necessario chiamare il metodo di estensione AddKafkaConsumer nel file Program.cs del progetto che utilizza client. Il metodo accetta due parametri generici corrispondenti al tipo della chiave e al tipo del messaggio da ricevere dal broker. Questi parametri generici vengono usati da AddKafkaConsumer
per creare un'istanza di ConsumerBuilder<TKey, TValue>
. Questo metodo accetta anche il parametro del nome della connessione.
builder.AddKafkaConsumer<string, string>("messaging");
È quindi possibile recuperare l'istanza di IConsumer<TKey, TValue>
usando l'iniezione delle dipendenze. Ad esempio, per recuperare l'utente da un IHostedService
:
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
Aggiungere i producer o consumer Kafka con chiave
In alcuni casi potrebbe essere necessario registrare più istanze di producer o consumer con nomi di connessione diversi. Per registrare i producer o consumer Kafka con chiavi, chiamare l'API appropriata.
- AddKeyedKafkaProducer: registra un producer Kafka con chiave.
- AddKeyedKafkaConsumer: registra un consumer Kafka con chiave.
Per altre informazioni sui servizi con chiave, vedere .NET iniezione delle dipendenze: Servizi con chiave.
Configurazione
L'integrazione .NET AspireApache Kafka offre più opzioni per configurare la connessione in base ai requisiti e alle convenzioni del progetto.
Usare una stringa di connessione
Quando si usa una stringa di connessione dalla sezione di configurazione ConnectionStrings
, è possibile specificare il nome della stringa di connessione quando si chiama builder.AddKafkaProducer()
o builder.AddKafkaProducer()
:
builder.AddKafkaProducer<string, string>("kafka-producer");
La stringa di connessione viene quindi recuperata dalla sezione di configurazione ConnectionStrings
:
{
"ConnectionStrings": {
"kafka-producer": "broker:9092"
}
}
Il valore della stringa di connessione viene impostato sulla proprietà BootstrapServers
dell'istanza di IProducer<TKey, TValue>
o IConsumer<TKey, TValue>
generata. Per altre informazioni, vedere BootstrapServers.
Utilizzare i provider di configurazione
L'integrazione .NET AspireApache Kafka supporta Microsoft.Extensions.Configuration. Carica il KafkaProducerSettings o il KafkaConsumerSettings dalla configurazione usando rispettivamente le chiavi Aspire:Confluent:Kafka:Producer
e Aspire.Confluent:Kafka:Consumer
. Il frammento di codice seguente è un esempio di un file appsettings.json che configura alcune delle opzioni:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
Le proprietà Config
di entrambe le sezioni di configurazione Aspire:Confluent:Kafka:Producer
e Aspire.Confluent:Kafka:Consumer
si associano rispettivamente alle istanze di ProducerConfig
e ConsumerConfig
.
Confluent.Kafka.Consumer<TKey, TValue>
richiede che la proprietà ClientId
sia impostata per consentire al broker di tenere traccia degli offset dei messaggi utilizzati.
Per lo schema completo di integrazione Kafka clientJSON, vedere Aspire.Confluent.Kafka/ConfigurationSchema.json.
Usare delegati inline
Sono disponibili diversi delegati inline per configurare varie opzioni.
ConfigurareKafkaProducerSettings
e KafkaConsumerSettings
È possibile passare il delegato Action<KafkaProducerSettings> configureSettings
per configurare alcune o tutte le opzioni inline, ad esempio per disabilitare i controlli di integrità dal codice:
builder.AddKafkaProducer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
È possibile configurare un consumer in-line dal codice:
builder.AddKafkaConsumer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
Configurare ProducerBuilder<TKey, TValue>
e ConsumerBuilder<TKey, TValue>
Per configurare i generatori di Confluent.Kafka
, passare un Action<ProducerBuilder<TKey, TValue>>
(o Action<ConsumerBuilder<TKey, TValue>>
):
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
Quando si registrano i producer e i consumer, se è necessario accedere a un servizio registrato nel contenitore DI, è possibile passare rispettivamente un Action<IServiceProvider, ProducerBuilder<TKey, TValue>>
o Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>
:
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
Ecco un esempio di registrazione del seguente produttore:
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
Per altre informazioni, vedere ProducerBuilder<TKey, TValue>
e documentazione dell'API ConsumerBuilder<TKey, TValue>
.
Client verifiche di integrità dell'integrazione
Per impostazione predefinita, le integrazioni di .NET.NET Aspire abilitano verifiche di integrità per tutti i servizi. Per altre informazioni, vedere panoramica delle integrazioni .NET.NET Aspire.
L'integrazione .NET AspireApache Kafka gestisce i seguenti scenari di verifica dello stato di salute:
- Aggiunge il controllo integrità
Aspire.Confluent.Kafka.Producer
quando KafkaProducerSettings.DisableHealthChecks èfalse
. - Aggiunge il controllo integrità
Aspire.Confluent.Kafka.Consumer
quando KafkaConsumerSettings.DisableHealthChecks èfalse
. - Si integra con l'endpoint HTTP
/health
, il quale specifica che tutti i controlli di integrità registrati devono essere superati con successo affinché l'app sia considerata pronta per accettare il traffico.
Osservabilità e telemetria
.NET
.NET Aspire le integrazioni configurano automaticamente registrazione, tracciamento e metriche, talvolta noti come i pilastri dell'osservabilità. Per altre informazioni sull'osservabilità e la telemetria dell'integrazione, vedere panoramica delle integrazioni .NET.NET Aspire. A seconda del servizio di backup, alcune integrazioni possono supportare solo alcune di queste funzionalità. Ad esempio, alcune integrazioni supportano la registrazione e la traccia, ma non le metriche. Le funzionalità di telemetria possono essere disabilitate anche usando le tecniche presentate nella sezione Configurazione
Registrazione
L'integrazione .NET AspireApache Kafka usa le categorie di log seguenti:
Aspire.Confluent.Kafka
Tracciatura
L'integrazione .NET AspireApache Kafka non genera tracce distribuite.
Metriche
L'integrazione .NET AspireApache Kafka genera le metriche seguenti usando OpenTelemetry:
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received