Partager via


Poison Message Handling in MSMQ 3.0

Cet exemple montre comment assurer la gestion des messages incohérents dans un service. Il est basé sur l'exemple Transacted MSMQ Binding. Il utilise netMsmqBinding. Le service est une application console auto-hébergée qui vous permet d'observer le service qui reçoit les messages mis en file d'attente.

Dans le cadre d'une communication en file d'attente, le client communique avec le service à l'aide d'une file d'attente. Plus précisément, le client envoie des messages à une file d'attente. Le service reçoit des messages de la file d'attente. Par conséquent, il n'est pas nécessaire que le service et le client s'exécutent simultanément pour communiquer à l'aide d'une file d'attente.

Un message incohérent est un message qui est lu à plusieurs reprises à partir d'une file d'attente lorsque le service qui le lit ne parvient pas à le traiter et met donc fin à la transaction sous laquelle le message est lu. Dans ce cas, le message est de nouveau réessayé. Cela peut théoriquement continuer indéfiniment en cas de problème avec le message. Notez que cela peut uniquement se produire lorsque vous utilisez des transactions pour lire à partir de la file d'attente et appeler l'opération de service.

Selon la version de MSMQ, NetMsmqBinding prend en charge la détection limitée ou complète des messages incohérents. Une fois que le message a été détecté comme étant incohérent, il est ensuite traité de plusieurs façons.

Cet exemple présente les fonctionnalités de détection limitée fournies sur les plateformes Windows Server 2003 et Windows XP, ainsi que celles de détection complète fournies sur Windows Vista. Dans les deux exemples, l'objectif est de déplacer le message incohérent de la file d'attente ver une autre qui pourra ensuite être prise en charge par un service de message incohérent.

MSMQ v3.0 (Windows Server 2003 and Windows XP) Poison Handling, exemple

MSMQ v3.0 ne prend pas en charge les sous-files d'attente de messages incohérents. Par conséquent, nous créons dans cet exemple une file d'attente permettant de contenir les messages incohérents. C'est la méthode de gestion des messages incohérents recommandée dans MSMQ v3.0 afin d'éviter toute perte de données.

