Grouping Queued Messages in a Session

Windows Communication Foundation (WCF) provides a session that allows you to group a set of related messages together for processing by a single receiving application. Messages that are part of a session must be part of the same transaction. Because all messages are part of the same transaction, if one message fails to be processed the entire session is rolled back. Sessions have similar behaviors with regard to dead-letter queues and poison queues. The Time to Live (TTL) property set on a queued binding configured for sessions is applied to the session as a whole. If only some of the messages in the session are sent before the TTL expires, the entire session is placed in the dead-letter queue. Similarly, when messages in a session fail to be sent to an application from the application queue, the entire session is placed in the poison queue (if available).
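
As a rough sketch of the settings this paragraph refers to, the following C# configures a NetMsmqBinding in code rather than in the configuration file. The class name SessionBindingSketch and the specific values (a five-minute TTL, the retry counts) are illustrative assumptions and are not part of the sample. ReceiveErrorHandling.Move depends on the poison subqueue support in MSMQ 4.0, so it may not be available on older platforms.

```csharp
using System;
using System.ServiceModel;

// Hypothetical helper; the name and the values are assumptions for illustration.
public static class SessionBindingSketch
{
    public static NetMsmqBinding Create()
    {
        NetMsmqBinding binding = new NetMsmqBinding();

        // Queued sessions require a transactional queue, which needs ExactlyOnce assurances.
        binding.ExactlyOnce = true;

        // With a session contract, the TTL applies to the session as a whole.
        binding.TimeToLive = TimeSpan.FromMinutes(5);

        // Expired sessions go to the system-wide transactional dead-letter queue.
        binding.DeadLetterQueue = DeadLetterQueue.System;

        // Move sessions that repeatedly fail delivery to the poison queue (MSMQ 4.0 only).
        binding.ReceiveErrorHandling = ReceiveErrorHandling.Move;
        binding.ReceiveRetryCount = 2;
        binding.MaxRetryCycles = 1;
        binding.RetryCycleDelay = TimeSpan.FromSeconds(30);

        return binding;
    }

    public static void Main()
    {
        NetMsmqBinding binding = Create();
        Console.WriteLine("TTL for the session: " + binding.TimeToLive);
        Console.WriteLine("Dead-letter queue: " + binding.DeadLetterQueue);
        Console.WriteLine("Receive error handling: " + binding.ReceiveErrorHandling);
    }
}
```

The same properties can be set as attributes on a binding element under netMsmqBinding in configuration; code is used here only to keep the sketch in one place.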

Message Grouping Example

One example where grouping messages is helpful is an order-processing application implemented as a WCF service. For instance, a client submits an order that contains a number of items. For each item, the client makes a call to the service, which results in a separate message being sent. It is possible for server A to receive the first item and server B to receive the second item. Each time an item is added, the server processing that item has to find the appropriate order and add the item to it, which is highly inefficient. You still run into such inefficiencies with only a single server handling all requests, because the server must keep track of all orders currently being processed and determine which one the new item belongs to. Grouping all requests for a single order greatly simplifies the implementation of such an application. The client application sends all items for a single order in a session, so when the service processes the order, it processes the entire session at once.

Procedures

To set up a service contract to use sessions

  1. Define a service contract that requires a session. Do this with the ServiceContractAttribute attribute and by specifying:

    SessionMode=SessionMode.Required
    
  2. Mark the operations in the contract as one-way, because these methods do not return anything. This is done with the OperationContractAttribute attribute and by specifying:

    [OperationContract(IsOneWay = true)]
    
  3. Implement the service contract and specify an InstanceContextMode of PerSession. This instantiates the service only once for each session.

    [ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]
    
  4. Each service operation requires a transaction. Specify this with the OperationBehaviorAttribute attribute. The operation that completes the transaction should also set TransactionAutoComplete to true.

    [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)] 
    
  5. Configure an endpoint that uses the system-provided NetMsmqBinding binding.

  6. Create a transactional queue using System.Messaging. You can also create the queue by using Message Queuing (MSMQ) or MMC. If you do, create a transactional queue.

  7. Create a service host for the service by using ServiceHost.

  8. Open the service host to make the service available.

  9. Close the service host.
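
The steps above can be sketched in a single file that adds the endpoint in code instead of in configuration. This is a minimal sketch under stated assumptions: the contract is trimmed to its signatures, the class HostSketch is hypothetical, and the queue path and endpoint address are the values used in the sample's configuration file. It assumes MSMQ is installed and that the default NetMsmqBinding security settings are usable on the machine.

```csharp
using System;
using System.Messaging;
using System.ServiceModel;

// Steps 1 and 2: a session-requiring contract whose operations are all one-way.
[ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples", SessionMode = SessionMode.Required)]
public interface IOrderTaker
{
    [OperationContract(IsOneWay = true)]
    void OpenPurchaseOrder(string customerId);

    [OperationContract(IsOneWay = true)]
    void AddProductLineItem(string productId, int quantity);

    [OperationContract(IsOneWay = true)]
    void EndPurchaseOrder();
}

// Steps 3 and 4: one instance per session; only the last operation completes the transaction.
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession)]
public class OrderTakerService : IOrderTaker
{
    [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = false)]
    public void OpenPurchaseOrder(string customerId)
    {
        Console.WriteLine("Opened order for " + customerId);
    }

    [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = false)]
    public void AddProductLineItem(string productId, int quantity)
    {
        Console.WriteLine("Added " + quantity + " of " + productId);
    }

    [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
    public void EndPurchaseOrder()
    {
        Console.WriteLine("Order completed");
    }
}

// Hypothetical host class; the name is an assumption for illustration.
public static class HostSketch
{
    public static void Main()
    {
        // Step 6: create the transactional queue if it does not exist yet.
        const string queuePath = @".\private$\ServiceModelSamplesSession";
        if (!MessageQueue.Exists(queuePath))
            MessageQueue.Create(queuePath, true);

        // Steps 5 and 7: create the host and add a NetMsmqBinding endpoint in code.
        ServiceHost host = new ServiceHost(typeof(OrderTakerService));
        host.AddServiceEndpoint(
            typeof(IOrderTaker),
            new NetMsmqBinding(),
            "net.msmq://localhost/private/ServiceModelSamplesSession");

        // Step 8: open the host so it starts reading sessions from the queue.
        host.Open();
        Console.WriteLine("Press <ENTER> to terminate service.");
        Console.ReadLine();

        // Step 9: close the host.
        host.Close();
    }
}
```

Adding the endpoint in code keeps the sketch self-contained; the full example later in this topic reads the same values from the configuration file instead.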

To set up a client

  1. Create a transaction scope to write to the transactional queue.

  2. Create the WCF client using the ServiceModel Metadata Utility Tool (Svcutil.exe).

  3. Place the order.

  4. Close the WCF client.
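
The client steps can also be sketched without a generated proxy by using ChannelFactory. This is an alternative to the Svcutil.exe approach described above, not the sample's own method. The class ClientSketch is hypothetical, the contract is redeclared here on the assumption that it matches the service's contract (including its namespace), and the address is the one used in the sample's configuration.

```csharp
using System;
using System.ServiceModel;
using System.Transactions;

// Assumed to match the service-side contract exactly.
[ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples", SessionMode = SessionMode.Required)]
public interface IOrderTaker
{
    [OperationContract(IsOneWay = true)]
    void OpenPurchaseOrder(string customerId);

    [OperationContract(IsOneWay = true)]
    void AddProductLineItem(string productId, int quantity);

    [OperationContract(IsOneWay = true)]
    void EndPurchaseOrder();
}

// Hypothetical client class; the name is an assumption for illustration.
public static class ClientSketch
{
    public static void Main()
    {
        ChannelFactory<IOrderTaker> factory = new ChannelFactory<IOrderTaker>(
            new NetMsmqBinding(),
            new EndpointAddress("net.msmq://localhost/private/ServiceModelSamplesSession"));

        // Step 1: all messages in the session are written under one transaction.
        using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
        {
            IOrderTaker channel = factory.CreateChannel();
            try
            {
                // Step 3: place the order; each call becomes one message in the session.
                channel.OpenPurchaseOrder("somecustomer.com");
                channel.AddProductLineItem("Blue Widget", 10);
                channel.EndPurchaseOrder();

                // Step 4: close the session channel, then commit the transaction.
                ((IClientChannel)channel).Close();
                scope.Complete();
            }
            catch (CommunicationException)
            {
                // Leaving the scope without Complete rolls the whole session back.
                ((IClientChannel)channel).Abort();
            }
        }

        factory.Close();
    }
}
```

Because the scope is completed only after the channel closes successfully, either every message of the order reaches the queue or none does.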

Example

Description

The following example provides the code for the IOrderTaker service and for a client that uses this service. It shows how WCF uses queued sessions to provide the grouping behavior.

Code for the Service

// Service Code:

using System;

using System.ServiceModel.Channels;
using System.Configuration;
using System.Messaging;
using System.ServiceModel;
using System.Transactions;
using System.Text;
using System.Collections.Generic;

namespace Microsoft.ServiceModel.Samples
{
    // Define a service contract. 
    [ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples", SessionMode=SessionMode.Required)]
    public interface IOrderTaker
    {
        [OperationContract(IsOneWay = true)]
        void OpenPurchaseOrder(string customerId);

        [OperationContract(IsOneWay = true)]
        void AddProductLineItem(string productId, int quantity);

        [OperationContract(IsOneWay = true)]
        void EndPurchaseOrder();
    }

    // Define the Purchase Order Line Item
    public class PurchaseOrderLineItem
    {
        static Random r = new Random(137);

        string ProductId;
        float UnitCost;
        int Quantity;

        public PurchaseOrderLineItem(string productId, int quantity)
        {
            this.ProductId = productId;
            this.Quantity = quantity;
            this.UnitCost = r.Next(10000);
        }

        public override string ToString()
        {
            String displayString = "Order LineItem: " + Quantity + " of " + ProductId + " @unit price: $" + UnitCost + "\n";
            return displayString;
        }

        public float TotalCost
        {
            get { return UnitCost * Quantity; }
        }
    }

    // Define Purchase Order
    public class PurchaseOrder
    {
        string PONumber;
        string CustomerId;
        LinkedList<PurchaseOrderLineItem> orderLineItems = new LinkedList<PurchaseOrderLineItem>();

        public PurchaseOrder(string customerId)
        {
            this.CustomerId = customerId;
            this.PONumber = Guid.NewGuid().ToString();
        }

        public void AddProductLineItem(string productId, int quantity)
        {
            orderLineItems.AddLast(new PurchaseOrderLineItem(productId, quantity));
        }

        public float TotalCost
        {
            get
            {
                float totalCost = 0;
                foreach (PurchaseOrderLineItem lineItem in orderLineItems)
                    totalCost += lineItem.TotalCost;
                return totalCost;
            }
        }

        public string Status
        {
            get
            {
                return "Pending";
            }
        }

        public override string ToString()
        {
            StringBuilder strbuf = new StringBuilder("Purchase Order: " + PONumber + "\n");
            strbuf.Append("\tCustomer: " + CustomerId + "\n");
            strbuf.Append("\tOrderDetails\n");

            foreach (PurchaseOrderLineItem lineItem in orderLineItems)
            {
                strbuf.Append("\t\t" + lineItem.ToString());
            }

            strbuf.Append("\tTotal cost of this order: $" + TotalCost + "\n");
            strbuf.Append("\tOrder status: " + Status + "\n");
            return strbuf.ToString();
        }
    }


    // Service class which implements the service contract.
    // Added code to write output to the console window
    [ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]
    public class OrderTakerService : IOrderTaker
    {
        PurchaseOrder po;

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = false)]
        public void OpenPurchaseOrder(string customerId)
        {
            Console.WriteLine("Creating purchase order");
            po = new PurchaseOrder(customerId);
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = false)]
        public void AddProductLineItem(string productId, int quantity)
        {
            po.AddProductLineItem(productId, quantity);
            Console.WriteLine("Product " + productId + " quantity " + quantity + " added to purchase order");
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void EndPurchaseOrder()
        {
            Console.WriteLine("Purchase Order Completed");
            Console.WriteLine();
            Console.WriteLine(po.ToString());
        }


        // Host the service within this EXE console application.
        public static void Main()
        {
            // Get MSMQ queue name from app settings in configuration
            string queueName = ConfigurationManager.AppSettings["queueName"];

            // Create the transacted MSMQ queue if necessary.
            if (!MessageQueue.Exists(queueName))
                MessageQueue.Create(queueName, true);


            // Get the base address that is used to listen for WS-MetaDataExchange requests
            string baseAddress = ConfigurationManager.AppSettings["baseAddress"];

            // Create a ServiceHost for the OrderTakerService type.
            using (ServiceHost serviceHost = new ServiceHost(typeof(OrderTakerService), new Uri(baseAddress)))
            {
                // Open the ServiceHostBase to create listeners and start listening for messages.
                serviceHost.Open();

                // The service can now be accessed.
                Console.WriteLine("The service is ready.");
                Console.WriteLine("Press <ENTER> to terminate service.");
                Console.WriteLine();
                Console.ReadLine();

                // Close the ServiceHostBase to shutdown the service.
                serviceHost.Close(); 
            }
        }
    }
}

<!-- Service Config File: -->
<appSettings>
  <!-- use appSetting to configure MSMQ queue name -->
  <add key="queueName" value=".\private$\ServiceModelSamplesSession" />
  <add key="baseAddress" value="http://localhost:8000/orderTaker/sessionSample"/>
</appSettings>

<system.serviceModel>
  <services>
    <service name="Microsoft.ServiceModel.Samples.OrderTakerService"
             behaviorConfiguration="MyServiceTypeBehaviors" >

      <!-- Define NetMsmqEndpoint -->
      <endpoint address="net.msmq://localhost/private/ServiceModelSamplesSession"
                binding="netMsmqBinding"
                contract="Microsoft.ServiceModel.Samples.IOrderTaker" />
      <endpoint contract="IMetadataExchange" binding="mexHttpBinding" address="mex" />

    </service>
  </services>

  <behaviors>
    <serviceBehaviors>
      <behavior name="MyServiceTypeBehaviors" >
        <serviceMetadata httpGetEnabled="true" />
      </behavior>
    </serviceBehaviors>
  </behaviors>

</system.serviceModel>

Code for the Client

using System;
using System.Configuration;
using System.Messaging;
using System.ServiceModel;
using System.Transactions;

namespace Microsoft.ServiceModel.Samples
{
    //The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.

    //Client implementation code.
    class Client
    {
        static void Main()
        {
            //Create a transaction scope.
            using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
            {
                // Create a proxy with given client endpoint configuration
                OrderTakerClient client = new OrderTakerClient("OrderTakerEndpoint");
                try
                {
                    // Open a purchase order
                    client.OpenPurchaseOrder("somecustomer.com");
                    Console.WriteLine("Purchase Order created");

                    // Add product line items
                    Console.WriteLine("Adding 10 quantities of blue widget");
                    client.AddProductLineItem("Blue Widget", 10);

                    Console.WriteLine("Adding 23 quantities of red widget");
                    client.AddProductLineItem("Red Widget", 23);

                    // Close the purchase order
                    Console.WriteLine("Closing the purchase order");
                    client.EndPurchaseOrder();
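                    client.Close();

                    // Complete the transaction only after every message in the session has been sent.
                    scope.Complete();
                }
                catch (CommunicationException)
                {
                    // Abort the client. The scope is disposed without Complete, so the transaction rolls back.
                    client.Abort();
                }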
            }
            Console.WriteLine();
            Console.WriteLine("Press <ENTER> to terminate client.");
            Console.ReadLine();
        }
    }
}

<!-- Client Config File: -->
<system.serviceModel>

  <client>
    <!-- Define NetMsmqEndpoint -->
    <endpoint name="OrderTakerEndpoint"
              address="net.msmq://localhost/private/ServiceModelSamplesSession" 
              binding="netMsmqBinding" 
              contract="IOrderTaker" />
  </client>

</system.serviceModel>

See Also

Concepts

Queues Overview

Other Resources

Sessions and Queues Sample