다음을 통해 공유


ServiceBusReceiver 클래스

ServiceBusReceiver 클래스는 Azure Service Bus 큐 또는 토픽 구독에서 메시지를 수신하기 위한 상위 수준 인터페이스를 정의합니다.

메시지 수신을 위한 두 가지 기본 채널은 메시지에 대해 단일 요청을 수행하고 받는 사람의 메시지에 대해 수신 메시지를 지속적으로 수신하는 receive()입니다.

~azure.servicebus.ServiceBusClient의 메서드를 사용하여 get_<queue/subscription>_receiver ServiceBusReceiver instance 만듭니다.

상속
azure.servicebus._base_handler.BaseHandler
ServiceBusReceiver
azure.servicebus._common.receiver_mixins.ReceiverMixin
ServiceBusReceiver

생성자

ServiceBusReceiver(fully_qualified_namespace: str, credential: TokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)

매개 변수

fully_qualified_namespace
str
필수

Service Bus 네임스페이스의 정규화된 호스트 이름입니다. 네임스페이스 형식은 .servicebus.windows.net.

credential
TokenCredential 또는 AzureSasCredential 또는 AzureNamedKeyCredential
필수

토큰을 가져오기 위한 특정 인터페이스를 구현하는 인증에 사용되는 자격 증명 개체입니다. azure-identity 라이브러리에서 생성된 자격 증명 개체와 *get_token(자체, 범위) 메서드를 구현하는 개체를 허용하거나 AzureSasCredential도 제공할 수 있습니다.

queue_name
str

클라이언트가 연결하는 특정 Service Bus 큐의 경로입니다.

topic_name
str

클라이언트가 연결하는 구독이 포함된 특정 Service Bus 토픽의 경로입니다.

subscription_name
str

클라이언트가 연결하는 지정된 항목 아래의 특정 Service Bus 구독 경로입니다.

max_wait_time
Optional[float]

수신자가 자동으로 수신을 중지하는 수신된 메시지 사이의 시간 제한(초)입니다. 기본값은 없음이며 시간 제한이 없음을 의미합니다.

receive_mode
Union[ServiceBusReceiveMode, str]

엔터티에서 메시지를 검색할 모드입니다. 두 가지 옵션은 PEEK_LOCK 및 RECEIVE_AND_DELETE. PEEK_LOCK 받은 메시지는 큐에서 제거되기 전에 지정된 잠금 기간 내에 해결되어야 합니다. RECEIVE_AND_DELETE 받은 메시지는 큐에서 즉시 제거되며, 클라이언트가 메시지를 처리하지 못하면 이후에 중단되거나 다시 수신될 수 없습니다. 기본 모드는 PEEK_LOCK.

logging_enable
bool

로거에 네트워크 추적 로그를 출력할지 여부입니다. 기본값은 False입니다.

transport_type
TransportType

Service Bus 서비스와 통신하는 데 사용할 전송 프로토콜의 유형입니다. 기본값은 TransportType.Amqp입니다.

http_proxy
Dict

HTTP 프록시 설정. 'proxy_hostname'(str value) 및 'proxy_port'(int value) 키가 있는 사전이어야 합니다. 또한 'username', 'password' 키도 있을 수 있습니다.

user_agent
str

지정된 경우 기본 제공 사용자 에이전트 문자열 앞에 추가됩니다.

auto_lock_renewer
Optional[AutoLockRenewer]

수신 시 메시지가 자동으로 등록되도록 ~azure.servicebus.AutoLockRenewer를 제공할 수 있습니다. 수신기가 세션 수신기인 경우 세션에 대신 적용됩니다.

prefetch_count
int

서비스에 대한 각 요청과 함께 캐시할 최대 메시지 수입니다. 이 설정은 고급 성능 튜닝에만 사용됩니다. 이 값을 늘리면 메시지 처리량 성능이 향상되지만 충분히 빠르게 처리되지 않은 경우 메시지가 캐시되는 동안 만료될 가능성이 높아집니다. 기본값은 0입니다. 즉, 메시지가 서비스에서 수신되고 한 번에 하나씩 처리됩니다. prefetch_count 0인 경우 ServiceBusReceiver.receive 는 서비스에 대한 요청 내에서 max_message_count 캐시하려고 시도합니다(제공된 경우).

