Share via


SQL Server Service Broker Demo

SQL Server Service Broker (下面简称 SSB) 是 SQL Server 里面比较独特的一个功能。它可以帮助开发人员构建异步的、松散耦合的应用程序。SSB 的一些功能和好处包括:

  • 数据库集成提高了应用程序的性能并简化了管理。
  • 适用于简化的应用程序开发的消息排序和协调。
  • 应用程序松耦合提供了工作负荷灵活性。
  • 相关消息锁定使一个应用程序的多个实例可以处理同一队列中的消息,而不必进行显式同步。
  • 自动激活使应用程序可以随消息量进行调整。

我认为SSB 的关键特点是:一是异步的。二是可靠的传送。三是保证消息的顺序。学习SSB的时候需要明白它的一些概念,下面简要介绍如下:

MESSAGE TYPE 消息类型。 用于定义会话中交换的数据。

CONTRACT约定。

用于定义任务。每个约定都会指定可在特定会话中使用的消息类型,以及会话的哪端可发送消息。

QUEUE 队列。

用于存储服务的传入消息。

SERVICE 服务

表示一组相关的业务任务。服务的名称还用于查找服务的队列。

Dialog Conversations 对话

对话是两个服务之间的会话。对话是两个服务之间可靠而持久的双向消息流。

Conversations group对话组

标识一组相关的会话。

一个约定依赖于一个或多个消息类型。一个服务依赖于一个队列,并可依赖于一个或多个约定

Service Broker 发送的所有消息都是会话的一部分。对话提供“恰好一次、按顺序”(exactly-once-in-order, EOIO) 的消息传递功能。对话使用会话标识符和包含在每条消息中的序列号来标识相关的消息,并以正确的顺序传递这些消息。对话是两个服务之间可靠的、持久性消息流。对话会话有两个参与者。“发起方”发起会话。“目标”接受发起方发起的会话。

有了上面的一些基本概念后,可以参考下图理解一个发生在一个database 内的SSB对话过程。

上图表示了左方发起一个会话并发送一个消息的过程。该过程简要描述如下:

  1. 左方发起会话请求
  2. 发送一个消息
  3. 消息被服务传送到transmission queue
  4. 右方服务收到消息,放到target queue
  5. 右方从target queue 取出消息并处理

上面的发送和接收是异步的过程。我下面的TSQL script 演示了具体的各个步骤。 总共5个步骤。你可以逐个步骤执行,观看结果。

/*****************************************************************************************************************/

--STEP 1

--Activate Service Broker message delivery in AdventureWorks.

--Remove any existing objects for the sample

/*****************************************************************************************************************/

 

-- Run the broker check from the master database.
USE master;
GO

-- Enable Service Broker on AdventureWorks unless sys.databases already
-- lists a database with that name and is_broker_enabled = 1.
IF NOT EXISTS (
    SELECT 1
    FROM sys.databases AS db
    WHERE db.is_broker_enabled = 1
      AND db.name = 'AdventureWorks'
)
    ALTER DATABASE AdventureWorks SET ENABLE_BROKER;
GO

-- The remaining batches of the sample run inside AdventureWorks.
USE AdventureWorks;
GO

--------------------------------------------------------------------

-- Remove any existing objects for the sample.

--------------------------------------------------------------------

-- Notice that the order for dropping objects is different
-- from the order for creating objects. Because contracts depend
-- on message types, a contract must be created after the
-- message types that contract uses, and dropped before those
-- message types. A service is generally created after the
-- contracts and the queue that it uses. Because services depend
-- on contracts and queues, the script drops services before
-- dropping contracts or queues.

-- Remove InitiatorService when sys.services lists it.
IF EXISTS (
    SELECT 1
    FROM sys.services AS svc
    WHERE svc.name = 'InitiatorService'
)
    DROP SERVICE InitiatorService;
GO

-- Remove TargetService when sys.services lists it.
IF EXISTS (
    SELECT 1
    FROM sys.services AS svc
    WHERE svc.name = 'TargetService'
)
    DROP SERVICE TargetService;
GO

 

-- The contract references the message type, so the contract
-- is removed first and the message type second.

-- Remove HelloWorldContract when sys.service_contracts lists it.
IF EXISTS (
    SELECT 1
    FROM sys.service_contracts AS ctr
    WHERE ctr.name = 'HelloWorldContract'
)
    DROP CONTRACT HelloWorldContract;
GO

-- Remove HelloWorldMessage when sys.service_message_types lists it.
IF EXISTS (
    SELECT 1
    FROM sys.service_message_types AS mt
    WHERE mt.name = 'HelloWorldMessage'
)
    DROP MESSAGE TYPE HelloWorldMessage;
GO

-- Remove the initiator queue when an object of type 'SQ'
-- (service queue) with that name exists.
IF OBJECT_ID('[dbo].[InitiatorQueue]', 'SQ') IS NOT NULL
    DROP QUEUE [dbo].[InitiatorQueue];
GO

-- Remove the target queue under the same condition.
IF OBJECT_ID('[dbo].[TargetQueue]', 'SQ') IS NOT NULL
    DROP QUEUE [dbo].[TargetQueue];
