다음을 통해 공유


.NET Aspire Apache Kafka 통합

포함:호스팅 통합Client 통합

Apache Kafka 오픈 소스 분산 이벤트 스트리밍 플랫폼입니다. 실시간 데이터 파이프라인 및 스트리밍 애플리케이션을 빌드하는 데 유용합니다. .NET Aspire Apache Kafka 통합을 사용하면 기존 Kafka 인스턴스에 연결하거나 docker.io/confluentinc/confluent-local사용하여 새 인스턴스를 만들 수 있습니다.

호스팅 통합

Apache Kafka 호스팅 통합은 Kafka server을 KafkaServerResource 유형으로 모델링합니다. 이 형식에 액세스하려면 📦Aspire.Hosting.Kafka NuGet 패키지를 앱 호스트 프로젝트에 설치한 후, 빌더와 함께 추가합니다.

dotnet add package Aspire.Hosting.Kafka

자세한 내용은 dotnet add package 또는 manage package dependencies in .NET applications.

Kafka server 리소스 추가

앱 호스트 프로젝트에서 AddKafka 인스턴스에서 builder 호출하여 Kafka server 리소스를 추가합니다.

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka");

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

.NET .NET Aspire docker.io/confluentinc/confluent-local 이미지와 함께 이전 예제와 같이 앱 호스트에 컨테이너 이미지를 추가하는 경우 로컬 컴퓨터에 새 Kafka server 인스턴스를 만듭니다. Kafka server(kafka 변수)에 대한 참조가 ExampleProject에 추가됩니다. Kafka server 리소스에는 기본 포트가 포함됩니다.

WithReference 메서드는 ExampleProject라고 이름이 붙여진 "kafka"의 연결을 구성합니다. 자세한 내용은 컨테이너 리소스 수명 주기참조하세요.

조언

기존 Kafka server에 연결하려는 경우 대신 AddConnectionString을 호출하세요. 자세한 내용은 기존 리소스 참조를 참조하세요.

Kafka UI 추가

Kafka 리소스에 server 추가하려면 WithKafkaUI 메서드를 호출합니다.

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI();

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Kafka UI는 Apache Kafka 클러스터를 모니터링하고 관리하는 무료 오픈 소스 웹 UI입니다. .NET .NET Aspire Kafka UI를 실행하는 앱 호스트에 다른 컨테이너 이미지 docker.io/provectuslabs/kafka-ui 추가합니다.

Kafka UI 호스트 포트 변경

Kafka UI 호스트 포트를 변경하려면 WithHostPort 메서드에 대한 호출을 연결합니다.

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithKafkaUI(kafkaUI => kafkaUI.WithHostPort(9100));

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

Kafka UI는 앞의 예제에서 http://localhost:9100에서 접근할 수 있습니다.

데이터 볼륨을 사용하여 Kafka server 리소스 추가

