ServiceBusReceiver 類別
ServiceBusReceiver 類別會定義從Azure 服務匯流排佇列或主題訂用帳戶接收訊息的高階介面。
訊息回條的兩個主要通道是 receive () 來提出訊息的單一要求,以及 接收者中的訊息: 以持續的方式持續接收傳入訊息。
請使用 get_<queue/subscription>_receiver
~azure.servicebus.ServiceBusClient 的 方法來建立 ServiceBusReceiver 實例。
- 繼承
-
azure.servicebus._base_handler.BaseHandlerServiceBusReceiverazure.servicebus._common.receiver_mixins.ReceiverMixinServiceBusReceiver
建構函式
ServiceBusReceiver(fully_qualified_namespace: str, credential: TokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)
參數
- credential
- TokenCredential 或 AzureSasCredential 或 AzureNamedKeyCredential
用於驗證的認證物件,會實作用於取得權杖的特定介面。 它接受 azure-identity 程式庫所產生的認證物件,以及實作 *get_token (自我、 範圍) 方法的物件,或者也可以提供 AzureSasCredential。
- queue_name
- str
用戶端所連線的特定服務匯流排佇列路徑。
- topic_name
- str
包含用戶端所連線之訂用帳戶的特定服務匯流排主題路徑。
- subscription_name
- str
用戶端所連接之指定主題下的特定服務匯流排訂用帳戶路徑。
- receive_mode
- Union[ServiceBusReceiveMode, str]
從實體擷取訊息的模式。 這兩個選項PEEK_LOCK和RECEIVE_AND_DELETE。 使用 PEEK_LOCK 接收的訊息必須在指定的鎖定期間內解決,才能從佇列中移除訊息。 使用 RECEIVE_AND_DELETE 接收的訊息將會立即從佇列中移除,如果用戶端無法處理訊息,則無法後續放棄或重新接收。 預設模式為 PEEK_LOCK。
- logging_enable
- bool
是否要將網路追蹤記錄輸出至記錄器。 預設值為 False。
- transport_type
- TransportType
將用於與服務匯流排服務通訊的傳輸通訊協定類型。 預設值為 TransportType.Amqp。
- http_proxy
- Dict
HTTP Proxy 設定。 這必須是具有下列索引鍵的字典: 'proxy_hostname' ( str 值) 和 'proxy_port' (int 值) 。 此外,也可能會出現下列金鑰: 'username'、'password'。
- user_agent
- str
如果指定,這會新增到內建使用者代理程式字串前面。
- auto_lock_renewer
- Optional[AutoLockRenewer]
您可以提供 ~azure.servicebus.AutoLockRenewer,讓訊息在回條時自動註冊。 如果接收者是會話接收者,則會改為套用至會話。
- prefetch_count
- int
每個對服務的要求要快取的訊息數目上限。 此設定僅適用于進階效能微調。 增加此值可改善訊息輸送量效能,但如果訊息處理速度不夠快,則會增加訊息在快取時到期的機會。 預設值為 0,這表示訊息會從服務接收,並一次處理一個。 如果prefetch_count為 0,ServiceBusReceiver.receive 會嘗試快取max_message_count (,如果服務的要求內) 。
- client_identifier
- str
用來唯一識別用戶端實例的字串型識別碼。 服務匯流排會將它與某些錯誤訊息產生關聯,以便更輕鬆地相互關聯錯誤。 如果未指定,則會產生唯一識別碼。
- socket_timeout
- float
連接上基礎通訊端在傳送和接收資料之前應該等候的時間,以秒為單位。TransportType.Amqp 的預設值為 0.2,而 TransportType.AmqpOverWebsocket 的預設值為 1。 如果因為寫入逾時而發生連線錯誤,可能需要傳入大於預設值。
變數
- fully_qualified_namespace
- str
服務匯流排命名空間的完整主機名稱。 命名空間格式為: .servicebus.windows.net。
- entity_path
- str
用戶端所連線之實體的路徑。
方法
abandon_message |
放棄訊息。 此訊息會傳回至佇列,並可供再次接收。 |
close | |
complete_message |
完成訊息。 這會從佇列中移除訊息。 |
dead_letter_message |
將訊息移至寄不出的信件佇列。 寄不出的信件佇列是一個子佇列,可用來儲存無法正確處理的訊息,否則需要進一步檢查或處理。 佇列也可以設定為將過期的訊息傳送至寄不出的信件佇列。 |
defer_message |
延遲訊息。 此訊息會保留在佇列中,但必須透過其序號特別要求,才能接收。 |
next | |
peek_messages |
流覽佇列中目前擱置中的訊息。 查看的訊息不會從佇列中移除,也不會遭到鎖定。 無法完成、延遲或寄不出的信件。 |
receive_deferred_messages |
接收先前已延遲的訊息。 從分割實體接收延後訊息時,所有提供的序號都必須是來自相同資料分割的訊息。 |
receive_messages |
一次接收一批訊息。 如果您想要同時處理多個訊息,或以單一呼叫的形式執行臨機操作接收,此方法是最佳的方法。 請注意,在單一批次中擷取的訊息數目將取決於是否已為接收者設定 prefetch_count 。 如果未為接收者設定 prefetch_count ,接收者會嘗試快取max_message_count (,如果在要求內提供) 訊息給服務。 不論指定的批次大小為何,此呼叫都會優先傳回,因此在收到至少一則訊息時,就會立即傳回,而傳入訊息會有間距。不論指定的批次大小為何。 |
renew_message_lock |
更新訊息鎖定。 這會維護訊息的鎖定,以確保不會傳回要重新處理的佇列。 若要完成 (或解決訊息) ,必須維護鎖定,而且不能已經過期;無法更新過期的鎖定。 透過 RECEIVE_AND_DELETE 模式收到的訊息不會鎖定,因此無法更新。 這項作業也適用于非會話訊息。 |
abandon_message
放棄訊息。
此訊息會傳回至佇列,並可供再次接收。
abandon_message(message: ServiceBusReceivedMessage) -> None
參數
傳回類型
例外狀況
範例
放棄收到的訊息。
messages = servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
servicebus_receiver.abandon_message(message)
close
close() -> None
例外狀況
complete_message
完成訊息。
這會從佇列中移除訊息。
complete_message(message: ServiceBusReceivedMessage) -> None
參數
傳回類型
例外狀況
範例
完成收到的訊息。
messages = servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
servicebus_receiver.complete_message(message)
dead_letter_message
將訊息移至寄不出的信件佇列。
寄不出的信件佇列是一個子佇列,可用來儲存無法正確處理的訊息,否則需要進一步檢查或處理。 佇列也可以設定為將過期的訊息傳送至寄不出的信件佇列。
dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None
參數
傳回類型
例外狀況
範例
已接收的訊息寄不出的信件。
messages = servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
servicebus_receiver.dead_letter_message(message)
defer_message
延遲訊息。
此訊息會保留在佇列中,但必須透過其序號特別要求,才能接收。
defer_message(message: ServiceBusReceivedMessage) -> None
參數
傳回類型
例外狀況
範例
延遲收到的訊息。
messages = servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
servicebus_receiver.defer_message(message)
next
next()
例外狀況
peek_messages
流覽佇列中目前擱置中的訊息。
查看的訊息不會從佇列中移除,也不會遭到鎖定。 無法完成、延遲或寄不出的信件。
peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]
參數
- sequence_number
- int
要從中開始流覽訊息的訊息序號。
傳回
~azure.servicebus.ServiceBusReceivedMessage 的清單。
傳回類型
例外狀況
範例
查看佇列中的擱置訊息。
with servicebus_receiver:
messages = servicebus_receiver.peek_messages()
for message in messages:
print(str(message))
receive_deferred_messages
接收先前已延遲的訊息。
從分割實體接收延後訊息時,所有提供的序號都必須是來自相同資料分割的訊息。
receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]
參數
傳回
要求的 ~azure.servicebus.ServiceBusReceivedMessage 實例清單。
傳回類型
例外狀況
範例
從 ServiceBus 接收延遲的訊息。
with servicebus_receiver:
deferred_sequenced_numbers = []
messages = servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
deferred_sequenced_numbers.append(message.sequence_number)
print(str(message))
servicebus_receiver.defer_message(message)
received_deferred_msg = servicebus_receiver.receive_deferred_messages(
sequence_numbers=deferred_sequenced_numbers
)
for msg in received_deferred_msg:
servicebus_receiver.complete_message(msg)
receive_messages
一次接收一批訊息。
如果您想要同時處理多個訊息,或以單一呼叫的形式執行臨機操作接收,此方法是最佳的方法。
請注意,在單一批次中擷取的訊息數目將取決於是否已為接收者設定 prefetch_count 。 如果未為接收者設定 prefetch_count ,接收者會嘗試快取max_message_count (,如果在要求內提供) 訊息給服務。
不論指定的批次大小為何,此呼叫都會優先傳回,因此在收到至少一則訊息時,就會立即傳回,而傳入訊息會有間距。不論指定的批次大小為何。
receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]
參數
批次中的訊息數目上限。 傳回的實際數目取決於prefetch_count和傳入資料流程速率。 將 設定為 [無] 完全取決於預先擷取設定。預設值為 1。
等待第一則訊息到達的時間上限,以秒為單位。 如果未傳送任何訊息,而且未指定逾時,此呼叫將不會傳回,直到關閉連線為止。 如果指定,則不會在逾時期間內送達任何訊息,則會傳回空白清單。
傳回
已接收的訊息清單。 如果沒有可用的訊息,這會是空的清單。
傳回類型
例外狀況
範例
從 ServiceBus 接收訊息。
with servicebus_receiver:
messages = servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
print(str(message))
servicebus_receiver.complete_message(message)
renew_message_lock
更新訊息鎖定。
這會維護訊息的鎖定,以確保不會傳回要重新處理的佇列。
若要完成 (或解決訊息) ,必須維護鎖定,而且不能已經過期;無法更新過期的鎖定。
透過 RECEIVE_AND_DELETE 模式收到的訊息不會鎖定,因此無法更新。 這項作業也適用于非會話訊息。
renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime
參數
傳回
鎖定設定為到期的 utc 日期時間。
傳回類型
例外狀況
範例
更新所接收訊息的鎖定。
messages = servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
servicebus_receiver.renew_message_lock(message)
屬性
client_identifier
session
取得與接收者連結的 ServiceBusSession 物件。 會話僅適用于啟用會話的實體,如果在非會話接收者上呼叫,則會傳回 None。
傳回類型
範例
從接收者取得會話
with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
session = receiver.session