GO

/*****************************************************************************************************************/

--STEP 2

-- Create objects for the sample.

--CREATE MESSAGE TYPE

--CREATE CONTRACT

--CREATE QUEUE

--CREATE SERVICE

/*****************************************************************************************************************/

 

 

-- Message type for the sample: the body must be a well-formed
-- XML document.
CREATE MESSAGE TYPE [HelloWorldMessage] VALIDATION = WELL_FORMED_XML;
GO

-- Contract for the sample: both the initiator and the target
-- may send a HelloWorldMessage.
CREATE CONTRACT [HelloWorldContract] ([HelloWorldMessage] SENT BY ANY);
GO

 

-- Queue that stores messages arriving for the target service.
CREATE QUEUE dbo.TargetQueue;
GO

-- Queue that stores messages arriving for the initiator service.
CREATE QUEUE dbo.InitiatorQueue;
GO

 

-- Initiator service. A service definition lists only the contracts
-- for which the service can be a target; this service only starts
-- conversations, so no contract is listed.
CREATE SERVICE [InitiatorService] ON QUEUE dbo.InitiatorQueue;
GO

-- Target service. It accepts conversations that follow
-- HelloWorldContract, so that contract is part of the definition.
CREATE SERVICE [TargetService] ON QUEUE dbo.TargetQueue ([HelloWorldContract]);
GO

 

 /*****************************************************************************************************************/

--STEP 3

-- SendMessage script for Service Broker

/*****************************************************************************************************************/

USE AdventureWorks;
GO

-- Handle of the dialog that is opened below, and the XML body to send.
DECLARE @dialog_handle UNIQUEIDENTIFIER,
        @request_body  XML;

SELECT @request_body = N'<message>Hello, World! Hello Target!</message>';

-- Open the dialog and send the message in one transaction.
BEGIN TRANSACTION;

    -- Start a dialog from InitiatorService to TargetService
    -- under HelloWorldContract, without encryption.
    BEGIN DIALOG CONVERSATION @dialog_handle
        FROM SERVICE [InitiatorService]
        TO SERVICE 'TargetService'
        ON CONTRACT [HelloWorldContract]
        WITH ENCRYPTION = OFF;

    -- Put the message on that dialog.
    SEND ON CONVERSATION @dialog_handle
        MESSAGE TYPE [HelloWorldMessage] (@request_body);

-- Service Broker delivers the message to the destination
-- service only once the transaction commits.
COMMIT TRANSACTION;
GO

 

-- List the initiator-side conversation endpoints together with the
-- local service name and the contract name.
SELECT
    ce.conversation_handle,
    ce.is_initiator,
    s.name  AS [local_service],
    ce.far_service,
    sc.name AS [contract],
    ce.state_desc
FROM sys.conversation_endpoints AS ce
LEFT JOIN sys.services AS s
    ON s.service_id = ce.service_id
LEFT JOIN sys.service_contracts AS sc
    ON sc.service_contract_id = ce.service_contract_id
WHERE ce.is_initiator = 1;

-- List the transmission queue. It should be empty once the message
-- has been delivered to the target service; a message that could not
-- reach the target service stays pending here.
SELECT
    tq.conversation_handle,
    tq.to_service_name,
    tq.message_type_name,
    tq.transmission_status
FROM sys.transmission_queue AS tq;
GO

 

 /*****************************************************************************************************************/

--STEP 4

-- ReceiveMessage script for Service Broker

/*****************************************************************************************************************/

 

USE AdventureWorks;
GO

-- List the target-side conversation endpoints. The target endpoint
-- comes into existence when the first message is delivered to the
-- target service and placed in the target queue.
SELECT
    ce.conversation_handle,
    ce.is_initiator,
    s.name  AS [local_service],
    ce.far_service,
    sc.name AS [contract],
    ce.state_desc
FROM sys.conversation_endpoints AS ce
LEFT JOIN sys.services AS s
    ON s.service_id = ce.service_id
LEFT JOIN sys.service_contracts AS sc
    ON sc.service_contract_id = ce.service_contract_id
WHERE ce.is_initiator = 0;

-- Dump the target queue; it should hold a message of type
-- HelloWorldMessage at this point.
SELECT * FROM dbo.TargetQueue;
GO

 

-- Process every conversation group waiting in [dbo].[TargetQueue].
-- Each group is handled in its own transaction: all RECEIVE, SEND and
-- END CONVERSATION statements for the group commit together.
--
-- Variables are declared once for the batch; a DECLARE placed inside
-- a WHILE loop does not re-initialize the variable on each pass.
DECLARE @conversation_handle    UNIQUEIDENTIFIER,
        @conversation_group_id  UNIQUEIDENTIFIER,
        @message_body           XML,
        @message_type_name      NVARCHAR(128),
        @message                XML ;