La détection des messages incohérents dans Windows Server 2003 et Windows XP est restreinte à la configuration d'une propriété ReceiveRetryCount unique. ReceiveRetryCount correspond au nombre de fois que le message peut être lu à partir de la file d'attente. Windows Communication Foundation (WCF) gère ce nombre en mémoire. Une fois que ce nombre est atteint (c'est-à-dire, que le message est détecté comme étant incohérent), nous passons ensuite à la manière dont le message incohérent sera géré. La propriété d'énumération ReceiveErrorHandling sur la liaison fournit les valeurs possibles. Ces valeurs sont les suivantes :

  • Fault (valeur par défaut) : provoque l'échec de l'écouteur ainsi que de l'hôte de service.
  • Drop : permet de supprimer le message.
  • Move : permet de déplacer le message vers la sous-file d'attente de messages incohérents. Cette valeur est uniquement disponible sur Windows Vista.
  • Reject : permet de rejeter le message en le renvoyant dans la file d'attente de lettres mortes de l'expéditeur. Cette valeur est uniquement disponible sur Windows Vista.

Parmi ces valeurs, seules Fault et Drop sont valides, lors de l'utilisation de MSMQ v3.0. L'exemple indique comment gérer des messages incohérents à l'aide de MSMQ v3.0. Cet exemple peut également être exécuté dans MSMQ v4.0 et fonctionne exactement de la même façon que dans MSMQ v3.0.

Le contrat de service est IOrderProcessor, qui définit un service monodirectionnel pouvant être utilisé avec des files d'attente.

[ServiceContract(Namespace="http://Microsoft.ServiceModel.Samples")]
public interface IOrderProcessor
{
    [OperationContract(IsOneWay = true)]
    void SubmitPurchaseOrder(PurchaseOrder po);
}

L'opération de service affiche un message indiquant qu'elle traite la commande. Pour illustrer la fonctionnalité de message incohérent, l'opération de service SubmitPurchaseOrder lève une exception pour restaurer la transaction sur des appels aléatoires du service. Le message est alors remis dans la file d'attente. Le message est par la suite marqué comme étant incohérent. La configuration a la valeur par défaut sur les messages incohérents. Une fois que le message est incohérent, le canal mis en file d'attente lève une exception MsmqPoisonMessageException qui contient le MessageLookupId du message incohérent et provoque l'échec de ServiceHost. Un gestionnaire d'erreurs est joint afin de surveiller cette exception et prendre la mesure appropriée. Pour ajouter le gestionnaire d'erreurs, nous créons un PoisonErrorBehaviorAttribute et annotons OrderProcessorService avec ce comportement.

    // Service class that implements the service contract.
    // Added code to write output to the console window.
    [PoisonErrorBehavior]
    public class OrderProcessorService : IOrderProcessor
    {
        static Random r = new Random(137);
        public static string QueueName;
        public static string PoisonQueueName;

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void SubmitPurchaseOrder(PurchaseOrder po)
        {

            int randomNumber = r.Next(10);

            if (randomNumber % 2 == 0)
            {
                Orders.Add(po);
                Console.WriteLine("Processing {0} ", po);
            }
            else
            {
                Console.WriteLine("Aborting transaction, cannot process purchase order: " + po.PONumber);
                Console.WriteLine();
                throw new Exception("Cannot process purchase order: " + po.PONumber);
            }
        }

        public static void StartThreadProc(object stateInfo)
        {
            StartService();
        }

        public static void StartService()
        {
            // Get MSMQ queue name from app settings in configuration
            QueueName = ConfigurationManager.AppSettings["queueName"];

            // Get MSMQ queue name for the final poison queue
            PoisonQueueName = ConfigurationManager.AppSettings["poisonQueueName"];

            // Create the transacted MSMQ queue if necessary.
            if (!System.Messaging.MessageQueue.Exists(QueueName))
                System.Messaging.MessageQueue.Create(QueueName, true);

            // Create the transacted poison message MSMQ queue if necessary.
            if (!System.Messaging.MessageQueue.Exists(PoisonQueueName))
                System.Messaging.MessageQueue.Create(PoisonQueueName, true);


            // Get the base address that is used to listen for WS-MetaDataExchange requests
            // This is useful to generate a proxy for the client
            string baseAddress = ConfigurationManager.AppSettings["baseAddress"];

            // Create a ServiceHost for the OrderProcessorService type.
            ServiceHost serviceHost = new ServiceHost(typeof(OrderProcessorService), new Uri(baseAddress));

            // Hook on to the service host faulted events
            serviceHost.Faulted += new EventHandler(OnServiceFaulted);

            // Open the ServiceHostBase to create listeners and start listening for messages.
            serviceHost.Open();
            
        }


        public static void OnServiceFaulted(object sender, EventArgs e) 
        {
            Console.WriteLine("Service Faulted");
        }
        

        // Host the service within this EXE console application.
        public static void Main()
        {

            OrderProcessorService.StartService();

            // The service can now be accessed.
            Console.WriteLine("The service is ready.");
            Console.WriteLine("Press <ENTER> to stop the service");
            Console.ReadLine();
            Console.WriteLine("Exiting service");

        }
    }

PoisonErrorBehaviorAttribute installe PoisonErrorHandler. PoisonErrorHandler est une implémentation de IErrorHandler. IErrorHandler est appelé chaque fois qu'une exception est levée dans le système. Dans notre implémentation, nous recherchons MsmqPoisonMessageException qui indique qu'un message incohérent a été trouvé. Nous récupérons l'identificateur de recherche de message MSMQ de l'exception et utilisons l'API System.Messaging pour recevoir le message incohérent provenant de la file d'attente et le déplacer vers une file d'attente de messages incohérents spécifique pour traitement ultérieur. Le message pouvant encore être verrouillé sous la transaction ServiceModel, nous attendons de voir si la lecture a été infructueuse et réessayons de nouveau. Une fois que le message a été déplacé vers une autre file d'attente, le reste des messages de la file d'attente peut être consommé. Un nouveau ServiceHost est créé et ouvert pour traitement. L'exemple de code suivant illustre ce comportement :

    public class PoisonErrorHandler : IErrorHandler
    {
        static WaitCallback orderProcessingCallback = new WaitCallback(OrderProcessorService.StartThreadProc);

        public void ProvideFault(Exception error, MessageVersion version, ref Message fault)
        {
            // no-op -we are not interested in this.
        }
        
        // Handle poison message exception by moving the offending message out of the way for regular processing to go on.
        public bool HandleError(Exception error)
        {
            MsmqPoisonMessageException poisonException = error as MsmqPoisonMessageException;
            if (null != poisonException)
            {
                long lookupId = poisonException.MessageLookupId;
                Console.WriteLine(" Poisoned message -message look up id = {0}", lookupId);

                // Get MSMQ queue name from app settings in configuration.

                System.Messaging.MessageQueue orderQueue = new System.Messaging.MessageQueue(OrderProcessorService.QueueName);
                System.Messaging.MessageQueue poisonMessageQueue = new System.Messaging.MessageQueue(OrderProcessorService.PoisonQueueName);
                System.Messaging.Message message = null;
                
                // Use a new transaction scope to remove the message from the main application queue and add it to the poison queue.
                // The poison message service processes messages from the poison queue.
                using(TransactionScope txScope= new TransactionScope(TransactionScopeOption.RequiresNew))
                {
                    int retryCount = 0;
                    while(retryCount < 3) 
                    {
                        retryCount++;
                    
                        try 
                        {
                            // Look up the poison message using the look up id.
                            message = orderQueue.ReceiveByLookupId(lookupId);
                            if(message != null) 
                            {
                                // Send the message to the poison message queue.
                                poisonMessageQueue.Send(message, System.Messaging.MessageQueueTransactionType.Automatic);
                                
                                // complete transaction scope
                                txScope.Complete();

                                Console.WriteLine("Moved poisoned message with look up id: {0} to poison queue: {1} ", lookupId, OrderProcessorService.PoisonQueueName);
                                break;
                            }
                        
                        }
                        catch(InvalidOperationException ) 
                        {
                            //Code for the case when the message may still not be available in the queue because of a race condition in transaction or 
                            //another node in the farm may actually have taken the message.
                            if (retryCount < 3) 
                            {
                                Console.WriteLine("Trying to move poison message but message is not available, sleeping for 10 seconds before retrying");
                                Thread.Sleep(TimeSpan.FromSeconds(10));
                            }
                            else 
                            {
                                Console.WriteLine("Giving up on trying to move the message");
                            }
                        }
                    
                    }
                }

                // Restart the service host.
                Console.WriteLine();
                Console.WriteLine("Restarting the service to process rest of the messages in the queue");
                Console.WriteLine("Press <ENTER> to stop the service");
                ThreadPool.QueueUserWorkItem(orderProcessingCallback);
                return true;
            }

            return false;
        }
    }

La configuration du service inclut les propriétés de message incohérent suivantes : receiveRetryCount et receiveErrorHandling, tel qu'indiqué dans le fichier de configuration suivant. Les propriétés maxRetryCycles et retryCycleDelay sont ignorées lors de l'exécution sur Windows Server 2003 et Windows XP.

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <appSettings>
    <!-- Use appSetting to configure the MSMQ queue name. -->
    <add key="queueName" value=".\private$\ServiceModelSamplesPoison" />
    <add key="poisonQueueName" value=".\private$\OrderProcessorPoisonQueue" />
    <add key="baseAddress" value="https://localhost:8000/orderProcessor/poisonSample"/>
  </appSettings>
  <system.serviceModel>
    <services>
      <service 
              name="Microsoft.ServiceModel.Samples.OrderProcessorService">
        <!-- Define NetMsmqEndpoint -->
        <endpoint address="net.msmq://localhost/private/ServiceModelSamplesPoison"
                  binding="netMsmqBinding"
                  bindingConfiguration="PoisonBinding" 
                  contract="Microsoft.ServiceModel.Samples.IOrderProcessor" />
      </service>
    </services>

    <bindings>
      <netMsmqBinding>
        <binding name="PoisonBinding" 
                 receiveRetryCount="0"
                 maxRetryCycles="1"
                 retryCycleDelay="00:00:05"                      
                 receiveErrorHandling="Fault">
        </binding>
      </netMsmqBinding>
    </bindings>
  </system.serviceModel>
</configuration>

Traitement des messages de la file d'attente de messages incohérents

Le service de message incohérent lit les messages de la file d'attente finale de messages incohérents et les traite.

Les messages de la file d'attente de messages incohérents sont adressés au service de traitement, qui peut être différent du point de terminaison de service de message incohérent. Par conséquent, lorsque le service de message incohérent lit des messages de la file d'attente, la couche de canal WCF recherche l'incompatibilité dans les points de terminaison et ne distribue pas le message. Dans ce cas, le message est adressé au service de traitement des commandes mais est reçu par le service de message incohérent. Pour continuer à recevoir le message même s'il est adressé à un autre point de terminaison, nous devons ajouter un ServiceBehavior pour filtrer les adresses où le critère de correspondance doit correspondre aux points de terminaison de service auxquels le message est adressé. Cela est nécessaire pour pouvoir traiter les messages que vous lisez à partir de la file d'attente de messages incohérents.

L'implémentation du service de message incohérent elle-même est très similaire à l'implémentation du service. Elle implémente le contrat et traite les commandes. L'exemple de code se présente comme suit :

    // Service class that implements the service contract.
    // Added code to write output to the console window.
    [ServiceBehavior(AddressFilterMode=AddressFilterMode.Any)]
    public class OrderProcessorService : IOrderProcessor
    {
        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void SubmitPurchaseOrder(PurchaseOrder po)
        {
            Orders.Add(po);
            Console.WriteLine("Processing {0} ", po);
        }

        public static void OnServiceFaulted(object sender, EventArgs e) 
        {
            Console.WriteLine("Service Faulted...exiting app");
            Environment.Exit(1);
        }


        // Host the service within this EXE console application.
        public static void Main()
        {
   
            // Create a ServiceHost for the OrderProcessorService type.
            ServiceHost serviceHost = new ServiceHost(typeof(OrderProcessorService));

            // Hook on to the service host faulted events.
            serviceHost.Faulted += new EventHandler(OnServiceFaulted);
            
            serviceHost.Open();

            // The service can now be accessed.
            Console.WriteLine("The poison message service is ready.");
            Console.WriteLine("Press <ENTER> to terminate service.");
            Console.WriteLine();
            Console.ReadLine();

            // Close the ServiceHostBase to shutdown the service.
            if(serviceHost.State != CommunicationState.Faulted)
            {
                serviceHost.Close();
            }
        }

Contrairement au service de traitement des commandes qui lit les messages de la file d'attente de commande, le service de message incohérent lit les messages de la sous-file d'attente de messages incohérents. La file d'attente de messages incohérents est une sous-file d'attente de la file d'attente principale, elle est appelée « poison » et est générée automatiquement par MSMQ. Pour y accéder, indiquez le nom de la file d'attente principale suivi de « ; » et du nom de la sous-file d'attente (dans ce cas « poison »), tel qu'indiqué dans l'exemple de configuration suivant.

ms751472.note(fr-fr,VS.90).gifRemarque :
Dans l'exemple donné pour MSMQ v3.0, le nom de la file d'attente de messages incohérents n'est pas une sous-file d'attente, mais la file d'attente vers laquelle nous avons déplacé le message.

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <system.serviceModel>
    <services>
      <service name="Microsoft.ServiceModel.Samples.OrderProcessorService">
        <!-- Define NetMsmqEndpoint -->
        <endpoint address="net.msmq://localhost/private/ServiceModelSamplesPoison;poison"
                  binding="netMsmqBinding"
                  contract="Microsoft.ServiceModel.Samples.IOrderProcessor" >
        </endpoint>
      </service>
    </services>

  </system.serviceModel>
</configuration> 

Lorsque vous exécutez l'exemple, les activités du client, du service et du service de message incohérent s'affichent dans les fenêtres de console. Vous pouvez voir le service recevoir des messages du client. Appuyez sur ENTER dans chaque fenêtre de console pour arrêter les services.

Le service commence à s'exécuter, à traiter les commandes et à mettre fin au traitement de manière aléatoire. Si le message indique qu'il a traité la commande, vous pouvez réexécuter le client afin d'envoyer un autre message jusqu'à ce que vous constatiez que le service a réellement terminé un message. Selon les paramètres de gestion de message incohérent configurés, une tentative de traitement du message est effectuée avant qu'il ne soit déplacé vers la file d'attente finale de messages incohérents.

The service is ready.
Press <ENTER> to terminate service.

Processing Purchase Order: 0f063b71-93e0-42a1-aa3b-bca6c7a89546
        Customer: somecustomer.com
        OrderDetails
                Order LineItem: 54 of Blue Widget @unit price: $29.99
                Order LineItem: 890 of Red Widget @unit price: $45.89
        Total cost of this order: $42461.56
        Order status: Pending

Processing Purchase Order: 5ef9a4fa-5a30-4175-b455-2fb1396095fa
        Customer: somecustomer.com
        OrderDetails
                Order LineItem: 54 of Blue Widget @unit price: $29.99
                Order LineItem: 890 of Red Widget @unit price: $45.89
        Total cost of this order: $42461.56
        Order status: Pending

Aborting transaction, cannot process purchase order: 23e0b991-fbf9-4438-a0e2-20adf93a4f89

Démarrez le service de message incohérent afin de lire le message incohérent de la file d'attente de messages incohérents. Dans cet exemple, le service de message incohérent lit le message et le traite. Vous pouvez voir que la commande fournisseur qui a été terminée et marquée comme étant incohérente est lue par le service de message incohérent.

The service is ready.
Press <ENTER> to terminate service.

Processing Purchase Order: 23e0b991-fbf9-4438-a0e2-20adf93a4f89
        Customer: somecustomer.com
        OrderDetails
                Order LineItem: 54 of Blue Widget @unit price: $29.99
                Order LineItem: 890 of Red Widget @unit price: $45.89
        Total cost of this order: $42461.56
        Order status: Pending

Pour configurer, générer et exécuter l'exemple

  1. Assurez-vous d'avoir effectué la procédure indiquée dans la section Procédure d'installation unique pour les exemples Windows Communication Foundation.

  2. Pour générer l'édition C# ou Visual Basic .NET de la solution, suivez les instructions indiquées dans Génération des exemples Windows Communication Foundation.

  3. Pour exécuter l'exemple dans une configuration à un ou plusieurs ordinateurs, modifiez les noms de file d'attente afin qu'ils correspondent au nom d'hôte réel au lieu de localhost, puis suivez les instructions indiquées dans Exécution des exemples Windows Communication Foundation.

Avec le transport de liaison netMsmqBinding, la sécurité est activée par défaut. Les propriétés MsmqAuthenticationMode et MsmqProtectionLevel déterminent toutes deux le type de sécurité du transport. Par défaut, le mode d'authentification a la valeur Windows et le niveau de protection a la valeur Sign. Pour que MSMQ fournisse la fonctionnalité d'authentification et de signature, il doit faire partie d'un domaine. Si vous exécutez cet exemple sur un ordinateur ne faisant pas partie d'un domaine, vous recevez l'erreur suivante : "Le certificat Message Queuing interne pour l'utilisateur n'existe pas."

Pour exécuter l'exemple sur un ordinateur joint à un groupe de travail

  1. Si votre ordinateur ne fait pas partie d'un domaine, désactivez la sécurité du transport en affectant None au mode d'authentification et au niveau de protection, tel qu'indiqué dans l'exemple de configuration suivant :

    <bindings>
        <netMsmqBinding>
            <binding name="TransactedBinding">
                <security mode="None"/>
            </binding>
        </netMsmqBinding>
    </bindings>
    

    Assurez-vous que le point de terminaison est associé à la liaison en définissant l'attribut bindingConfiguration du point de terminaison.

  2. Assurez-vous de modifier la configuration sur PoisonMessageServer, le serveur et le client avant d'exécuter l'exemple.

    ms751472.note(fr-fr,VS.90).gifRemarque :
    L'affectation de None à security mode revient à affecter None à la sécurité MsmqAuthenticationMode, MsmqProtectionLevel et Message.

Send comments about this topic to Microsoft.
© 2007 Microsoft Corporation. All rights reserved.