client_identifier
str

클라이언트 instance 고유하게 식별하는 문자열 기반 식별자입니다. Service Bus는 오류의 상관 관계를 더 쉽게 위해 일부 오류 메시지와 연결합니다. 지정하지 않으면 고유 ID가 생성됩니다.

socket_timeout
float

연결의 기본 소켓이 시간이 초과되기 전에 데이터를 보내고 받을 때 대기해야 하는 시간(초)입니다. 기본값은 TransportType.Amqp의 경우 0.2이고 TransportType.AmqpOverWebsocket의 경우 1입니다. 쓰기 시간 초과로 인해 연결 오류가 발생하는 경우 기본값보다 큰 를 전달해야 할 수 있습니다.

변수

fully_qualified_namespace
str

Service Bus 네임스페이스의 정규화된 호스트 이름입니다. 네임스페이스 형식은 .servicebus.windows.net.

entity_path
str

클라이언트가 연결하는 엔터티의 경로입니다.

메서드

abandon_message

메시지를 중단합니다.

이 메시지는 큐로 반환되고 다시 받을 수 있습니다.

close
complete_message

메시지를 완료합니다.

그러면 큐에서 메시지가 제거됩니다.

dead_letter_message

메시지를 배달 못 한 편지 큐로 이동합니다.

배달 못한 편지 큐는 올바르게 처리하지 못했거나 추가 검사 또는 처리가 필요한 메시지를 저장하는 데 사용할 수 있는 하위 큐입니다. 만료된 메시지를 배달 못 한 편지 큐로 보내도록 큐를 구성할 수도 있습니다.

defer_message

메시지를 연기합니다.

이 메시지는 큐에 남아 있지만 수신하려면 시퀀스 번호로 특별히 요청해야 합니다.

next
peek_messages

큐에서 현재 보류 중인 메시지를 찾습니다.

피킹된 메시지는 큐에서 제거되지 않으며 잠겨 있지 않습니다. 그들은 완료, 지연 또는 배달 못한 편지 수 없습니다.

receive_deferred_messages

이전에 지연된 메시지를 받습니다.

분할된 엔터티에서 지연된 메시지를 수신하는 경우 제공된 모든 시퀀스 번호는 동일한 파티션의 메시지여야 합니다.

receive_messages

한 번에 메시지 일괄 처리를 수신합니다.

이 방법은 여러 메시지를 동시에 처리하거나 임시 수신을 단일 호출로 수행하려는 경우에 최적입니다.

단일 일괄 처리에서 검색된 메시지 수는 수신기에 대해 prefetch_count 설정되었는지 여부에 따라 달라집니다. 수신기에 대해 prefetch_count 설정되지 않은 경우 수신기는 서비스에 대한 요청 내에서 max_message_count(제공된 경우) 메시지를 캐시하려고 시도합니다.

이 호출은 지정된 일괄 처리 크기보다 빠르게 반환하는 우선 순위를 지정하므로 하나 이상의 메시지가 수신되고 지정된 일괄 처리 크기에 관계없이 들어오는 메시지의 간격이 있는 즉시 반환됩니다.

renew_message_lock

메시지 잠금을 갱신합니다.

이렇게 하면 메시지에 대한 잠금이 유지되어 다시 처리할 큐에 반환되지 않도록 합니다.

메시지를 완료(또는 그렇지 않으면 해결)하려면 잠금을 유지 관리해야 하며 이미 만료될 수 없습니다. 만료된 잠금을 갱신할 수 없습니다.

RECEIVE_AND_DELETE 모드를 통해 받은 메시지는 잠겨 있지 않으므로 갱신할 수 없습니다. 이 작업은 세션이 아닌 메시지에만 사용할 수 있습니다.