WHILE (1 = 1)
BEGIN

    -- Start each pass without a group id so that the NULL test below
    -- reflects only the result of this pass.
    SET @conversation_group_id = NULL ;

    BEGIN TRANSACTION ;

    -- Get the next conversation group, waiting up to 500 ms.
    WAITFOR(
       GET CONVERSATION GROUP @conversation_group_id FROM [dbo].[TargetQueue]),
       TIMEOUT 500 ;

    -- If there are no more conversation groups, roll back the
    -- (empty) transaction and leave the outer loop.
    IF @conversation_group_id IS NULL
    BEGIN
        ROLLBACK TRANSACTION ;
        BREAK ;
    END ;

    -- Process all messages in the conversation group.
    WHILE 1 = 1
    BEGIN

        -- Receive the next message for the conversation group. The
        -- WHERE clause restricts the RECEIVE to messages that belong
        -- to this group. Only bodies whose validation is 'X' are cast
        -- to XML; anything else is shown as <none/>.
        RECEIVE
           TOP(1)
           @conversation_handle = conversation_handle,
           @message_type_name = message_type_name,
           @message_body =
           CASE
              WHEN validation = 'X' THEN CAST(message_body AS XML)
              ELSE CAST(N'<none/>' AS XML)
           END
        FROM [dbo].[TargetQueue]
        WHERE conversation_group_id = @conversation_group_id ;

        -- If there are no more messages, or an error occurred,
        -- stop processing this conversation group.
        IF @@ROWCOUNT = 0 OR @@ERROR <> 0 BREAK;

        -- Show the information received.
        SELECT 'Conversation Group Id' = @conversation_group_id,
               'Conversation Handle' = @conversation_handle,
               'Message Type Name' = @message_type_name,
               'Message Body' = @message_body ;

        IF @message_type_name = N'HelloWorldMessage'
        BEGIN
            -- Application message: reply to the initiator, then end
            -- this side of the conversation.
            SET @message = N'<message>Hello, World! Hello Initiator!</message>' ;

            SEND ON CONVERSATION @conversation_handle
                MESSAGE TYPE HelloWorldMessage
                (@message) ;

            END CONVERSATION @conversation_handle ;
        END
        ELSE IF @message_type_name IN
                (N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
                 N'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
        BEGIN
            -- System message: the far side has ended the conversation
            -- or reported an error. A SEND would fail here, so only
            -- close the local endpoint.
            END CONVERSATION @conversation_handle ;
        END ;

    END ; -- Process all messages in conversation group.

    -- Commit the receive, send and end conversation statements.
    COMMIT TRANSACTION ;

END ; -- Process all conversation groups.
GO

 

-- Show what is left in [dbo].[TargetQueue]; it should be empty now.
-- The queue is schema-qualified like every other reference to it, so
-- the lookup does not depend on the caller's default schema.
SELECT * FROM [dbo].[TargetQueue] ;
GO

/*****************************************************************************************************************/

--STEP 5

-- Initiator responds to target to close the conversation

/*****************************************************************************************************************/

 

-- Dump the current contents of the initiator queue.
SELECT * FROM dbo.InitiatorQueue;
GO

-- Drain [dbo].[InitiatorQueue]: show every message that arrives and
-- close the initiator side of the conversation when an EndDialog or
-- Error message is received.
--
-- The target sends its reply and then ends the conversation, so the
-- initiator queue receives two messages: the HelloWorldMessage reply
-- followed by the EndDialog system message. A single RECEIVE would
-- return only the reply and leave the conversation open, so the
-- queue is read in a loop, one message per transaction.
DECLARE @conversation_handle UNIQUEIDENTIFIER,
        @conversation_group_id UNIQUEIDENTIFIER,
        @message_body XML,
        @message_type_name NVARCHAR(128);

WHILE (1 = 1)
BEGIN

    BEGIN TRANSACTION ;

    -- Wait up to 5 seconds for the next message. Only bodies whose
    -- validation is 'X' are cast to XML; anything else is shown
    -- as <none/>.
    WAITFOR (
        RECEIVE
           TOP(1)
           @conversation_handle = conversation_handle,
           @conversation_group_id = conversation_group_id,
           @message_type_name = message_type_name,
           @message_body =
           CASE
              WHEN validation = 'X' THEN CAST(message_body AS XML)
              ELSE CAST(N'<none/>' AS XML)
           END
        FROM [dbo].[InitiatorQueue]
    ), TIMEOUT 5000 ;

    -- Nothing arrived within the timeout: the queue is drained.
    IF @@ROWCOUNT = 0
    BEGIN
        ROLLBACK TRANSACTION ;
        BREAK ;
    END ;

    -- Show the information received.
    SELECT 'Conversation Group Id' = @conversation_group_id,
           'Conversation Handle' = @conversation_handle,
           'Message Type Name' = @message_type_name,
           'Message Body' = @message_body ;

    -- If the message is an error or an end dialog message, end the
    -- conversation. The system message type names use the http://
    -- scheme; they are identifiers, not URLs to be fetched, and an
    -- https:// spelling never matches.
    IF @message_type_name IN
       (N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
        N'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
    BEGIN
        END CONVERSATION @conversation_handle ;
    END ;

    -- Commit the receive (and the end conversation, if any).
    COMMIT TRANSACTION ;

END ;
GO