使用 Java 版本 8 客户端库的 Azure 队列存储代码示例
本文介绍使用适用于 Java 的 Azure 队列存储客户端库版本 8 的代码示例。
2023 年 3 月 31 日,我们停用了对不符合当前 Azure SDK 指南的 Azure SDK 库的支持。 新的 Azure SDK 库会定期更新,以推动一致的体验并增强安全态势。 建议转换到新的 Azure SDK 库,以利用新功能和关键安全更新。
尽管 2023 年 3 月 31 日之后仍然可以使用较旧的库,但它们将不再从 Microsoft 获得官方支持和更新。 有关详细信息,请参阅支持停用公告。
有关使用最新版本 12.x 客户端库版本的代码示例,请参阅快速入门:适用于 Java 的 Azure 队列存储客户端库。
创建队列
然后,添加以下 import
指令:
import com.microsoft.azure.storage.*;
import com.microsoft.azure.storage.queue.*;
利用 CloudQueueClient
对象,可以获取队列的引用对象。 以下代码可创建一个 CloudQueueClient
对象,该对象提供对要使用的队列的引用。 如果队列不存在,可以创建它。
注意
可通过其他方法来创建 CloudStorageAccount
对象。 有关详细信息,请参阅 Azure 存储客户端 SDK 引用中的 CloudStorageAccount
。
try
{
// Retrieve storage account from connection-string.
CloudStorageAccount storageAccount =
CloudStorageAccount.parse(storageConnectionString);
// Create the queue client.
CloudQueueClient queueClient = storageAccount.createCloudQueueClient();
// Retrieve a reference to a queue.
CloudQueue queue = queueClient.getQueueReference("myqueue");
// Create the queue if it doesn't already exist.
queue.createIfNotExists();
}
catch (Exception e)
{
// Output the stack trace.
e.printStackTrace();
}
向队列添加消息
要将消息插入到现有队列中,请先创建新的 CloudQueueMessage
。 接下来,调用 addMessage
方法。 可以通过字符串(采用 UTF-8 格式)或字节数组创建 CloudQueueMessage
。 以下代码示例创建一个队列(如果该队列不存在)并插入消息“Hello, World
”。
try
{
// Retrieve storage account from connection-string.
CloudStorageAccount storageAccount =
CloudStorageAccount.parse(storageConnectionString);
// Create the queue client.
CloudQueueClient queueClient = storageAccount.createCloudQueueClient();
// Retrieve a reference to a queue.
CloudQueue queue = queueClient.getQueueReference("myqueue");
// Create the queue if it doesn't already exist.
queue.createIfNotExists();
// Create a message and add it to the queue.
CloudQueueMessage message = new CloudQueueMessage("Hello, World");
queue.addMessage(message);
}
catch (Exception e)
{
// Output the stack trace.
e.printStackTrace();
}
扫视下一条消息
通过调用 peekMessage
,可以速览队列前面的消息,而不会从队列中删除它。
try
{
// Retrieve storage account from connection-string.
CloudStorageAccount storageAccount =
CloudStorageAccount.parse(storageConnectionString);
// Create the queue client.
CloudQueueClient queueClient = storageAccount.createCloudQueueClient();
// Retrieve a reference to a queue.
CloudQueue queue = queueClient.getQueueReference("myqueue");
// Peek at the next message.
CloudQueueMessage peekedMessage = queue.peekMessage();
// Output the message value.
if (peekedMessage != null)
{
System.out.println(peekedMessage.getMessageContentAsString());
}
}
catch (Exception e)
{
// Output the stack trace.
e.printStackTrace();
}
更改已排队消息的内容
下面的代码示例将在消息队列中进行搜索,查找与 Hello, world
匹配的第一个消息内容,对消息内容进行修改,然后退出。
try
{
// Retrieve storage account from connection-string.
CloudStorageAccount storageAccount =
CloudStorageAccount.parse(storageConnectionString);
// Create the queue client.
CloudQueueClient queueClient = storageAccount.createCloudQueueClient();
// Retrieve a reference to a queue.
CloudQueue queue = queueClient.getQueueReference("myqueue");
// The maximum number of messages that can be retrieved is 32.
final int MAX_NUMBER_OF_MESSAGES_TO_PEEK = 32;
// Loop through the messages in the queue.
for (CloudQueueMessage message : queue.retrieveMessages(MAX_NUMBER_OF_MESSAGES_TO_PEEK,1,null,null))
{
// Check for a specific string.
if (message.getMessageContentAsString().equals("Hello, World"))
{
// Modify the content of the first matching message.
message.setMessageContent("Updated contents.");
// Set it to be visible in 30 seconds.
EnumSet<MessageUpdateFields> updateFields =
EnumSet.of(MessageUpdateFields.CONTENT,
MessageUpdateFields.VISIBILITY);
// Update the message.
queue.updateMessage(message, 30, updateFields, null, null);
break;
}
}
}
catch (Exception e)
{
// Output the stack trace.
e.printStackTrace();
}
以下代码示例只更新队列中的第一个可见消息。
try
{
// Retrieve storage account from connection-string.
CloudStorageAccount storageAccount =
CloudStorageAccount.parse(storageConnectionString);
// Create the queue client.
CloudQueueClient queueClient = storageAccount.createCloudQueueClient();
// Retrieve a reference to a queue.
CloudQueue queue = queueClient.getQueueReference("myqueue");
// Retrieve the first visible message in the queue.
CloudQueueMessage message = queue.retrieveMessage();
if (message != null)
{
// Modify the message content.
message.setMessageContent("Updated contents.");
// Set it to be visible in 60 seconds.
EnumSet<MessageUpdateFields> updateFields =
EnumSet.of(MessageUpdateFields.CONTENT,
MessageUpdateFields.VISIBILITY);
// Update the message.
queue.updateMessage(message, 60, updateFields, null, null);
}
}
catch (Exception e)
{
// Output the stack trace.
e.printStackTrace();
}
获取队列长度
downloadAttributes
方法可检索多个值,包括队列中的当前消息数。 此计数仅为近似值,因为可能会在请求后添加或删除消息。 getApproximateMessageCount
方法可返回通过调用 downloadAttributes
检索到的最后一个值,而不会调用队列存储。
try
{
// Retrieve storage account from connection-string.
CloudStorageAccount storageAccount =
CloudStorageAccount.parse(storageConnectionString);
// Create the queue client.
CloudQueueClient queueClient = storageAccount.createCloudQueueClient();
// Retrieve a reference to a queue.
CloudQueue queue = queueClient.getQueueReference("myqueue");
// Download the approximate message count from the server.
queue.downloadAttributes();
// Retrieve the newly cached approximate message count.
long cachedMessageCount = queue.getApproximateMessageCount();
// Display the queue length.
System.out.println(String.format("Queue length: %d", cachedMessageCount));
}
catch (Exception e)
{
// Output the stack trace.
e.printStackTrace();
}
取消下一条消息的排队
代码通过两个步骤来取消对队列中某条消息的排队。 调用 retrieveMessage
时,会获得队列中的下一条消息。 从 retrieveMessage
返回的消息对于从此队列读取消息的任何其他代码都是不可见的。 默认情况下,此消息持续 30 秒不可见。 若要完成从队列中删除消息,还必须调用 deleteMessage
。 如果你的代码未能处理消息,此两步过程可确保你可以获取同一消息并重试。 代码在处理消息后会立即调用 deleteMessage
。
try
{
// Retrieve storage account from connection-string.
CloudStorageAccount storageAccount =
CloudStorageAccount.parse(storageConnectionString);
// Create the queue client.
CloudQueueClient queueClient = storageAccount.createCloudQueueClient();
// Retrieve a reference to a queue.
CloudQueue queue = queueClient.getQueueReference("myqueue");
// Retrieve the first visible message in the queue.
CloudQueueMessage retrievedMessage = queue.retrieveMessage();
if (retrievedMessage != null)
{
// Process the message in less than 30 seconds, and then delete the message.
queue.deleteMessage(retrievedMessage);
}
}
catch (Exception e)
{
// Output the stack trace.
e.printStackTrace();
}
用于取消对消息进行排队的其他选项
以下代码示例使用 retrieveMessages
方法在一个调用中获取 20 条消息。 然后,使用 for
循环处理每条消息。 它还将每条消息的不可见超时设置为 5 分钟(300 秒)。 超时同时针对所有消息启动。 自调用 retrieveMessages
起五分钟后,未删除的任何消息都会再次变得可见。
try
{
// Retrieve storage account from connection-string.
CloudStorageAccount storageAccount =
CloudStorageAccount.parse(storageConnectionString);
// Create the queue client.
CloudQueueClient queueClient = storageAccount.createCloudQueueClient();
// Retrieve a reference to a queue.
CloudQueue queue = queueClient.getQueueReference("myqueue");
// Retrieve 20 messages from the queue with a visibility timeout of 300 seconds.
for (CloudQueueMessage message : queue.retrieveMessages(20, 300, null, null)) {
// Do processing for all messages in less than 5 minutes,
// deleting each message after processing.
queue.deleteMessage(message);
}
}
catch (Exception e)
{
// Output the stack trace.
e.printStackTrace();
}
列出队列
若要获取当前队列的列表,请调用 CloudQueueClient.listQueues()
方法,它会返回 CloudQueue
对象的集合。
try
{
// Retrieve storage account from connection-string.
CloudStorageAccount storageAccount =
CloudStorageAccount.parse(storageConnectionString);
// Create the queue client.
CloudQueueClient queueClient =
storageAccount.createCloudQueueClient();
// Loop through the collection of queues.
for (CloudQueue queue : queueClient.listQueues())
{
// Output each queue name.
System.out.println(queue.getName());
}
}
catch (Exception e)
{
// Output the stack trace.
e.printStackTrace();
}
删除队列
若要删除队列及其包含的所有消息,请对 CloudQueue
对象调用 deleteIfExists
方法。
try
{
// Retrieve storage account from connection-string.
CloudStorageAccount storageAccount =
CloudStorageAccount.parse(storageConnectionString);
// Create the queue client.
CloudQueueClient queueClient = storageAccount.createCloudQueueClient();
// Retrieve a reference to a queue.
CloudQueue queue = queueClient.getQueueReference("myqueue");
// Delete the queue if it exists.
queue.deleteIfExists();
}
catch (Exception e)
{
// Output the stack trace.
e.printStackTrace();
}