Event Hubs .NET SDK'larını (AMQP) kullanarak olay akışı yaparken Avro şeması kullanarak doğrulama
Bu hızlı başlangıçta, Azure.Messaging.EventHubs .NET kitaplığını kullanarak şema doğrulaması ile olay hub'ına olay göndermeyi ve olay hub'ından olay almayı öğreneceksiniz.
Not
Azure Schema Registry , olay odaklı ve mesajlaşma odaklı uygulamalar için şemalar için merkezi bir depo sağlayan Event Hubs'ın bir özelliğidir. Üretici ve tüketici uygulamalarınızın şemayı yönetmek ve paylaşmak zorunda kalmadan veri alışverişi yapma esnekliği sağlar. Ayrıca, yeniden kullanılabilir şemalar için basit bir idare çerçevesi sağlar ve bir gruplandırma yapısı (şema grupları) aracılığıyla şemalar arasındaki ilişkiyi tanımlar. Daha fazla bilgi için bkz. Event Hubs'ta Azure Schema Registry.
Önkoşullar
Azure Event Hubs yeniyseniz, bu hızlı başlangıcı yapmadan önce bkz. Event Hubs'a genel bakış.
Bu hızlı başlangıcı tamamlamak için aşağıdaki önkoşullara ihtiyacınız vardır:
- Azure aboneliğiniz yoksa başlamadan önce ücretsiz bir hesap oluşturun.
-
Microsoft Visual Studio 2022.
Azure Event Hubs istemci kitaplığı, C# 8.0'da tanıtılan yeni özellikleri kullanır. Kitaplığı önceki C# dil sürümleriyle kullanmaya devam edebilirsiniz, ancak yeni söz dizimi kullanılamaz. Tam söz dizimini kullanmak için .NET Core SDK 3.0 veya üzeri ile derlemenizi ve dil sürümünün olarak ayarlanmasını
latest
öneririz. Visual Studio kullanıyorsanız, Visual Studio 2019'un önceki sürümleri C# 8.0 projeleri oluşturmak için gereken araçlarla uyumlu değildir. Ücretsiz Community sürümü de dahil olmak üzere Visual Studio 2019 buradan indirilebilir.
Olay hub’ı oluşturma
Event Hubs ad alanı ve olay hub'ı oluşturmak için hızlı başlangıçtaki yönergeleri izleyin: Event Hubs ad alanı ve olay hub'ı oluşturma. Ardından, Event Hubs ad alanınıza bağlantı dizesi almak için Bağlantı dizesini alma başlığındaki yönergeleri izleyin.
Geçerli hızlı başlangıçta kullanacağınız aşağıdaki ayarları not edin:
- Event Hubs ad alanı için bağlantı dizesi
- Olay hub'ının adı
Şema oluşturma
Şema grubu ve şema oluşturmak için Şema Kayıt Defteri'ni kullanarak şema oluşturma başlığındaki yönergeleri izleyin.
Şema Kayıt Defteri portalını kullanarak contoso-sg adlı bir şema grubu oluşturun. Uyumluluk modu için serileştirme türü olarak Avro ve Yok kullanın.
Bu şema grubunda, aşağıdaki şema içeriğini kullanarak şema adıyla
Microsoft.Azure.Data.SchemaRegistry.example.Order
yeni bir Avro şeması oluşturun.{ "namespace": "Microsoft.Azure.Data.SchemaRegistry.example", "type": "record", "name": "Order", "fields": [ { "name": "id", "type": "string" }, { "name": "amount", "type": "double" }, { "name": "description", "type": "string" } ] }
Şema Kayıt Defteri Okuyucusu rolüne kullanıcı ekleme
Kullanıcı hesabınızı ad alanı düzeyinde Şema Kayıt Defteri Okuyucusu rolüne ekleyin. Şema Kayıt Defteri Katkıda Bulunanı rolünü de kullanabilirsiniz, ancak bu hızlı başlangıç için gerekli değildir.
- Event Hubs Ad Alanı sayfasında, soldaki menüden Erişim denetimi (IAM) öğesini seçin.
- Erişim denetimi (IAM) sayfasında menüde + Ekle ->Rol ataması ekle'yi seçin.
- Atama türü sayfasında İleri'yi seçin.
- Roller sayfasında Şema Kayıt Defteri Okuyucusu (Önizleme) öğesini ve ardından sayfanın alt kısmındaki İleri'yi seçin.
- + Üye seç bağlantısını kullanarak kullanıcı hesabınızı role ekleyin ve ardından İleri'yi seçin.
- Gözden geçir ve ata sayfasında Gözden geçir ve ata'yı seçin.
Şema doğrulaması ile olay hub'larına olay oluşturma
Olay üreticisi için konsol uygulaması oluşturma
- Visual Studio 2019’u başlatın.
- Yeni proje oluştur'u seçin.
-
Yeni proje oluştur iletişim kutusunda aşağıdaki adımları uygulayın: Bu iletişim kutusunu görmüyorsanız, menüden Dosya'yı seçin, Yeni'yi ve ardından Proje'yi seçin.
Programlama dili için C# öğesini seçin.
Uygulamanın türü için Konsol'a tıklayın.
Sonuçlar listesinden Konsol Uygulaması'nı seçin.
Ardından İleri'yi seçin.
- Proje adı olarak OrderProducer , çözüm adı olarak SRQuickStart yazın ve projeyi oluşturmak için Tamam'ı seçin.
Event Hubs NuGet paketini ekleme
Menüden Araçlar>NuGet Paket Yöneticisi Paket Yöneticisi>Konsolu'nu seçin.
Azure.Messaging.EventHubs ve diğer NuGet paketlerini yüklemek için aşağıdaki komutları çalıştırın. Son komutu çalıştırmak için ENTER tuşuna basın.
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Identity Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro Install-Package Azure.ResourceManager.Compute
Burada gösterildiği gibi Visual Studio aracılığıyla Azure'a bağlanmak için üretici uygulamalarının kimliğini doğrulayın.
Ad alanı düzeyinde rolün üyesi
Schema Registry Reader
olan kullanıcı hesabını kullanarak Azure'da oturum açın. Şema kayıt defteri rolleri hakkında bilgi için bkz. Event Hubs'ta Azure Schema Registry.
Avro şemasını kullanarak kod oluşturma
- adlı
Order.avsc
bir dosya oluşturmak için şemayı oluşturmak için kullandığınız içeriği kullanın. Dosyayı proje veya çözüm klasörüne kaydedin. - Ardından bu şema dosyasını kullanarak .NET için kod oluşturabilirsiniz. Kod oluşturma için avrogen gibi herhangi bir dış kod oluşturma aracını kullanabilirsiniz. Örneğin kod oluşturmak için komutunu çalıştırabilirsiniz
avrogen -s .\Order.avsc .
. - Kod oluşturduktan sonra klasöründe adlı
Order.cs
\Microsoft\Azure\Data\SchemaRegistry\example
dosyayı görürsünüz. Yukarıdaki Avro şeması için ad alanındaMicrosoft.Azure.Data.SchemaRegistry.example
C# türlerini oluşturur. -
Order.cs
DosyayıOrderProducer
projeye ekleyin.
Olayları seri hale getirmek ve olay hub'ına göndermek için kod yazma
Program.cs
dosyasına aşağıdaki kodu ekleyin. Ayrıntılar için kod açıklamalarına bakın. Koddaki üst düzey adımlar şunlardır:- Olay hub'ına olay göndermek için kullanabileceğiniz bir üretici istemcisi oluşturun.
- Bir nesnedeki verileri seri hale getirmek ve doğrulamak için kullanabileceğiniz bir
Order
şema kayıt defteri istemcisi oluşturun. - Oluşturulan
Order
türü kullanarak yeniOrder
bir nesne oluşturun. - nesnesini olarak serileştirmek için şema kayıt defteri istemcisini
Order
EventData
kullanın. - Bir toplu olay oluşturun.
- Olay verilerini olay toplu işlemine ekleyin.
- Olay toplu işlemini olay hub'ına göndermek için üretici istemcisini kullanın.
using Azure.Data.SchemaRegistry; using Azure.Identity; using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer; using Microsoft.Azure.Data.SchemaRegistry.example; // connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // The Event Hubs client types are safe to cache and use as a singleton for the lifetime // of the application, which is best practice when events are being published or read regularly. EventHubProducerClient producerClient; // Create a producer client that you can use to send events to an event hub producerClient = new EventHubProducerClient(connectionString, eventHubName); // Create a schema registry client that you can use to serialize and validate data. var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential()); // Create an Avro object serializer using the Schema Registry client object. var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true }); // Create a new order object using the generated type/class 'Order'. var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." }; EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData)); // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); // Add the event data to the event batch. eventBatch.TryAdd(eventData); // Send the batch of events to the event hub. await producerClient.SendAsync(eventBatch); Console.WriteLine("A batch of 1 order has been published.");
Aşağıdaki yer tutucu değerleri gerçek değerlerle değiştirin.
-
EVENTHUBSNAMESPACECONNECTIONSTRING
- Event Hubs ad alanı için bağlantı dizesi -
EVENTHUBNAME
- olay hub'ının adı -
EVENTHUBSNAMESPACENAME
- Event Hubs ad alanının adı -
SCHEMAGROUPNAME
- şema grubunun adı
// connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME";
-
Projeyi derleyin ve hata olmadığından emin olun.
Programı çalıştırın ve onay iletisini bekleyin.
A batch of 1 order has been published.
Azure portal olay hub'ında olayları aldığını doğrulayabilirsiniz. Ölçümler bölümünde İletiler görünümüne geçin. Grafiği güncelleştirmek için sayfayı yenileyin. İletilerin alındığını göstermesi birkaç saniye sürebilir.
Şema doğrulaması ile olay hub'larından olayları kullanma
Bu bölümde, olay hub'ından olay alan ve olay verilerini seri durumdan kaldırmak için şema kayıt defterini kullanan bir .NET Core konsol uygulamasının nasıl yazıldığını gösterir.
Ek önkoşullar
- Olay işlemcisini kullanmak için depolama hesabını oluşturun.
Tüketici uygulaması oluşturma
- Çözüm Gezgini penceresinde SRQuickStart çözümüne sağ tıklayın, Ekle'nin üzerine gelin ve Yeni Proje'yi seçin.
- Konsol uygulaması'nın ardından İleri'yi seçin.
- Proje adı olarak OrderConsumer girin ve Oluştur'u seçin.
- Çözüm Gezgini penceresinde OrderConsumer'a sağ tıklayın ve Başlangıç Projesi Olarak Ayarla'yı seçin.
Event Hubs NuGet paketini ekleme
Menüden Araçlar>NuGet Paket Yöneticisi Paket Yöneticisi>Konsolu'nu seçin.
Paket Yöneticisi Konsolu penceresinde, Varsayılan proje için OrderConsumer'ın seçildiğini onaylayın. Aksi takdirde, OrderConsumer'ı seçmek için açılan listeyi kullanın.
Gerekli NuGet paketlerini yüklemek için aşağıdaki komutu çalıştırın. Son komutu çalıştırmak için ENTER tuşuna basın.
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Messaging.EventHubs.Processor Install-Package Azure.Identity Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro Install-Package Azure.ResourceManager.Compute
Burada gösterildiği gibi Visual Studio aracılığıyla Azure'a bağlanmak için üretici uygulamalarının kimliğini doğrulayın.
Ad alanı düzeyinde rolün üyesi
Schema Registry Reader
olan kullanıcı hesabını kullanarak Azure'da oturum açın. Şema kayıt defteri rolleri hakkında bilgi için bkz. Event Hubs'ta Azure Schema Registry.Order.cs
Üretici uygulamasını oluşturmanın bir parçası olarak oluşturduğunuz dosyayı OrderConsumer projesine ekleyin.OrderConsumer projesine sağ tıklayın ve Başlangıç projesi olarak ayarla'yı seçin.
Schema Registry kullanarak olayları almak ve seri durumdan çıkarabilmek için kod yazma
Program.cs
dosyasına aşağıdaki kodu ekleyin. Ayrıntılar için kod açıklamalarına bakın. Koddaki üst düzey adımlar şunlardır:- Olay hub'ına olay göndermek için kullanabileceğiniz bir tüketici istemcisi oluşturun.
- Azure blob depolama alanındaki blob kapsayıcısı için bir blob kapsayıcısı istemcisi oluşturun.
- Bir olay işlemcisi istemcisi oluşturun ve olay ve hata işleyicilerini kaydedin.
- Olay işleyicisinde, olay verilerini bir nesnede seri durumdan çıkarmak için kullanabileceğiniz bir
Order
şema kayıt defteri istemcisi oluşturun. - Seri hale getiriciyi kullanarak olay verilerini bir
Order
nesnede seri durumdan çıkarma. - Alınan sipariş hakkındaki bilgileri yazdırın.
using Azure.Data.SchemaRegistry; using Azure.Identity; using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro; using Azure.Storage.Blobs; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor; using Microsoft.Azure.Data.SchemaRegistry.example; // connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // connection string for the Azure Storage account const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING"; // name of the blob container that will be userd as a checkpoint store const string blobContainerName = "BLOBCONTAINERNAME"; // Create a blob container client that the event processor will use BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); // Create an event processor client to process events in the event hub EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 30 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(30)); // Stop the processing await processor.StopProcessingAsync(); static async Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Create a schema registry client that you can use to serialize and validate data. var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential()); // Create an Avro object serializer using the Schema Registry client object. var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true }); // Deserialized data in the received event using the schema Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order)); // Print the received event Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}"); await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken); } static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); return Task.CompletedTask; }
Aşağıdaki yer tutucu değerleri gerçek değerlerle değiştirin.
-
EVENTHUBSNAMESPACE-CONNECTIONSTRING
- Event Hubs ad alanı için bağlantı dizesi -
EVENTHUBNAME
- olay hub'ının adı -
EVENTHUBSNAMESPACENAME
- Event Hubs ad alanının adı -
SCHEMAGROUPNAME
- şema grubunun adı -
AZURESTORAGECONNECTIONSTRING
- Azure depolama hesabı için bağlantı dizesi -
BLOBCONTAINERNAME
- Blob kapsayıcısının adı
// connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // Azure storage connection string const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING"; // Azure blob container name const string blobContainerName = "BLOBCONTAINERNAME";
-
Projeyi derleyin ve hata olmadığından emin olun.
Alıcı uygulamasını çalıştırın.
Olayların alındığını belirten bir ileti görmeniz gerekir.
Received order with ID: 1234, amount: 45.29, description: First sample order.
Bu olaylar, gönderen programını çalıştırarak daha önce olay hub'ına gönderdiğiniz üç olaydır.
Örnekler
GitHub depomuzdaki Benioku makalesine bakın.
Kaynakları temizleme
Event Hubs ad alanını silin veya ad alanını içeren kaynak grubunu silin.
Sonraki adımlar
.NET için Azure Schema Registry istemci kitaplığını kullanıma alma