WCF チャネル モデルを使用してSQL Serverからポーリング ベースのデータ変更メッセージを受信する
SQL アダプターを構成して、SQL Serverテーブルまたはビューの定期的なデータ変更メッセージを受信できます。 データベースをポーリングするためにアダプターが実行するポーリング ステートメントを指定できます。 ポーリング ステートメントには、結果セットを返す SELECT ステートメントまたはストアド プロシージャを指定できます。
アダプターがポーリングをサポートする方法の詳細については、「 ポーリングを使用した受信呼び出しのサポート」を参照してください。
重要
1 つのアプリケーションで複数のポーリング操作を行う場合は、接続 URI の一部として InboundID 接続プロパティを指定して一意にする必要があります。 指定した受信 ID が操作名前空間に追加され、一意になります。
このトピックでポーリングを示す方法
このトピックでは、SQL アダプターがデータ変更メッセージの受信をサポートする方法を示すために、 ポーリング 操作用の .NET アプリケーションを作成します。 このトピックでは、 PolledDataAvailableStatement を次のように指定します。
SELECT COUNT(*) FROM Employee
PolledDataAvailableStatement は、正の値を含む最初のセルを含む結果セットを返す必要があります。 最初のセルに正の値が含まれていない場合、アダプターはポーリング ステートメントを実行しません。
ポーリング ステートメントの一部として、次の操作を実行します。
Employee テーブルからすべての行を選択します。
ストアド プロシージャ (MOVE_EMP_DATA) を実行して、すべてのレコードを Employee テーブルから EmployeeHistory テーブルに移動します。
ストアド プロシージャ (ADD_EMP_DETAILS) を実行して、Employee テーブルに新しいレコードを追加します。 この手順では、従業員名、指定、給与をパラメーターとして受け取ります。
これらの操作を実行するには、 PollingStatement バインド プロパティに次を指定する必要があります。
SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000
ポーリング ステートメントが実行されると、Employee テーブルのすべてのレコードが選択され、SQL Serverからのメッセージが受信されます。 MOVE_EMP_DATA ストアド プロシージャがアダプターによって実行されると、すべてのレコードが EmployeeHistory テーブルに移動されます。 次に、ADD_EMP_DETAILS ストアド プロシージャが実行され、Employee テーブルに新しいレコードが追加されます。 次のポーリング実行では、1 つのレコードのみが返されます。 このサイクルは、チャネル リスナーを閉じるまで続行されます。
SQL アダプター バインド プロパティを使用したポーリング クエリの構成
次の表は、データ変更メッセージを受信するようにアダプターを構成するために使用する SQL アダプター バインド プロパティをまとめたものです。 これらのバインド プロパティは、ポーリング用の .NET アプリケーションの一部として指定する必要があります。
Binding プロパティ | 説明 |
---|---|
InboundOperationType | ポーリング、TypedPolling、または通知の受信操作を実行するかどうかを指定します。 既定値は ポーリングです。 |
PolledDataAvailableStatement | ポーリングに使用できるデータがあるかどうかを判断するためにアダプターが実行する SQL ステートメントを指定します。 SQL ステートメントは、行と列で構成される結果セットを返す必要があります。 行が使用可能な場合にのみ、 PollingStatement バインド プロパティに指定された SQL ステートメントが実行されます。 |
PollingIntervalInSeconds | SQL アダプターが PolledDataAvailableStatement バインディング プロパティに指定されたステートメントを実行する間隔を秒単位で指定します。 既定値は 30 秒です。 ポーリング間隔は、連続するポーリング間の時間間隔を決定します。 ステートメントが指定された期間内に実行された場合、アダプターはその間隔の残りの時間を待機します。 |
PollingStatement | SQL Server データベース テーブルをポーリングする SQL ステートメントを指定します。 ポーリング ステートメントには、単純な SELECT ステートメントまたはストアド プロシージャを指定できます。 既定値は null です。 ポーリングを有効にするには、 PollingStatement の値を指定する必要があります。 ポーリング ステートメントは、ポーリングに使用できるデータがある場合にのみ実行されます。これは、 PolledDataAvailableStatement バインド プロパティによって決定されます。 セミコロンで区切られた任意の数の SQL ステートメントを指定できます。 |
PollWhileDataFound | ポーリング対象のテーブルでデータが使用可能な場合に、SQL アダプターがポーリング間隔を無視し、 PolledDataAvailableStatement バインディング プロパティに指定された SQL ステートメントを継続的に実行するかどうかを指定します。 テーブルに使用可能なデータがない場合、アダプターは、指定されたポーリング間隔で SQL ステートメントを実行するように戻します。 既定値は falseです。 |
これらのプロパティの詳細については、「BizTalk Adapter for SQL Server アダプター バインド プロパティ」を参照してください。 SQL アダプターを使用してSQL Serverポーリングする方法の詳細については、このトピックの残りの部分を参照してください。
ポーリング要求メッセージの使用
アダプターは、コードに対してポーリング操作を呼び出して、SQL Server データベースをポーリングします。 つまり、アダプターは、IInputChannel チャネル図形を介して受信したポーリング要求メッセージを送信します。 ポーリング要求メッセージには、PollingStatement バインド プロパティで指定されたクエリの結果セットが含まれます。 ポーリング メッセージは、次の 2 つの方法のいずれかで使用できます。
ノード値ストリーミングを使用してメッセージを使用するには、応答メッセージで WriteBodyContents メソッドを呼び出し、ノード値ストリーミングを実装する XmlDictionaryWriter を渡す必要があります。
ノード ストリーミングを使用してメッセージを使用するには、応答メッセージで GetReaderAtBodyContents を呼び出して XmlReader を取得します。
このトピックで使用される例について
このトピックの例では、Employee テーブルをポーリングします。 この例では、MOVE_EMP_DATA ストアド プロシージャとストアド プロシージャADD_EMP_DETAILSも使用します。 これらの成果物を生成するスクリプトは、サンプルと共に提供されます。 サンプルの詳細については、「 SQL アダプターのサンプル」を参照してください。 このトピックに基づくサンプル Polling_ChannelModelは、SQL アダプターのサンプルでも提供されています。
WCF チャネル モデルを使用したポーリング操作の受信メッセージの受信
このセクションでは、SQL アダプターを使用して受信ポーリング メッセージを受信する .NET アプリケーション (チャネル モデル) を記述する方法について説明します。
SQL アダプターからポーリング メッセージを受信するには
Visual Studio で Microsoft Visual C# プロジェクトを作成します。 このトピックでは、コンソール アプリケーションを作成します。
ソリューション エクスプローラーで、および への参照を
Microsoft.Adapters.Sql
Microsoft.ServiceModel.Channels
System.ServiceModel
追加します。System.Runtime.Serialization
Program.cs ファイルを開き、次の名前空間を追加します。
Microsoft.Adapters.Sql
System.ServiceModel
System.ServiceModel.Description
System.ServiceModel.Channels
System.Xml
接続 URI を指定します。 アダプター接続 URI の詳細については、「SQL Server接続 URI を作成する」を参照してください。
Uri ConnectionUri = new Uri("mssql://mysqlserver//mydatabase?");
SqlAdapterBinding のインスタンスを作成し、ポーリングを構成するために必要なバインド プロパティを設定します。 少なくとも、 InboundOperationType、 PolledDataAvailableStatement、および PollingStatement バインディング プロパティを設定する必要があります。 ポーリングの構成に使用されるバインディング プロパティの詳細については、「ポーリングを 使用した受信呼び出しのサポート」を参照してください。
SqlAdapterBinding binding = new SqlAdapterBinding(); binding.InboundOperationType = InboundOperation.Polling; binding.PolledDataAvailableStatement = "SELECT COUNT (*) FROM EMPLOYEE"; binding.PollingStatement = "SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000";
バインド パラメーター コレクションを作成し、資格情報を設定します。
ClientCredentials credentials = new ClientCredentials(); credentials.UserName.UserName = "<Enter user name here>"; credentials.UserName.Password = "<Enter password here>"; BindingParameterCollection bindingParams = new BindingParameterCollection(); bindingParams.Add(credentials);
チャネル リスナーを作成して開きます。 リスナーを作成する場合は、SqlAdapterBinding で BuildChannelListener<IInputChannel> メソッドを呼び出します。
IChannelListener<IInputChannel> listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams); listener.Open();
リスナーで AcceptChannel メソッドを呼び出して IInputChannel チャネルを取得し、それを開きます。
IInputChannel channel = listener.AcceptChannel(); channel.Open();
チャネルで Receive を呼び出して、アダプターから次の POLLINGSTMT メッセージを取得します。
Message message = channel.Receive();
POLLINGSTMT 操作によって返される結果セットを使用します。 メッセージは、 XmlReader または XmlDictionaryWriter を使用して使用できます。
XmlReader reader = message.GetReaderAtBodyContents();
要求の処理が完了したら、チャネルを閉じます。
channel.Close()
重要
POLLINGSTMT 操作の処理が完了したら、チャネルを閉じる必要があります。 チャネルを閉じに失敗すると、コードの動作が影響を受ける可能性があります。
データ変更メッセージの受信が完了したら、リスナーを閉じます。
listener.Close()
重要
リスナーを閉じても、リスナーを使用して作成されたチャネルは閉じられません。 リスナーを使用して作成された各チャネルを明示的に閉じる必要があります。
例
次の例は、Employee テーブルを実行するポーリング クエリを示しています。 ポーリング ステートメントは、次のタスクを実行します。
Employee テーブルからすべてのレコードを選択します。
MOVE_EMP_DATA ストアド プロシージャを実行して、すべてのレコードを Employee テーブルから EmployeeHistory テーブルに移動します。
ADD_EMP_DETAILS ストアド プロシージャを実行して、1 つのレコードを Employee テーブルに追加します。
ポーリング メッセージは に
C:\PollingOutput.xml
保存されます。
using System;
using Microsoft.Adapters.Sql;
using System.ServiceModel;
using System.ServiceModel.Description;
using System.ServiceModel.Channels;
using System.Xml;
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Sample started. This sample will poll 5 times and will perform the following tasks:");
Console.WriteLine("Press any key to start polling...");
Console.ReadLine();
IChannelListener<IInputChannel> listener = null;
IInputChannel channel = null;
try
{
TimeSpan messageTimeout = new TimeSpan(0, 0, 30);
SqlAdapterBinding binding = new SqlAdapterBinding();
binding.InboundOperationType = InboundOperation.Polling;
binding.PolledDataAvailableStatement = "SELECT COUNT (*) FROM EMPLOYEE";
binding.PollingStatement = "SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000";
Uri ConnectionUri = new Uri("mssql://mysqlserver//mydatabase?");
ClientCredentials credentials = new ClientCredentials();
credentials.UserName.UserName = "<Enter user name here>";
credentials.UserName.Password = "<Enter password here>";
BindingParameterCollection bindingParams = new BindingParameterCollection();
bindingParams.Add(credentials);
listener = binding.BuildChannelListener<IInputChannel>(ConnectionUri, bindingParams);
listener.Open();
channel = listener.AcceptChannel();
channel.Open();
Console.WriteLine("Channel and Listener opened...");
Console.WriteLine("\nWaiting for polled data...");
Console.WriteLine("Receive request timeout is {0}", messageTimeout);
// Poll five times with the specified message timeout
// If a timeout occurs polling will be aborted
for (int i = 0; i < 5; i++)
{
Console.WriteLine("Polling: " + i);
Message message = null;
XmlReader reader = null;
try
{
//Message is received so process the results
message = channel.Receive(messageTimeout);
}
catch (System.TimeoutException toEx)
{
Console.WriteLine("\nNo data for request number {0}: {1}", i + 1, toEx.Message);
continue;
}
// Get the query results using an XML reader
try
{
reader = message.GetReaderAtBodyContents();
}
catch (Exception ex)
{
Console.WriteLine("Exception :" + ex);
throw;
}
XmlDocument doc = new XmlDocument();
doc.Load(reader);
using (XmlWriter writer = XmlWriter.Create("C:\\PollingOutput.xml"))
{
doc.WriteTo(writer);
Console.WriteLine("The polling response is saved at 'C:\\PollingOutput.xml'");
}
// return the cursor
Console.WriteLine();
// close the reader
reader.Close();
message.Close();
}
Console.WriteLine("\nPolling done -- hit <RETURN> to finish");
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine("Exception is: " + ex.Message);
if (ex.InnerException != null)
{
Console.WriteLine("Inner Exception is: " + ex.InnerException.Message);
}
}
finally
{
// IMPORTANT: close the channel and listener to stop polling
if (channel != null)
{
if (channel.State == CommunicationState.Opened)
channel.Close();
else
channel.Abort();
}
if (listener != null)
{
if (listener.State == CommunicationState.Opened)
listener.Close();
else
listener.Abort();
}
}
}
}
}