abandon_message

메시지를 중단합니다.

이 메시지는 큐로 반환되고 다시 받을 수 있습니다.

abandon_message(message: ServiceBusReceivedMessage) -> None

매개 변수

message
ServiceBusReceivedMessage
필수

중단될 받은 메시지입니다.

반환 형식

예외

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

예제

받은 메시지를 중단합니다.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.abandon_message(message)

close

close() -> None

예외

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

complete_message

메시지를 완료합니다.

그러면 큐에서 메시지가 제거됩니다.

complete_message(message: ServiceBusReceivedMessage) -> None

매개 변수

message
ServiceBusReceivedMessage
필수

완료할 받은 메시지입니다.

반환 형식

예외

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

예제

받은 메시지를 완료합니다.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.complete_message(message)

dead_letter_message

메시지를 배달 못 한 편지 큐로 이동합니다.

배달 못한 편지 큐는 올바르게 처리하지 못했거나 추가 검사 또는 처리가 필요한 메시지를 저장하는 데 사용할 수 있는 하위 큐입니다. 만료된 메시지를 배달 못 한 편지 큐로 보내도록 큐를 구성할 수도 있습니다.

dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None

매개 변수

message
ServiceBusReceivedMessage
필수

배달 못한 편지로 받은 메시지입니다.

reason
Optional[str]
기본값: None

메시지를 배달 못한 편지로 보내는 이유입니다.

error_description
Optional[str]
기본값: None

메시지 배달 못 한 편지에 대한 자세한 오류 설명입니다.

반환 형식

예외

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

예제

배달 못한 편지 받은 메시지입니다.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.dead_letter_message(message)

defer_message

메시지를 연기합니다.

이 메시지는 큐에 남아 있지만 수신하려면 시퀀스 번호로 특별히 요청해야 합니다.

defer_message(message: ServiceBusReceivedMessage) -> None

매개 변수

message
ServiceBusReceivedMessage
필수

지연할 받은 메시지입니다.

반환 형식

예외

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

예제

받은 메시지를 연기합니다.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.defer_message(message)

next

next()

예외

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

peek_messages

큐에서 현재 보류 중인 메시지를 찾습니다.

피킹된 메시지는 큐에서 제거되지 않으며 잠겨 있지 않습니다. 그들은 완료, 지연 또는 배달 못한 편지 수 없습니다.

peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

매개 변수

max_message_count
int
기본값: 1

시도하고 피킹할 최대 메시지 수입니다. 기본값은 1입니다.

sequence_number
int

메시지 검색을 시작할 메시지 시퀀스 번호입니다.

timeout
Optional[float]

모든 재시도를 포함한 총 작업 시간 제한(초)입니다. 지정된 경우 값이 0보다 커야 합니다. 기본값은 없음이며 시간 제한이 없음을 의미합니다.

반환

~azure.servicebus.ServiceBusReceivedMessage 목록입니다.

반환 형식

예외

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

예제

큐에서 보류 중인 메시지를 확인합니다.


   with servicebus_receiver:
       messages = servicebus_receiver.peek_messages()
       for message in messages:
           print(str(message))

receive_deferred_messages

이전에 지연된 메시지를 받습니다.

분할된 엔터티에서 지연된 메시지를 수신하는 경우 제공된 모든 시퀀스 번호는 동일한 파티션의 메시지여야 합니다.

receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

매개 변수

sequence_numbers
Union[int,List[int]]
필수

지연된 메시지의 시퀀스 번호 목록입니다.

timeout
Optional[float]

모든 재시도를 포함한 총 작업 시간 제한(초)입니다. 지정된 경우 값이 0보다 커야 합니다. 기본값은 없음이며 시간 제한이 없음을 의미합니다.

반환

요청된 ~azure.servicebus.ServiceBusReceivedMessage 인스턴스의 목록입니다.

반환 형식

예외

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

예제

