配信不能キューを使用したメッセージ転送エラー処理
キューに置かれたメッセージは、配信に失敗する可能性があります。配信に失敗したメッセージは、配信不能キューに記録されます。配信の失敗は、ネットワーク エラー、キューが削除されている、キューがいっぱいになっている、認証エラー、配信が時間どおりに行われなかったなど、さまざまな理由で生じる可能性があります。
キューに置かれたメッセージは、受信側のアプリケーションでタイムリーに読み取られないと、長時間キューに残ることがあります。時間依存のメッセージでは、このような動作が適切でない場合があります。時間依存のメッセージでは、メッセージをキューに格納しておくことができる期間を示す TTL (Time to Live) プロパティが、キューに置かれたバインディングに設定されおり、この期間を超えると、メッセージの有効期限が切れます。期限切れのメッセージは、配信不能キューと呼ばれる特別なキューに送信されます。また、キューのクォータの超過、認証エラーなどの理由で、メッセージが配信不能キューに置かれる場合もあります。
一般に、アプリケーションには、配信不能キューのメッセージとエラーの理由を読み取るための補正ロジックが記述されています。補正ロジックはエラーの原因に依存します。たとえば、認証エラーの場合は、メッセージに添付された証明書を修正し、メッセージを再送信できます。また、ターゲット キューのクォータに達したために配信が失敗した場合は、クォータの問題が解決されていることを期待して配信を再試行できます。
ほとんどのキュー システムには、そのシステムから配信できなかったすべてのメッセージを格納するための、システム全体の配信不能キューがあります。メッセージ キュー (MSMQ) には、システム全体の配信不能キューが 2 種類用意されています。1 つはトランザクション キューへの配信に失敗したメッセージを格納するトランザクション システム全体の配信不能キューで、もう 1 つは非トランザクション キューへの配信に失敗したメッセージを格納する非トランザクション システム全体の配信不能キューです。2 つのクライアントが 2 つの異なるサービスにメッセージを送信しているために、WCF の異なるキューで、送信に同じ MSMQ サービスを共有している場合は、システム配信不能キューでメッセージが混在する可能性があります。これは、必ずしも最適であるとは言えません。場合によっては (たとえば、セキュリティ上の理由で)、一方のクライアントが、もう一方のクライアントのメッセージを配信不能キューから読み取ることができないようにする必要があるからです。また、共有された配信不能キューでは、クライアントがキューを参照して、送信したメッセージを検索する必要もありますが、配信不能キューに置かれているメッセージの数によっては、これは極めて大きな負荷になる可能性があります。そのため、Windows Vista の WCF NetMsmqBinding、MsmqIntegrationBinding、および MSMQ には、カスタム配信不能キュー (アプリケーション固有の配信不能キューと呼ばれることもあります) が用意されています。
カスタム配信不能キューでは、同じ MSMQ サービスを共有してメッセージを送信するクライアントをそれぞれ分離できます。
Windows Communication Foundation (WCF) の場合、Windows Server 2003 と Windows XP では、キューに置かれたすべてのクライアント アプリケーションに対して、システム全体で 1 つの配信不能キューが提供されます。一方、Windows Vista では、Windows Communication Foundation (WCF) により、キューに置かれたクライアント アプリケーションごとに配信不能キューが提供されます。
配信不能キューの使用の指定
配信不能キューは、送信元アプリケーションのキュー マネージャーに存在します。ここには、有効期限が切れたメッセージと転送または配信に失敗したメッセージが格納されます。
バインディングには、次の配信不能キュー プロパティがあります。
配信不能キューからのメッセージの読み取り
配信不能キューからメッセージを読み取るアプリケーションは、アプリケーション キューからメッセージを読み取る WCF サービスとよく似ていますが、次のようなわずかな違いがあります。
トランザクション システムの配信不能キューからメッセージを読み取るには、URI (Uniform Resource Identifier) を net.msmq://localhost/system$;DeadXact という形式にする必要があります。
非トランザクション システムの配信不能キューからメッセージを読み取る場合、URI は、net.msmq://localhost/system$;DeadLetter という形式にする必要があります。
カスタム配信不能キューからメッセージを読み取るには、URI を net.msmq://localhost/private/<custom-dlq-name> という形式にする必要があります。ここで、custom-dlq-name は、カスタム配信不能キューの名前です。
キューのアドレスを指定する方法詳細情報、「サービス エンドポイントとキューのアドレス指定」を参照してください。
受信側の WCF スタックは、サービスがリッスンしているアドレスとメッセージのアドレスを照合します。アドレスが一致する場合、メッセージはディスパッチされますが、一致しない場合はディスパッチされません。これにより、配信不能キューから読み取るときに問題が生じる可能性があります。一般に、配信不能キュー内のメッセージは該当サービスにアドレス指定され、配信不能キュー サービスにアドレス指定されないからです。そのため、配信不能キューから読み取るサービスは、ServiceBehavior アドレス フィルターをインストールし、受信者とは無関係にキュー内のすべてのメッセージを一致させるようスタックに指示する必要があります。具体的には、Any パラメーターを持つ ServiceBehavior を、配信不能キューからメッセージを読み取るサービスに追加する必要があります。
配信不能キューの有害メッセージの処理
配信不能キューでは、条件付きで有害メッセージを処理できます。システム キューからサブキューを作成できないため、システム配信不能キューから読み取るときは、ReceiveErrorHandling を Move に設定できません。カスタム配信不能キューから読み取る場合は、サブキューを使用できるので、Move が有害メッセージに対する有効な処置であることに注意してください。
ReceiveErrorHandling を Reject に設定しているときにカスタム配信不能キューから読み取った場合、有害メッセージはシステム配信不能キューに置かれます。システム配信不能キューから読み取ると、有害メッセージは破棄 (削除) されます。MSMQ のシステム配信不能キューから拒否すると、メッセージは破棄 (削除) されます。
例
次の例は、配信不能キューを作成する方法と、配信不能キューを使用して期限切れのメッセージを処理する方法を示しています。この例は、「方法 : WCF エンドポイントを使用してキューに置かれたメッセージを交換する」での例に基づいています。この例では、アプリケーションごとの配信不能キューを使用する発注書処理サービスにクライアント コードを書き込む方法を示します。また、配信不能キューのメッセージを処理する方法も示します。
以下は、アプリケーションごとの配信不能キューを指定するクライアントのコードです。
Imports System
Imports System.ServiceModel.Channels
Imports System.Configuration
'using System.Messaging;
Imports System.ServiceModel
Imports System.Transactions
Namespace Microsoft.ServiceModel.Samples
'The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.
'Client implementation code.
Friend Class Client
Shared Sub Main()
' Get MSMQ queue name from appsettings in configuration.
Dim deadLetterQueueName As String = ConfigurationManager.AppSettings("deadLetterQueueName")
' Create the transacted MSMQ queue for storing dead message if necessary.
If (Not System.Messaging.MessageQueue.Exists(deadLetterQueueName)) Then
System.Messaging.MessageQueue.Create(deadLetterQueueName, True)
End If
Dim client As New OrderProcessorClient("OrderProcessorEndpoint")
Try
' Create the purchase order.
Dim po As New PurchaseOrder()
po.CustomerId = "somecustomer.com"
po.PONumber = Guid.NewGuid().ToString()
Dim lineItem1 As New PurchaseOrderLineItem()
lineItem1.ProductId = "Blue Widget"
lineItem1.Quantity = 54
lineItem1.UnitCost = 29.99F
Dim lineItem2 As New PurchaseOrderLineItem()
lineItem2.ProductId = "Red Widget"
lineItem2.Quantity = 890
lineItem2.UnitCost = 45.89F
po.orderLineItems = New PurchaseOrderLineItem(1){}
po.orderLineItems(0) = lineItem1
po.orderLineItems(1) = lineItem2
'Create a transaction scope.
Using scope As New TransactionScope(TransactionScopeOption.Required)
' Make a queued call to submit the purchase order.
client.SubmitPurchaseOrder(po)
' Complete the transaction.
scope.Complete()
End Using
client.Close()
Catch timeout As TimeoutException
Console.WriteLine(timeout.Message)
client.Abort()
Catch conexcp As CommunicationException
Console.WriteLine(conexcp.Message)
client.Abort()
End Try
Console.WriteLine()
Console.WriteLine("Press <ENTER> to terminate client.")
Console.ReadLine()
End Sub
End Class
End Namespace
using System;
using System.ServiceModel.Channels;
using System.Configuration;
//using System.Messaging;
using System.ServiceModel;
using System.Transactions;
namespace Microsoft.ServiceModel.Samples
{
//The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.
//Client implementation code.
class Client
{
static void Main()
{
// Get MSMQ queue name from appsettings in configuration.
string deadLetterQueueName = ConfigurationManager.AppSettings["deadLetterQueueName"];
// Create the transacted MSMQ queue for storing dead message if necessary.
if (!System.Messaging.MessageQueue.Exists(deadLetterQueueName))
System.Messaging.MessageQueue.Create(deadLetterQueueName, true);
OrderProcessorClient client = new OrderProcessorClient("OrderProcessorEndpoint");
try
{
// Create the purchase order.
PurchaseOrder po = new PurchaseOrder();
po.CustomerId = "somecustomer.com";
po.PONumber = Guid.NewGuid().ToString();
PurchaseOrderLineItem lineItem1 = new PurchaseOrderLineItem();
lineItem1.ProductId = "Blue Widget";
lineItem1.Quantity = 54;
lineItem1.UnitCost = 29.99F;
PurchaseOrderLineItem lineItem2 = new PurchaseOrderLineItem();
lineItem2.ProductId = "Red Widget";
lineItem2.Quantity = 890;
lineItem2.UnitCost = 45.89F;
po.orderLineItems = new PurchaseOrderLineItem[2];
po.orderLineItems[0] = lineItem1;
po.orderLineItems[1] = lineItem2;
//Create a transaction scope.
using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
{
// Make a queued call to submit the purchase order.
client.SubmitPurchaseOrder(po);
// Complete the transaction.
scope.Complete();
}
client.Close();
}
catch(TimeoutException timeout)
{
Console.WriteLine(timeout.Message);
client.Abort();
}
catch(CommunicationException conexcp)
{
Console.WriteLine(conexcp.Message);
client.Abort();
}
Console.WriteLine();
Console.WriteLine("Press <ENTER> to terminate client.");
Console.ReadLine();
}
}
}
以下は、クライアント構成ファイルのコードです。
<configuration>
<!-- Change "localhost" in the endpoint address to the machine name where the queue resides
Change "localhost" in the customDeadLetterQueue attribute in the binding element to the
machine name where the client application executes
-->
<appSettings>
<!-- use appSetting to configure MSMQ Dead Letter queue name -->
<add key="deadLetterQueueName" value=".\private$\ServiceModelSamplesOrdersAppDLQ"/>
</appSettings>
<system.serviceModel>
<client>
<!-- Define NetMsmqEndpoint -->
<endpoint name="OrderProcessorEndpoint"
address="net.msmq://localhost/private/ServiceModelSamplesDeadLetter"
binding="netMsmqBinding"
bindingConfiguration="PerAppDLQBinding"
contract="IOrderProcessor" />
</client>
<bindings>
<netMsmqBinding>
<binding name="PerAppDLQBinding"
deadLetterQueue="Custom"
customDeadLetterQueue="net.msmq://localhost/private/ServiceModelSamplesOrdersAppDLQ"
timeToLive="00:00:02"/>
</netMsmqBinding>
</bindings>
</system.serviceModel>
</configuration>
以下は、配信不能キューのメッセージを処理するサービスのコードです。
Imports System
Imports System.ServiceModel.Description
Imports System.Configuration
Imports System.Messaging
Imports System.ServiceModel
Imports System.ServiceModel.Channels
Imports System.Transactions
Namespace Microsoft.ServiceModel.Samples
' Define a service contract.
<ServiceContract(Namespace := "http://Microsoft.ServiceModel.Samples")> _
Public Interface IOrderProcessor
<OperationContract(IsOneWay := True)> _
Sub SubmitPurchaseOrder(ByVal po As PurchaseOrder)
End Interface
' Service class that implements the service contract.
' Added code to write output to the console window
<ServiceBehavior(InstanceContextMode := InstanceContextMode.Single, ConcurrencyMode := ConcurrencyMode.Single, AddressFilterMode := AddressFilterMode.Any)> _
Public Class PurchaseOrderDLQService
Implements IOrderProcessor
Private orderProcessorService As OrderProcessorClient
Public Sub New()
orderProcessorService = New OrderProcessorClient("OrderProcessorEndpoint")
End Sub
<OperationBehavior(TransactionScopeRequired := True, TransactionAutoComplete := True)> _
Public Sub SimpleSubmitPurchaseOrder(ByVal po As PurchaseOrder)
Console.WriteLine("Submitting purchase order did not succeed ", po)
Dim mqProp As MsmqMessageProperty = TryCast(OperationContext.Current.IncomingMessageProperties(MsmqMessageProperty.Name), MsmqMessageProperty)
Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus)
Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure)
Console.WriteLine()
End Sub
<OperationBehavior(TransactionScopeRequired := True, TransactionAutoComplete := True)> _
Public Sub SubmitPurchaseOrder(ByVal po As PurchaseOrder) Implements IOrderProcessor.SubmitPurchaseOrder
Console.WriteLine("Submitting purchase order did not succeed ", po)
Dim mqProp As MsmqMessageProperty = TryCast(OperationContext.Current.IncomingMessageProperties(MsmqMessageProperty.Name), MsmqMessageProperty)
Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus)
Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure)
Console.WriteLine()
' Resend the message if timed out.
If mqProp.DeliveryFailure = DeliveryFailure.ReachQueueTimeout OrElse mqProp.DeliveryFailure = DeliveryFailure.ReceiveTimeout Then
' Re-send.
Console.WriteLine("Purchase order Time To Live expired")
Console.WriteLine("Trying to resend the message")
' Reuse the same transaction used to read the message from dlq to enqueue the message to the application queue.
orderProcessorService.SubmitPurchaseOrder(po)
Console.WriteLine("Purchase order resent")
End If
End Sub
' Host the service within this EXE console application.
Public Shared Sub Main()
' Create a ServiceHost for the PurchaseOrderDLQService type.
Using serviceHost As New ServiceHost(GetType(PurchaseOrderDLQService))
' Open the ServiceHostBase to create listeners and start listening for messages.
serviceHost.Open()
' The service can now be accessed.
Console.WriteLine("The dead letter service is ready.")
Console.WriteLine("Press <ENTER> to terminate service.")
Console.WriteLine()
Console.ReadLine()
' Close the ServiceHostBase to shutdown the service.
serviceHost.Close()
End Using
End Sub
End Class
End Namespace
using System;
using System.ServiceModel.Description;
using System.Configuration;
using System.Messaging;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Transactions;
namespace Microsoft.ServiceModel.Samples
{
// Define a service contract.
[ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples")]
public interface IOrderProcessor
{
[OperationContract(IsOneWay = true)]
void SubmitPurchaseOrder(PurchaseOrder po);
}
// Service class that implements the service contract.
// Added code to write output to the console window
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Single, AddressFilterMode = AddressFilterMode.Any)]
public class PurchaseOrderDLQService : IOrderProcessor
{
OrderProcessorClient orderProcessorService;
public PurchaseOrderDLQService()
{
orderProcessorService = new OrderProcessorClient("OrderProcessorEndpoint");
}
[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
public void SimpleSubmitPurchaseOrder(PurchaseOrder po)
{
Console.WriteLine("Submitting purchase order did not succeed ", po);
MsmqMessageProperty mqProp = OperationContext.Current.IncomingMessageProperties[MsmqMessageProperty.Name] as MsmqMessageProperty;
Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus);
Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure);
Console.WriteLine();
}
[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
public void SubmitPurchaseOrder(PurchaseOrder po)
{
Console.WriteLine("Submitting purchase order did not succeed ", po);
MsmqMessageProperty mqProp = OperationContext.Current.IncomingMessageProperties[MsmqMessageProperty.Name] as MsmqMessageProperty;
Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus);
Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure);
Console.WriteLine();
// Resend the message if timed out.
if (mqProp.DeliveryFailure == DeliveryFailure.ReachQueueTimeout ||
mqProp.DeliveryFailure == DeliveryFailure.ReceiveTimeout)
{
// Re-send.
Console.WriteLine("Purchase order Time To Live expired");
Console.WriteLine("Trying to resend the message");
// Reuse the same transaction used to read the message from dlq to enqueue the message to the application queue.
orderProcessorService.SubmitPurchaseOrder(po);
Console.WriteLine("Purchase order resent");
}
}
// Host the service within this EXE console application.
public static void Main()
{
// Create a ServiceHost for the PurchaseOrderDLQService type.
using (ServiceHost serviceHost = new ServiceHost(typeof(PurchaseOrderDLQService)))
{
// Open the ServiceHostBase to create listeners and start listening for messages.
serviceHost.Open();
// The service can now be accessed.
Console.WriteLine("The dead letter service is ready.");
Console.WriteLine("Press <ENTER> to terminate service.");
Console.WriteLine();
Console.ReadLine();
// Close the ServiceHostBase to shutdown the service.
serviceHost.Close();
}
}
}
}
以下は、配信不能キュー サービス構成ファイルのコードです。
<!-- Change the endpoint address to reflect your machine name.
Place this code in the app.config for the Dead Letter Queue service -->
<configuration>
<system.serviceModel>
<services>
<service
name="Microsoft.ServiceModel.Samples.PurchaseOrderDLQService">
<!-- Define NetMsmqEndpoint in this case, DLQ end point to read messages-->
<endpoint address="net.msmq://localhost/private/ServiceModelSamplesOrdersAppDLQ"
binding="netMsmqBinding"
bindingConfiguration="DefaultBinding"
contract="Microsoft.ServiceModel.Samples.IOrderProcessor" />
</service>
</services>
<client>
<!-- Define NetMsmqEndpoint -->
<endpoint name="OrderProcessorEndpoint"
address="net.msmq://localhost/private/ServiceModelSamplesDeadLetter"
binding="netMsmqBinding"
bindingConfiguration="SystemDLQBinding"
contract="IOrderProcessor" />
</client>
<bindings>
<netMsmqBinding>
<binding name="DefaultBinding" />
<binding name="SystemDLQBinding"
deadLetterQueue="System"/>
</netMsmqBinding>
</bindings>
</system.serviceModel>
</configuration>
参照
処理手順
方法 : WCF エンドポイントを使用してキューに置かれたメッセージを交換する