Kafka server 리소스에 데이터 볼륨을 추가하려면 Kafka WithDataVolume 리소스에서 server 메서드를 호출합니다.

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataVolume(isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

데이터 볼륨은 Kafka server 데이터를 컨테이너의 수명 주기 외부에 유지하는 데 사용됩니다. 데이터 볼륨은 Kafka /var/lib/kafka/data 컨테이너의 server 경로에 탑재되고 name 매개 변수가 제공되지 않으면 이름이 임의로 생성됩니다. 데이터 볼륨에 대한 자세한 내용 및바인딩 탑재보다 선호하는 이유에 대한 자세한 내용은 문서: 볼륨참조하세요.

데이터 바인딩 마운트를 사용하여 Kafka server 리소스 추가

Kafka server 리소스에 데이터 바인딩 탑재를 추가하려면 WithDataBindMount 메서드를 호출합니다.

var builder = DistributedApplication.CreateBuilder(args);

var kafka = builder.AddKafka("kafka")
                   .WithDataBindMount(
                       source: @"C:\Kafka\Data",
                       isReadOnly: false);

builder.AddProject<Projects.ExampleProject>()
       .WithReference(kafka);

// After adding all resources, run the app...

중요하다

데이터 바인딩 탑재볼륨비해 기능이 제한되므로 성능, 이식성 및 보안이 향상되어 프로덕션 환경에 더 적합합니다. 그러나 바인딩 탑재를 사용하면 호스트 시스템의 파일에 직접 액세스하고 수정할 수 있으므로 실시간 변경이 필요한 개발 및 테스트에 적합합니다.

데이터 바인드 마운트는 호스트 컴퓨터의 파일 시스템을 사용하여 컨테이너가 재시작하더라도 Kafka server 데이터를 지속합니다. 데이터 바인딩 마운트는 호스트 머신의 Windows C:\Kafka\Data 경로(또는 /Kafka/Data경로의 Unix)에 있는 Kafka server 컨테이너에 마운트됩니다. 데이터 바인딩 마운트에 대한 자세한 내용은 Docker의 문서에서바인딩 마운트를 참고하십시오.

통합 상태 검사의 호스팅

Kafka 호스팅 통합은 Kafka server 리소스에 대한 상태 검사를 자동으로 추가합니다. 상태 검사는 지정된 연결 이름을 가진 Kafka 생산자가 Kafka server에 토픽을 연결하고 저장할 수 있는지를 확인합니다.

호스팅 통합은 📦 AspNetCore.HealthChecks.Kafka NuGet 패키지에 의존합니다.

Client 통합

통합을 시작하려면 -consuming 프로젝트, 즉 를 사용하는 애플리케이션에 대한 프로젝트에 NuGet 패키지인 Confluent.Kafka를 설치하세요. 을 설치하십시오.

dotnet add package Aspire.Confluent.Kafka

Kafka 생산자 추가

Program.cs사용 중인 프로젝트의 client 파일에서 AddKafkaProducer 확장 메서드를 호출하여, 종속성 주입 컨테이너를 통해 사용할 IProducer<TKey, TValue>를 등록합니다. 이 메서드는 broker에 보낼 키 형식과 메시지 형식에 해당하는 두 개의 제네릭 매개 변수를 사용합니다. 이러한 제네릭 매개 변수는 AddKafkaProducerProducerBuilder<TKey, TValue>인스턴스를 만드는 데 사용됩니다. 이 메서드는 연결 이름 매개 변수도 사용합니다.

builder.AddKafkaProducer<string, string>("messaging");

그런 다음 종속성 주입을 사용하여 IProducer<TKey, TValue> 인스턴스를 검색할 수 있습니다. 예를 들어, IHostedService에서 프로듀서를 검색하려면 다음 단계를 수행합니다.

internal sealed class Worker(IProducer<string, string> producer) : BackgroundService
{
    // Use producer...
}

작업자에 대한 자세한 내용은 Worker 서비스와 .NET를 참조하세요.

Kafka 소비자 추가

종속성 주입 컨테이너에서 IConsumer<TKey, TValue>을(를) 사용하도록 등록하려면, AddKafkaConsumer를 활용하는 프로젝트의 Program.cs 파일에서 client 확장 메서드를 호출하십시오. 메서드는 키의 형식과 broker에서 받을 메시지의 형식에 해당하는 두 개의 제네릭 매개 변수를 사용합니다. 이러한 제네릭 매개 변수는 AddKafkaConsumerConsumerBuilder<TKey, TValue>인스턴스를 만드는 데 사용됩니다. 이 메서드는 연결 이름 매개 변수도 사용합니다.

builder.AddKafkaConsumer<string, string>("messaging");

그런 다음 종속성 주입을 사용하여 IConsumer<TKey, TValue> 인스턴스를 검색할 수 있습니다. 예를 들어 IHostedService에서 소비자를 찾으려면 다음을 수행합니다.

internal sealed class Worker(IConsumer<string, string> consumer) : BackgroundService
{
    // Use consumer...
}

주요 Kafka 생산자 또는 소비자 추가

여러 생산자 또는 소비자 인스턴스를 서로 다른 연결 이름으로 등록하려는 경우가 있을 수 있습니다. 주요 Kafka 생산자 또는 소비자를 등록하려면 적절한 API를 호출합니다.

키드 서비스에 대한 자세한 내용은 .NET 종속성 주입: 키드 서비스를 참조하세요.

구성

.NET Aspire Apache Kafka 통합은 프로젝트의 요구 사항 및 규칙에 따라 연결을 구성하는 여러 옵션을 제공합니다.

연결 문자열 사용

ConnectionStrings 구성 섹션에서 연결 문자열을 사용하는 경우 builder.AddKafkaProducer() 또는 builder.AddKafkaProducer()호출할 때 연결 문자열의 이름을 제공할 수 있습니다.

builder.AddKafkaProducer<string, string>("kafka-producer");

그런 다음 ConnectionStrings 구성 섹션에서 연결 문자열을 검색합니다.

{
  "ConnectionStrings": {
    "kafka-producer": "broker:9092"
  }
}

연결 문자열 값은 생성된 BootstrapServers 또는 IProducer<TKey, TValue> 인스턴스의 IConsumer<TKey, TValue> 속성으로 설정됩니다. 자세한 내용은 BootstrapServers참조하세요.

구성 공급자 사용

.NET Aspire Apache Kafka 통합은 Microsoft.Extensions.Configuration지원합니다. KafkaProducerSettingsKafkaConsumerSettings 키를 각각 사용하여 구성에서 Aspire:Confluent:Kafka:Producer 또는 Aspire.Confluent:Kafka:Consumer 불러옵니다. 다음 코드 조각은 일부 옵션을 구성하는 appsettings.json 파일의 예입니다.

{
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Producer": {
          "DisableHealthChecks": false,
          "Config": {
            "Acks": "All"
          }
        }
      }
    }
  }
}

