C-C++ Code Example: Correlation Identifier Filters
Applies To: Windows 10, Windows 7, Windows 8, Windows 8.1, Windows Server 2008, Windows Server 2008 R2, Windows Server 2012, Windows Server 2012 R2, Windows Server Technical Preview, Windows Vista
This example provides an application-defined function that peeks at the PROPID_M_CORRELATIONID property of each message in the queue and removes every message whose correlation identifier matches the one provided by the caller.
Note
In this example, the cursor used to navigate the queue behaves differently depending on whether a message is removed from the queue. If a message is removed from the queue, Message Queuing moves the cursor to the next message. If the message is not removed from the queue, the application must move the cursor itself.
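The cursor rule in this note can be illustrated without Message Queuing. The following is a minimal sketch, assuming the queue is modeled as a std::vector and the cursor as an index into it. FilterById and the string identifiers are hypothetical names used only for illustration; they are not part of the Message Queuing API.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical in-memory model of a queue with a cursor (not the MSMQ API).
// Removes every element equal to `target` and returns how many were removed.
std::size_t FilterById(std::vector<std::string>& queue, const std::string& target)
{
    std::size_t removed = 0;
    std::size_t cursor = 0;              // Points at the first message.

    while (cursor < queue.size())
    {
        if (queue[cursor] == target)
        {
            // The current message is removed. The following message slides
            // into the cursor position, so the cursor is NOT advanced here.
            // This models MQ_ACTION_RECEIVE followed by MQ_ACTION_PEEK_CURRENT.
            queue.erase(queue.begin() + static_cast<std::ptrdiff_t>(cursor));
            ++removed;
        }
        else
        {
            // The message stays in the queue, so the caller advances the
            // cursor. This models MQ_ACTION_PEEK_NEXT.
            ++cursor;
        }
    }
    return removed;
}
```

Advancing the index after a removal in this model would skip the message that follows the removed one, which is the mistake the note warns against.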
To filter messages by correlation identifier
Define the maximum number of message properties to be specified and the message property counter.
Define the MQMSGPROPS structure.
Specify PROPID_M_CORRELATIONID. The correlation identifier is stored in an array of 20 unsigned characters; use the constant PROPID_M_CORRELATIONID_SIZE to size the buffer.
Initialize the MQMSGPROPS structure.
Call MQOpenQueue to open the queue with receive access. Receive access allows the application to peek at or remove the messages in the queue.
Call MQCreateCursor to create the cursor for navigating the queue. The returned cursor handle is used in the calls to read messages.
Call MQReceiveMessage with the MQ_ACTION_PEEK_CURRENT receive action to peek at the first message in the queue. This call also initializes the cursor so that it points at the first message in the queue.
In a loop structure, continue to call MQReceiveMessage to peek at and remove the messages in the queue. Note that the receive action of each call depends on whether the previous message was removed from the queue.
If the current message has the correct correlation identifier, set dwRecAction to MQ_ACTION_RECEIVE and call MQReceiveMessage to remove the message, then reset dwRecAction to MQ_ACTION_PEEK_CURRENT to process the next message. Removing the current message causes Message Queuing to move the cursor to the next message.
If the message does not have the correct correlation identifier, set dwRecAction to MQ_ACTION_PEEK_NEXT to move the cursor and process the next message. In this case, the application must move the cursor to the next message.
Call MQCloseCursor and MQCloseQueue to free the resources.
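The comparison described in the loop step can be sketched in isolation. This is a minimal example under stated assumptions: ByteArray is a hypothetical stand-in that mirrors the two fields of the Windows CAUB structure (cElems and pElems) so that the sketch compiles without the Windows headers, kCorrelationIdSize stands in for PROPID_M_CORRELATIONID_SIZE, and CorrelationIdMatches is an illustrative helper name, not a Message Queuing function.

```cpp
#include <cstring>

// Hypothetical stand-in mirroring the fields of the Windows CAUB structure.
struct ByteArray
{
    unsigned long  cElems;   // Number of bytes
    unsigned char* pElems;   // Pointer to the bytes
};

// Stand-in for PROPID_M_CORRELATIONID_SIZE (20 bytes).
const unsigned long kCorrelationIdSize = 20;

// Returns true when both identifiers have the same length and the same bytes.
bool CorrelationIdMatches(const ByteArray& received, const ByteArray& wanted)
{
    // Reject missing buffers before touching them.
    if (received.pElems == nullptr || wanted.pElems == nullptr)
    {
        return false;
    }
    // Compare lengths first so memcmp never reads past the shorter buffer.
    return (received.cElems == wanted.cElems) &&
           (std::memcmp(received.pElems, wanted.pElems, wanted.cElems) == 0);
}
```

Checking the element counts before calling memcmp matters because the byte comparison is only meaningful, and only safe, when both buffers are at least as long as the count passed to it.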
The following code example can be run on all versions of Message Queuing.
#include <windows.h>
#include <mq.h>      // Message Queuing API; link with mqrt.lib

HRESULT FilterCorrelationId(
    LPCWSTR wszQueueFormatName,
    CAUB * pCorrelationId
    )
{
    // Validate the input parameters.
    if ((wszQueueFormatName == NULL) || (pCorrelationId == NULL))
    {
        return MQ_ERROR_INVALID_PARAMETER;
    }

    // Define the maximum number of properties and a property counter.
    const int NUMBEROFPROPERTIES = 5;            // Number of properties
    DWORD cPropId = 0;                           // Property counter

    // Define the MQMSGPROPS structure.
    MQMSGPROPS msgprops;
    MSGPROPID aMsgPropId[NUMBEROFPROPERTIES];
    PROPVARIANT aMsgPropVar[NUMBEROFPROPERTIES];
    HRESULT aMsgStatus[NUMBEROFPROPERTIES];

    HANDLE hQueue = NULL;                        // Queue handle
    HANDLE hCursor = NULL;                       // Cursor handle
    HRESULT hr = MQ_OK;                          // Return code
    DWORD dwRecAction;                           // Receive action mode

    // Specify the message properties to be retrieved.
    UCHAR aCorrelationId[PROPID_M_CORRELATIONID_SIZE];
    aMsgPropId[cPropId] = PROPID_M_CORRELATIONID;    // Property ID
    aMsgPropVar[cPropId].vt = VT_VECTOR | VT_UI1;    // Type indicator
    aMsgPropVar[cPropId].caub.cElems = PROPID_M_CORRELATIONID_SIZE;
    aMsgPropVar[cPropId].caub.pElems = aCorrelationId;
    cPropId++;

    // Initialize the MQMSGPROPS structure.
    msgprops.cProp = cPropId;                    // Number of properties
    msgprops.aPropID = aMsgPropId;               // IDs of properties
    msgprops.aPropVar = aMsgPropVar;             // Values of properties
    msgprops.aStatus = aMsgStatus;               // Error reports

    // Open the queue to filter messages.
    hr = MQOpenQueue(
        wszQueueFormatName,                      // Format name of queue
        MQ_RECEIVE_ACCESS,                       // Access mode
        MQ_DENY_RECEIVE_SHARE,                   // Share mode
        &hQueue                                  // OUT: Handle of queue
        );
    if (FAILED(hr))
    {
        return hr;
    }

    // Create the cursor used to navigate through the queue.
    hr = MQCreateCursor(
        hQueue,                                  // Queue handle
        &hCursor                                 // OUT: Handle to cursor
        );
    if (FAILED(hr))
    {
        MQCloseQueue(hQueue);
        return hr;
    }

    // Peek at first message in the queue.
    hr = MQReceiveMessage(
        hQueue,                                  // Queue handle
        0,                                       // Max time (msec)
        MQ_ACTION_PEEK_CURRENT,                  // Receive action
        &msgprops,                               // Message property structure
        NULL,                                    // No OVERLAPPED structure
        NULL,                                    // No callback function
        hCursor,                                 // Cursor handle
        MQ_NO_TRANSACTION                        // Not in a transaction
        );
    if (FAILED(hr))
    {
        MQCloseCursor(hCursor);
        MQCloseQueue(hQueue);
        return hr;
    }

    // Filter messages from queue. The loop ends when MQReceiveMessage fails,
    // for example with MQ_ERROR_IO_TIMEOUT when no message remains at the
    // cursor position.
    do
    {
        if ((msgprops.aPropVar[0].caub.cElems == pCorrelationId->cElems) &&
            (!memcmp(msgprops.aPropVar[0].caub.pElems,
                     pCorrelationId->pElems,
                     pCorrelationId->cElems)))
        {
            // The identifiers match: remove the message. Message Queuing
            // moves the cursor to the next message.
            dwRecAction = MQ_ACTION_RECEIVE;     // Receive action
            hr = MQReceiveMessage(
                hQueue,                          // Queue handle
                0,                               // Max time (msec)
                dwRecAction,                     // Receive action
                &msgprops,                       // Message property structure
                NULL,                            // No OVERLAPPED structure
                NULL,                            // No callback function
                hCursor,                         // Cursor handle
                MQ_NO_TRANSACTION                // Not in a transaction
                );
            if (FAILED(hr))
            {
                break;
            }
            //
            // Process the message.
            //
            dwRecAction = MQ_ACTION_PEEK_CURRENT;    // Receive action
        }
        else
        {
            // No match: the application must move the cursor itself.
            dwRecAction = MQ_ACTION_PEEK_NEXT;   // Receive action
        }

        // Peek at the next message to examine.
        hr = MQReceiveMessage(
            hQueue,                              // Queue handle
            0,                                   // Max time (msec)
            dwRecAction,                         // Receive action
            &msgprops,                           // Message property structure
            NULL,                                // No OVERLAPPED structure
            NULL,                                // No callback function
            hCursor,                             // Cursor handle
            MQ_NO_TRANSACTION                    // Not in a transaction
            );
        if (FAILED(hr))
        {
            break;
        }
    } while (SUCCEEDED(hr));

    // Close cursor and queue.
    hr = MQCloseCursor(hCursor);
    if (FAILED(hr))
    {
        MQCloseQueue(hQueue);
        return hr;
    }
    hr = MQCloseQueue(hQueue);
    return hr;
}