.NET Aspire Apache Kafka 통합
Apache Kafka 오픈 소스 분산 이벤트 스트리밍 플랫폼입니다. 실시간 데이터 파이프라인 및 스트리밍 애플리케이션을 빌드하는 데 유용합니다.
.NET Aspire
Apache Kafka 통합을 사용하면 기존 Kafka 인스턴스에 연결하거나 docker.io/confluentinc/confluent-local
사용하여 새 인스턴스를 만들 수 있습니다.
호스팅 통합
Apache Kafka 호스팅 통합은 Kafka server을 KafkaServerResource 유형으로 모델링합니다. 이 형식에 액세스하려면 📦Aspire.Hosting.Kafka NuGet 패키지를 앱 호스트 프로젝트에 설치한 후, 빌더와 함께 추가합니다.
dotnet add package Aspire.Hosting.Kafka
자세한 내용은 dotnet add package 또는 manage package dependencies in .NET applications.
Kafka server 리소스 추가
앱 호스트 프로젝트에서 AddKafka 인스턴스에서 builder
호출하여 Kafka server 리소스를 추가합니다.
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
.NET
.NET Aspire
docker.io/confluentinc/confluent-local
이미지와 함께 이전 예제와 같이 앱 호스트에 컨테이너 이미지를 추가하는 경우 로컬 컴퓨터에 새 Kafka server 인스턴스를 만듭니다. Kafka server(kafka
변수)에 대한 참조가 ExampleProject
에 추가됩니다. Kafka server 리소스에는 기본 포트가 포함됩니다.
WithReference 메서드는 ExampleProject
라고 이름이 붙여진 "kafka"
의 연결을 구성합니다. 자세한 내용은 컨테이너 리소스 수명 주기참조하세요.
조언
기존 Kafka server에 연결하려는 경우 대신 AddConnectionString을 호출하세요. 자세한 내용은 기존 리소스 참조를 참조하세요.
Kafka UI 추가
Kafka 리소스에 server 추가하려면 WithKafkaUI 메서드를 호출합니다.
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI();
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Kafka UI는 Apache Kafka 클러스터를 모니터링하고 관리하는 무료 오픈 소스 웹 UI입니다.
.NET
.NET Aspire Kafka UI를 실행하는 앱 호스트에 다른 컨테이너 이미지 docker.io/provectuslabs/kafka-ui
추가합니다.
Kafka UI 호스트 포트 변경
Kafka UI 호스트 포트를 변경하려면 WithHostPort 메서드에 대한 호출을 연결합니다.
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
Kafka UI는 앞의 예제에서 http://localhost:9100
에서 접근할 수 있습니다.
데이터 볼륨을 사용하여 Kafka server 리소스 추가
Kafka server 리소스에 데이터 볼륨을 추가하려면 Kafka WithDataVolume 리소스에서 server 메서드를 호출합니다.
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataVolume(isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
데이터 볼륨은 Kafka server 데이터를 컨테이너의 수명 주기 외부에 유지하는 데 사용됩니다. 데이터 볼륨은 Kafka /var/lib/kafka/data
컨테이너의 server 경로에 탑재되고 name
매개 변수가 제공되지 않으면 이름이 임의로 생성됩니다. 데이터 볼륨에 대한 자세한 내용 및
데이터 바인딩 마운트를 사용하여 Kafka server 리소스 추가
Kafka server 리소스에 데이터 바인딩 탑재를 추가하려면 WithDataBindMount 메서드를 호출합니다.
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka")
.WithDataBindMount(
source: @"C:\Kafka\Data",
isReadOnly: false);
builder.AddProject<Projects.ExampleProject>()
.WithReference(kafka);
// After adding all resources, run the app...
중요하다
데이터 바인딩 탑재볼륨비해 기능이 제한되므로 성능, 이식성 및 보안이 향상되어 프로덕션 환경에 더 적합합니다. 그러나 바인딩 탑재를 사용하면 호스트 시스템의 파일에 직접 액세스하고 수정할 수 있으므로 실시간 변경이 필요한 개발 및 테스트에 적합합니다.
데이터 바인드 마운트는 호스트 컴퓨터의 파일 시스템을 사용하여 컨테이너가 재시작하더라도 Kafka server 데이터를 지속합니다. 데이터 바인딩 마운트는 호스트 머신의 Windows C:\Kafka\Data
경로(또는 /Kafka/Data
경로의 Unix)에 있는 Kafka server 컨테이너에 마운트됩니다. 데이터 바인딩 마운트에 대한 자세한 내용은 Docker의 문서에서바인딩 마운트를 참고하십시오.
통합 상태 검사의 호스팅
Kafka 호스팅 통합은 Kafka server 리소스에 대한 상태 검사를 자동으로 추가합니다. 상태 검사는 지정된 연결 이름을 가진 Kafka 생산자가 Kafka server에 토픽을 연결하고 저장할 수 있는지를 확인합니다.
호스팅 통합은 📦 AspNetCore.HealthChecks.Kafka NuGet 패키지에 의존합니다.
Client 통합
dotnet add package Aspire.Confluent.Kafka
Kafka 생산자 추가
Program.cs사용 중인 프로젝트의 client 파일에서 AddKafkaProducer 확장 메서드를 호출하여, 종속성 주입 컨테이너를 통해 사용할 IProducer<TKey, TValue>
를 등록합니다. 이 메서드는 broker에 보낼 키 형식과 메시지 형식에 해당하는 두 개의 제네릭 매개 변수를 사용합니다. 이러한 제네릭 매개 변수는 AddKafkaProducer
ProducerBuilder<TKey, TValue>
인스턴스를 만드는 데 사용됩니다. 이 메서드는 연결 이름 매개 변수도 사용합니다.
builder.AddKafkaProducer<string, string>("messaging");
그런 다음 종속성 주입을 사용하여 IProducer<TKey, TValue>
인스턴스를 검색할 수 있습니다. 예를 들어, IHostedService
에서 프로듀서를 검색하려면 다음 단계를 수행합니다.
internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
작업자에 대한 자세한 내용은 Worker 서비스와 .NET를 참조하세요.
Kafka 소비자 추가
종속성 주입 컨테이너에서 IConsumer<TKey, TValue>
을(를) 사용하도록 등록하려면, AddKafkaConsumer를 활용하는 프로젝트의 Program.cs 파일에서 client 확장 메서드를 호출하십시오. 메서드는 키의 형식과 broker에서 받을 메시지의 형식에 해당하는 두 개의 제네릭 매개 변수를 사용합니다. 이러한 제네릭 매개 변수는 AddKafkaConsumer
ConsumerBuilder<TKey, TValue>
인스턴스를 만드는 데 사용됩니다. 이 메서드는 연결 이름 매개 변수도 사용합니다.
builder.AddKafkaConsumer<string, string>("messaging");
그런 다음 종속성 주입을 사용하여 IConsumer<TKey, TValue>
인스턴스를 검색할 수 있습니다. 예를 들어 IHostedService
에서 소비자를 찾으려면 다음을 수행합니다.
internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
// Use consumer...
}
주요 Kafka 생산자 또는 소비자 추가
여러 생산자 또는 소비자 인스턴스를 서로 다른 연결 이름으로 등록하려는 경우가 있을 수 있습니다. 주요 Kafka 생산자 또는 소비자를 등록하려면 적절한 API를 호출합니다.
- AddKeyedKafkaProducer: 키 지정 Kafka 생산자를 등록합니다.
- AddKeyedKafkaConsumer: 키가 지정된 Kafka 컨슈머를 등록합니다.
키드 서비스에 대한 자세한 내용은 .NET 종속성 주입: 키드 서비스를 참조하세요.
구성
.NET Aspire Apache Kafka 통합은 프로젝트의 요구 사항 및 규칙에 따라 연결을 구성하는 여러 옵션을 제공합니다.
연결 문자열 사용
ConnectionStrings
구성 섹션에서 연결 문자열을 사용하는 경우 builder.AddKafkaProducer()
또는 builder.AddKafkaProducer()
호출할 때 연결 문자열의 이름을 제공할 수 있습니다.
builder.AddKafkaProducer<string, string>("kafka-producer");
그런 다음 ConnectionStrings
구성 섹션에서 연결 문자열을 검색합니다.
{
"ConnectionStrings": {
"kafka-producer": "broker:9092"
}
}
연결 문자열 값은 생성된 BootstrapServers
또는 IProducer<TKey, TValue>
인스턴스의 IConsumer<TKey, TValue>
속성으로 설정됩니다. 자세한 내용은 BootstrapServers참조하세요.
구성 공급자 사용
.NET Aspire
Apache Kafka 통합은 Microsoft.Extensions.Configuration지원합니다.
KafkaProducerSettings 및 KafkaConsumerSettings 키를 각각 사용하여 구성에서 Aspire:Confluent:Kafka:Producer
또는 Aspire.Confluent:Kafka:Consumer
불러옵니다. 다음 코드 조각은 일부 옵션을 구성하는 appsettings.json 파일의 예입니다.
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
Config
및 Aspire:Confluent:Kafka:Producer
구성 섹션의 Aspire.Confluent:Kafka:Consumer
속성은 각각 ProducerConfig
및 ConsumerConfig
인스턴스에 바인딩됩니다.
Confluent.Kafka.Consumer<TKey, TValue>
브로커가 메시지 오프셋을 추적할 수 있도록 ClientId
속성을 설정해야 합니다.
전체 Kafka client 통합 JSON 스키마는 Aspire참조하세요. Confluent.Kafka/ConfigurationSchema.json.
인라인 대리자를 사용하세요
다양한 옵션을 구성하는 데 사용할 수 있는 몇 가지 인라인 대리자가 있습니다.
KafkaProducerSettings
및 KafkaConsumerSettings
을 구성하십시오.
Action<KafkaProducerSettings> configureSettings
델리게이트를 전달하여 일부 또는 모든 옵션을 인라인으로 설정하고, 예를 들어 코드에서 상태 검사를 비활성화할 수 있습니다.
builder.AddKafkaProducer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
코드에서 소비자를 인라인으로 구성할 수 있습니다.
builder.AddKafkaConsumer<string, string>(
"messaging",
static settings => settings.DisableHealthChecks = true);
ProducerBuilder<TKey, TValue>
구성 및 ConsumerBuilder<TKey, TValue>
구성
Confluent.Kafka
빌더를 구성하려면 Action<ProducerBuilder<TKey, TValue>>
(또는 Action<ConsumerBuilder<TKey, TValue>>
)를 전달합니다.
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static producerBuilder =>
{
var messageSerializer = new MyMessageSerializer();
producerBuilder.SetValueSerializer(messageSerializer);
})
생산자와 소비자를 등록할 때 DI 컨테이너에 등록된 서비스에 액세스해야 하는 경우 각각 Action<IServiceProvider, ProducerBuilder<TKey, TValue>>
또는 Action<IServiceProvider, ConsumerBuilder<TKey, TValue>>
전달할 수 있습니다.
- AddKafkaProducer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ProducerBuilder<TKey,TValue>>)
- AddKafkaConsumer<TKey,TValue>(IHostApplicationBuilder, String, Action<IServiceProvider,ConsumerBuilder<TKey,TValue>>)
다음 생산자 등록 예제를 고려합니다.
builder.AddKafkaProducer<string, MyMessage>(
"messaging",
static (serviceProvider, producerBuilder) =>
{
var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
producerBuilder.SetValueSerializer(messageSerializer);
})
자세한 내용은 ProducerBuilder<TKey, TValue>
및 ConsumerBuilder<TKey, TValue>
API 설명서를 참조하세요.
Client 통합 상태 검사
기본적으로 .NET.NET Aspire 통합은 모든 서비스에 대해 상태 검사를 활성화합니다. 자세한 내용은 .NET.NET Aspire 통합 개요참조하세요.
.NET Aspire Apache Kafka 통합은 다음과 같은 상태 검사 시나리오를 처리합니다.
-
Aspire.Confluent.Kafka.Producer
상태 검사를 KafkaProducerSettings.DisableHealthChecksfalse
때 추가합니다. -
Aspire.Confluent.Kafka.Consumer
상태 검사를 KafkaConsumerSettings.DisableHealthChecksfalse
때 추가합니다. -
/health
HTTP 엔드포인트와 통합되어, 이 엔드포인트는 앱이 트래픽을 수락할 준비가 되었다고 간주되려면 등록된 모든 상태 검사가 통과해야 한다고 지정합니다.
관찰 가능성 및 원격 분석
로깅
.NET Aspire Apache Kafka 통합은 다음 로그 범주를 사용합니다.
Aspire.Confluent.Kafka
추적
.NET Aspire Apache Kafka 통합은 분산 추적을 내보내지 않습니다.
지표
.NET Aspire Apache Kafka 통합은 OpenTelemetry사용하여 다음 메트릭을 내보낸다.
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received
참고 항목
.NET Aspire