ConfigAspire:Confluent:Kafka:Producer 구성 섹션의 Aspire.Confluent:Kafka:Consumer 속성은 각각 ProducerConfigConsumerConfig인스턴스에 바인딩됩니다.

Confluent.Kafka.Consumer<TKey, TValue> 브로커가 메시지 오프셋을 추적할 수 있도록 ClientId 속성을 설정해야 합니다.

전체 Kafka client 통합 JSON 스키마는 Aspire참조하세요. Confluent.Kafka/ConfigurationSchema.json.

인라인 대리자를 사용하세요

다양한 옵션을 구성하는 데 사용할 수 있는 몇 가지 인라인 대리자가 있습니다.

KafkaProducerSettingsKafkaConsumerSettings을 구성하십시오.

Action<KafkaProducerSettings> configureSettings 델리게이트를 전달하여 일부 또는 모든 옵션을 인라인으로 설정하고, 예를 들어 코드에서 상태 검사를 비활성화할 수 있습니다.

builder.AddKafkaProducer<string, string>(
    "messaging", 
    static settings => settings.DisableHealthChecks = true);

코드에서 소비자를 인라인으로 구성할 수 있습니다.

builder.AddKafkaConsumer<string, string>(
    "messaging",
    static settings => settings.DisableHealthChecks = true);
ProducerBuilder<TKey, TValue> 구성 및 ConsumerBuilder<TKey, TValue> 구성

Confluent.Kafka 빌더를 구성하려면 Action<ProducerBuilder<TKey, TValue>>(또는 Action<ConsumerBuilder<TKey, TValue>>)를 전달합니다.

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static producerBuilder => 
    {
        var messageSerializer = new MyMessageSerializer();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

생산자와 소비자를 등록할 때 DI 컨테이너에 등록된 서비스에 액세스해야 하는 경우 각각 Action<IServiceProvider, ProducerBuilder<TKey, TValue>> 또는 Action<IServiceProvider, ConsumerBuilder<TKey, TValue>> 전달할 수 있습니다.

다음 생산자 등록 예제를 고려합니다.

builder.AddKafkaProducer<string, MyMessage>(
    "messaging",
    static (serviceProvider, producerBuilder) => 
    {
        var messageSerializer = serviceProvider.GetRequiredServices<MyMessageSerializer>();
        producerBuilder.SetValueSerializer(messageSerializer);
    })

자세한 내용은 ProducerBuilder<TKey, TValue>ConsumerBuilder<TKey, TValue> API 설명서를 참조하세요.

Client 통합 상태 검사

기본적으로 .NET.NET Aspire 통합은 모든 서비스에 대해 상태 검사를 활성화합니다. 자세한 내용은 .NET.NET Aspire 통합 개요참조하세요.

.NET Aspire Apache Kafka 통합은 다음과 같은 상태 검사 시나리오를 처리합니다.

  • Aspire.Confluent.Kafka.Producer 상태 검사를 KafkaProducerSettings.DisableHealthChecksfalse때 추가합니다.
  • Aspire.Confluent.Kafka.Consumer 상태 검사를 KafkaConsumerSettings.DisableHealthChecksfalse때 추가합니다.
  • /health HTTP 엔드포인트와 통합되어, 이 엔드포인트는 앱이 트래픽을 수락할 준비가 되었다고 간주되려면 등록된 모든 상태 검사가 통과해야 한다고 지정합니다.

관찰 가능성 및 원격 분석

통합은 로깅, 추적 및 메트릭 구성을 자동으로 설정하며, 이를 관찰성핵심 요소라고도 . 통합 관찰 가능성 및 원격 분석에 대한 자세한 내용은 .NET.NET Aspire 통합 개요참조하세요. 지원 서비스에 따라 일부 통합은 이러한 기능 중 일부만 지원할 수 있습니다. 예를 들어 일부 통합은 로깅 및 추적을 지원하지만 메트릭은 지원하지 않습니다. 구성 섹션에 제시된 기술을 사용하여 원격 분석 기능을 사용하지 않도록 설정할 수도 있습니다.

로깅

.NET Aspire Apache Kafka 통합은 다음 로그 범주를 사용합니다.

  • Aspire.Confluent.Kafka

추적

.NET Aspire Apache Kafka 통합은 분산 추적을 내보내지 않습니다.

지표

.NET Aspire Apache Kafka 통합은 OpenTelemetry사용하여 다음 메트릭을 내보낸다.

  • messaging.kafka.network.tx
  • messaging.kafka.network.transmitted
  • messaging.kafka.network.rx
  • messaging.kafka.network.received
  • messaging.publish.messages
  • messaging.kafka.message.transmitted
  • messaging.receive.messages
  • messaging.kafka.message.received

참고 항목