다음을 통해 공유


Azure Service Bus 큐에서 메시지 보내기 및 받기(Go)

이 자습서에서는 Java 프로그래밍 언어를 사용하여 Azure Service Bus 큐에 메시지를 보내고 받는 방법에 대해 알아봅니다.

Azure Service Bus는 메시지 큐와 게시/구독 기능이 있는 완전 관리형 엔터프라이즈 메시지 브로커입니다. Service Bus 애플리케이션과 서비스를 서로 분리하여 분산되고 안정적이며 고성능 메시지 전송을 제공하는 데 사용됩니다.

Go용 Azure SDK azservicebus 패키지를 사용하면 Go 프로그래밍 언어를 사용하여 Azure Service Bus 메시지를 보내고 받을 수 있습니다.

이 자습서를 마치면 단일 메시지 또는 일괄 처리 메시지를 큐에 보내고, 메시지를 받고, 처리되지 않은 배달 못한 메시지를 받을 수 있습니다.

필수 구성 요소

샘플 앱 만들기

시작하려면 새 Go 모듈을 만듭니다.

  1. service-bus-go-how-to-use-queues라는 모듈에 대한 새 디렉터리를 만듭니다.

  2. azservicebus 디렉터리에서 모듈을 초기화하고 필요한 패키지를 설치합니다.

    go mod init service-bus-go-how-to-use-queues
    
    go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
    
    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
    
  3. 이름이 main.go인 새 파일을 만듭니다.

클라이언트 인증 및 만들기

main.go 파일에서 GetClient라는 새 함수를 만들고 다음 코드를 추가합니다.

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

GetClient 함수는 Azure Service Bus 네임스페이스 및 자격 증명을 사용하여 만든 새 azservicebus.Client 개체를 반환합니다. 네임스페이스는 AZURE_SERVICEBUS_HOSTNAME 환경 변수에서 제공됩니다. 그리고 자격 증명은 azidentity.NewDefaultAzureCredential 함수를 사용하여 만들어집니다.

로컬 개발의 경우 DefaultAzureCredential은(는) Azure CLI에서 액세스 토큰을 사용했으며, Azure에 인증하는 az login 명령을 실행하여 만들 수 있습니다.

연결 문자열로 인증하려면 NewClientFromConnectionString 함수를 사용합니다.

큐에 메시지 보내기

main.go 파일에서 SendMessage라는 새 함수를 만들고 다음 코드를 추가합니다.

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

SendMessage는 메시지 문자열과 azservicebus.Client 개체라는 두 개의 매개 변수를 사용합니다. 그런 다음 새 azservicebus.Sender 개체를 만들고 큐에 메시지를 보냅니다. 대량 메시지를 보내려면 main.go 파일에 SendMessageBatch 함수를 추가합니다.

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())
	
	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
			panic(err)
		}
	}
	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

SendMessageBatch는 메시지 조각과 azservicebus.Client 개체라는 두 개의 매개 변수를 사용합니다. 그런 다음 새 azservicebus.Sender 개체를 만들고 큐에 메시지를 보냅니다.

큐에서 메시지 받기

큐에 메시지를 보낸 후에는 azservicebus.Receiver 형식으로 메시지를 받을 수 있습니다. 큐에서 메시지를 받으려면 main.go 파일에 GetMessage 함수를 추가합니다.

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetMessageazservicebus.Client 개체를 가져와서 새 azservicebus.Receiver 개체를 만듭니다. 그런 다음 큐에서 메시지를 받습니다. Receiver.ReceiveMessages 함수는 컨텍스트와 수신할 메시지 수라는 두 가지 매개 변수를 사용합니다. Receiver.ReceiveMessages 함수는 azservicebus.ReceivedMessage 개체 조각을 반환합니다.

다음으로 for 루프는 메시지를 반복하고 메시지 본문을 인쇄합니다. 그런 다음 메시지를 완료하기 위해 CompleteMessage 함수가 호출되어 큐에서 제거됩니다.

길이 제한을 초과하거나 잘못된 큐로 전송되거나 성공적으로 처리되지 않은 메시지는 배달 못한 편지 큐로 보낼 수 있습니다. 배달 못한 편지 큐에 메시지를 보내려면 main.go 파일에 SendDeadLetterMessage 함수를 추가합니다.

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

DeadLetterMessageazservicebus.Client 개체와 azservicebus.ReceivedMessage 개체를 가져옵니다. 그런 다음 배달 못한 편지 큐에 메시지를 보냅니다. 이 함수는 컨텍스트와 azservicebus.DeadLetterOptions 개체라는 두 가지 매개 변수를 사용합니다. 메시지를 배달 못한 편지 큐로 보내지 못하면 Receiver.DeadLetterMessage 함수는 오류를 반환합니다.

배달 못한 편지 큐에서 메시지를 받으려면 main.go 파일에 ReceiveDeadLetterMessage 함수를 추가합니다.

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetDeadLetterMessageazservicebus.Client 개체를 사용하고 배달 못한 편지 큐에 대한 옵션을 사용하여 새 azservicebus.Receiver 개체를 만듭니다. 그런 다음 배달 못한 편지 큐에서 메시지를 받습니다. 그러면 함수는 배달 못한 편지 큐에서 메시지를 받습니다. 그런 다음, 해당 메시지에 대한 배달 못한 편지 이유와 설명을 출력합니다.

샘플 코드

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
		if errors.Is(err, azservicebus.ErrMessageTooLarge) {
			fmt.Printf("Message batch is full. We should send it and create a new one.\n")
		}
	}

	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) 
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) 
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	client := GetClient()

	fmt.Println("send a single message...")
	SendMessage("firstMessage", client)

	fmt.Println("send two messages as a batch...")
	messages := [2]string{"secondMessage", "thirdMessage"}
	SendMessageBatch(messages[:], client)

	fmt.Println("\nget all three messages:")
	GetMessage(3, client)

	fmt.Println("\nsend a message to the Dead Letter Queue:")
	SendMessage("Send message to Dead Letter", client)
	DeadLetterMessage(client)
	GetDeadLetterMessage(client)
}

코드 실행

코드를 실행하기 전에 AZURE_SERVICEBUS_HOSTNAME이라는 환경 변수를 만듭니다. 환경 변수의 값을 Service Bus 네임스페이스로 설정합니다.

export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>

다음으로 다음 go run 명령을 실행하여 앱을 실행합니다.

go run main.go

다음 단계

자세한 내용은 다음 링크를 확인하세요.