Dela via


C-C++ Code Example: Checking Transaction Boundaries

 

Applies To: Windows 10, Windows 7, Windows 8, Windows 8.1, Windows Server 2008, Windows Server 2008 R2, Windows Server 2012, Windows Server 2012 R2, Windows Server Technical Preview, Windows Vista

The following example reads all the messages in the queue to determine which message was the first message sent within a specific transaction, what subsequent message were sent as part of the transaction, and which message was the last message sent in the transaction.

The following three message properties are retrieved for each message.

To check transaction boundaries

  1. Define the structures needed to retrieve messages.

  2. Specify the properties of the message to be retrieved. (This example retrieves the message body and the three transaction boundary properties.)

  3. Initialize the MQMSGPROPS structure.

  4. Call MQOpenQueue to open the transactional queue with receive access. This example uses the format name supplied by the caller to open the queue.

  5. Call MQReceiveMessage to retrieve the messages from the queue. For each message retrieved, its transaction boundary properties are checked to see if the boundaries of the transactions are not valid.

  6. Call MQCloseQueue to close the destination queue.

Code Example

This example requires MSMQ 2.0 or later.

int XactVerify(  
               LPCWSTR wszQueueFormatName  
               )  
{  
  
  // Validate the input string.  
  if (wszQueueFormatName == NULL)  
  {  
    return MQ_ERROR_INVALID_PARAMETER;  
  }  
  
  // Define the maximum number of properties and a property counter.  
  const int NUMBER_OF_PROPERTIES = 5 ;       // Number of message properties  
  DWORD cPropId = 0;                         // Property counter  
  
  //Define an MQMSGPROPS structure.  
  MQMSGPROPS msgprops;  
  MSGPROPID aMsgPropId[NUMBER_OF_PROPERTIES];  
  PROPVARIANT aMsgPropVar[NUMBER_OF_PROPERTIES];  
  HRESULT aMsgStatus[NUMBER_OF_PROPERTIES];  
  
  HANDLE hQueue = NULL;                      // Queue handle  
  HRESULT hr = MQ_OK;                        // Return code  
  DWORD dwAccess;                            // Access mode of the queue  
  DWORD dwShareMode;                         // Share mode of the queue  
  DWORD dwRecAction;                         // Receive action  
  
  // Create a message body buffer.  
  ULONG ulBodyBufferSize = 1024;  
  UCHAR * pucBodyBuffer = NULL;  
  *pucBodyBuffer = (UCHAR *)malloc(ulBodyBufferSize);  
  if (pucBodyBuffer == NULL)  
  {  
    return MQ_ERROR_INSUFFICIENT_RESOURCES;  
  }  
  memset(pucBodyBuffer, 0, ulBodyBufferSize);  
  
  OBJECTID xid;                              // Transaction ID buffer  
  
  // Specify message properties.  
  aMsgPropId[cPropId] = PROPID_M_BODY_SIZE;       // 0: Body size  
  aMsgPropVar[cPropId].vt = VT_UI4;                 
  cPropId++;  
  
  aMsgPropId[cPropId] = PROPID_M_BODY;            // 1: Body  
  aMsgPropVar[cPropId].vt = VT_VECTOR | VT_UI1;  
  aMsgPropVar[cPropId].caub.pElems = (UCHAR*)pucBodyBuffer;  
  aMsgPropVar[cPropId].caub.cElems = ulBodyBufferSize;  
  cPropId++;  
  
  aMsgPropId[cPropId] = PROPID_M_FIRST_IN_XACT;   // 2: First  
  aMsgPropVar[cPropId].vt = VT_UI1;               
  cPropId++;  
  
  aMsgPropId[cPropId] = PROPID_M_LAST_IN_XACT;    // 3: Last   
  aMsgPropVar[cPropId].vt = VT_UI1;                 
  cPropId++;  
  
  aMsgPropId[cPropId] = PROPID_M_XACTID;          // 4: Xact index  
  aMsgPropVar[cPropId].vt = VT_UI1 | VT_VECTOR;               
  aMsgPropVar[cPropId].caub.pElems = (PUCHAR)&xid;  
  aMsgPropVar[cPropId].caub.cElems = sizeof(OBJECTID);  
  cPropId++;  
  
  // Initialize the MQMSGPROPS structure.  
  msgprops.cProp = cPropId;          // Number of message properties  
  msgprops.aPropID = aMsgPropId;     // IDs of the message properties  
  msgprops.aPropVar = aMsgPropVar;   // Values of the message properties  
  msgprops.aStatus  = aMsgStatus;    // Error reports  
  
  // Open the destination queue to read the message.  
  dwAccess = MQ_RECEIVE_ACCESS;             // Access mode of the queue  
  dwShareMode = MQ_DENY_RECEIVE_SHARE;      // Share mode of the queue  
  hr = MQOpenQueue(wszQueueFormatName,      // Format name of the queue  
                   dwAccess,                // Access mode  
                   dwShareMode,             // Share mode  
                   &hQueue                  // OUT: Handle to queue  
                   );  
  if (FAILED(hr))  
  {  
    free (pucBodyBuffer);  
    return hr;  
  }  
  
  // Declare variables for the processing messages.  
  OBJECTID oidCurrentXID;  
  ZeroMemory(&oidCurrentXID, sizeof(OBJECTID));  
  BOOL  fInsideXact = FALSE;  
  
  // Test the transaction boundaries and process the messages if no  
  // messages are lost.  
  for ( ; ; )  
  {  
    // Retrieve a message.  
    dwRecAction = MQ_ACTION_RECEIVE;        // Receive action  
    hr = MQReceiveMessage(  
                          hQueue,           // Queue handle  
                          0,                // Maximum time (msec)  
                          dwRecAction,      // Receive action  
                          &msgprops,        // Message property structure  
                          NULL,             // No OVERLAPPED structure  
                          NULL,             // No callback function  
                          NULL,             // No cursor  
                          NULL              // Not in a transaction  
                          );  
  
    if (FAILED(hr))  
    {  
      if ( hr != MQ_ERROR_IO_TIMEOUT)  
      {  
        fprintf(stderr, "Retrieving the message failed. Error: 0x%x\n",hr);  
      }  
      if (fInsideXact)  
      {  
        fprintf(stderr, "The  last message(s) in transaction %x is missing.\n", oidCurrentXID.Uniquifier);  
      }  
      break;  
    }  
  
    // Test for the case of the first message in a transaction.  
    if (aMsgPropVar[2].bVal == MQMSG_FIRST_IN_XACT)  
    {  
      if (!fInsideXact)  
      {  
        // OK - starting a new transaction.  
        fInsideXact = TRUE;  
        oidCurrentXID = xid;  
        fprintf(stderr, "First message from transaction %x\n", xid.Uniquifier);  
      }  
      else   
      {  
        // Error condition: Starting a new transaction before   
        // the previous transaction has been closed.  
        fprintf(stderr, "***The last message(s) of transaction %x is missing.\n", oidCurrentXID.Uniquifier);  
     }  
    }  
  
    // Test for the case of the last message in a transaction.  
    if (aMsgPropVar[3].bVal == MQMSG_LAST_IN_XACT)  
    {  
      if (fInsideXact && xid.Uniquifier==oidCurrentXID.Uniquifier)  
      {  
        // OK - we are within the same transaction.  
        fInsideXact = FALSE;  
        fprintf(stderr, "Last message from transaction %x\n", xid.Uniquifier);  
  
        // At this point we have full data on one   
        // transaction. Process this information as  
        // needed by your application.  
      }  
      else  
      {  
        // Error condition: We have the last message of transaction X   
        // while within transaction Y.   
        if (xid.Uniquifier != oidCurrentXID.Uniquifier && oidCurrentXID.Uniquifier!=0)  
        {  
          fprintf(stderr, "***The last message(s) of transaction %x is missing\n", oidCurrentXID.Uniquifier);  
        }  
        if (!fInsideXact)  
        {  
          fprintf(stderr, "***The first message(s) of transaction %x is missing.\n", xid.Uniquifier);  
        }  
      }  
    }  
  
    // Test whether the message is an inner message of a current transaction.  
    if (aMsgPropVar[2].bVal == MQMSG_NOT_FIRST_IN_XACT &&  
        aMsgPropVar[3].bVal == MQMSG_NOT_LAST_IN_XACT)  
    {  
      if (xid.Uniquifier==oidCurrentXID.Uniquifier)  
      {  
        // OK - just inner message  
        fprintf(stderr, "Message from transaction %x\n", xid.Uniquifier);  
      }  
      else  
      {  
        // Error condition: We have an inner message of   
        // transaction X while within transaction Y.  
        if (oidCurrentXID.Uniquifier!=0)  
        {  
          fprintf(stderr, "***The last message(s) of transaction %x is missing.\n", oidCurrentXID.Uniquifier);  
        }  
        if (!fInsideXact)  
        {  
          fprintf(stderr, "***The first message(s) of transaction %x is missing.\n", xid.Uniquifier);  
        }  
        oidCurrentXID = xid;  
        fInsideXact = TRUE;  
      }  
    }  
  }  
  
  // Close the queue.  
  hr = MQCloseQueue(hQueue);  
  
  free (pucBodyBuffer);  
  return MQ_OK;  
}