ServiceBus에서 지연된 메시지를 받습니다.


   with servicebus_receiver:
       deferred_sequenced_numbers = []
       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           deferred_sequenced_numbers.append(message.sequence_number)
           print(str(message))
           servicebus_receiver.defer_message(message)

       received_deferred_msg = servicebus_receiver.receive_deferred_messages(
           sequence_numbers=deferred_sequenced_numbers
       )

       for msg in received_deferred_msg:
           servicebus_receiver.complete_message(msg)

receive_messages

한 번에 메시지 일괄 처리를 수신합니다.

이 방법은 여러 메시지를 동시에 처리하거나 임시 수신을 단일 호출로 수행하려는 경우에 최적입니다.

단일 일괄 처리에서 검색된 메시지 수는 수신기에 대해 prefetch_count 설정되었는지 여부에 따라 달라집니다. 수신기에 대해 prefetch_count 설정되지 않은 경우 수신기는 서비스에 대한 요청 내에서 max_message_count(제공된 경우) 메시지를 캐시하려고 시도합니다.

이 호출은 지정된 일괄 처리 크기보다 빠르게 반환하는 우선 순위를 지정하므로 하나 이상의 메시지가 수신되고 지정된 일괄 처리 크기에 관계없이 들어오는 메시지의 간격이 있는 즉시 반환됩니다.

receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]

매개 변수

max_message_count
Optional[int]
기본값: 1

일괄 처리의 최대 메시지 수입니다. 반환되는 실제 수는 prefetch_count 및 들어오는 스트림 속도에 따라 달라집니다. 없음으로 설정하면 프리페치 구성에 따라 완전히 달라집니다. 기본값은 1입니다.

max_wait_time
Optional[float]
기본값: None

첫 번째 메시지가 도착할 때까지 대기하는 최대 시간(초)입니다. 메시지가 도착하지 않고 시간 제한이 지정되지 않으면 연결이 닫을 때까지 이 호출이 반환되지 않습니다. 지정된 경우 시간 제한 기간 내에 메시지가 도착하지 않으면 빈 목록이 반환됩니다.

반환

받은 메시지 목록입니다. 사용할 수 있는 메시지가 없으면 빈 목록이 됩니다.

반환 형식

예외

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

예제

ServiceBus에서 메시지를 받습니다.


   with servicebus_receiver:
       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           print(str(message))
           servicebus_receiver.complete_message(message)

renew_message_lock

메시지 잠금을 갱신합니다.

이렇게 하면 메시지에 대한 잠금이 유지되어 다시 처리할 큐에 반환되지 않도록 합니다.

메시지를 완료(또는 그렇지 않으면 해결)하려면 잠금을 유지 관리해야 하며 이미 만료될 수 없습니다. 만료된 잠금을 갱신할 수 없습니다.

RECEIVE_AND_DELETE 모드를 통해 받은 메시지는 잠겨 있지 않으므로 갱신할 수 없습니다. 이 작업은 세션이 아닌 메시지에만 사용할 수 있습니다.

renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime

매개 변수

message
ServiceBusReceivedMessage
필수

잠금을 갱신할 메시지입니다.

timeout
Optional[float]

모든 재시도를 포함한 총 작업 시간 제한(초)입니다. 지정된 경우 값이 0보다 커야 합니다. 기본값은 없음입니다. 즉, 시간 제한이 없습니다.

반환

잠금이 만료되도록 설정된 utc datetime입니다.

반환 형식

예외

TypeError if the message is sessionful.
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.MessageLockLostError if message lock has already expired.

예제

받은 메시지에 대한 잠금을 갱신합니다.


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.renew_message_lock(message)

특성

client_identifier

수신기 instance 연결된 ServiceBusReceiver client_identifier 가져옵니다.

반환 형식

str

session

수신기와 연결된 ServiceBusSession 개체를 가져옵니다. 세션은 세션 사용 엔터티에서만 사용할 수 있으며 세션이 아닌 수신기에서 호출된 경우 None을 반환합니다.

반환 형식

예제

수신기에서 세션 가져오기


       with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
           session = receiver.session