Développement d'un composant de destination personnalisé
S’applique à : SQL Server SSIS Integration Runtime dans Azure Data Factory
Microsoft SQL Server Integration Services offre aux développeurs la capacité d’écrire des composants de destination personnalisés qui peuvent se connecter à n’importe quelle source de données personnalisée et y stocker des données. Les composants de destination personnalisés sont utiles lorsque vous devez vous connecter à des sources de données qui ne sont pas accessibles via l'un des composants sources existants inclus dans Integration Services.
Les composants de destination possèdent une ou plusieurs entrées et zéro sortie. Au moment de la conception, ils créent et configurent des connexions et lisent les métadonnées des colonnes à partir de la source de données externe. Pendant l'exécution, ils se connectent à leur source de données externe et y ajoutent des lignes provenant de composants situés en amont du flux de données. Si la source de données externe existe avant l'exécution du composant, le composant de destination doit également s'assurer que les types de données des colonnes que le composant reçoit correspondent aux types de données des colonnes au niveau de la source de données externe.
Cette section explique en détail comment développer des composants de destination et fournit des exemples de code pour clarifier des concepts importants. Pour une vue d’ensemble du développement de composants de flux de données, consultez Développement d’un composant de flux de données personnalisé.
Moment de la conception
Pour implémenter les fonctionnalités au moment de la conception d'un composant de destination, vous devez spécifier une connexion à une source de données externe et confirmer que le composant a été correctement configuré. Par définition, un composant de destination possède une entrée et éventuellement une sortie d'erreur.
Création du composant
Les composants de destination se connectent à des sources de données externes à l'aide d'objets ConnectionManager définis dans un package. Le composant de destination indique qu’il a besoin d’un gestionnaire de connexions au concepteur SSIS et aux utilisateurs du composant, en ajoutant un élément à la collection RuntimeConnectionCollection de ComponentMetaData. Cette collection remplit deux rôles : d'abord, elle publie le besoin d'un gestionnaire de connexions auprès du concepteur SSIS ; puis, lorsque l'utilisateur a sélectionné ou créé un gestionnaire de connexions, elle contient une référence au gestionnaire de connexions dans le package utilisé par le composant. Lorsqu’un objet IDTSRuntimeConnection100 est ajouté à la collection, l’Éditeur avancé affiche l’onglet Propriétés de connexion pour inviter l’utilisateur à sélectionner ou créer une connexion dans le package utilisée par le composant.
L'exemple de code suivant affiche une implémentation de ProvideComponentProperties qui ajoute une entrée, puis ajoute un objet IDTSRuntimeConnection100 à RuntimeConnectionCollection.
using System;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;
namespace Microsoft.Samples.SqlServer.Dts
{
[DtsPipelineComponent(DisplayName = "Destination Component",ComponentType =ComponentType.DestinationAdapter)]
public class DestinationComponent : PipelineComponent
{
public override void ProvideComponentProperties()
{
// Reset the component.
base.RemoveAllInputsOutputsAndCustomProperties();
ComponentMetaData.RuntimeConnectionCollection.RemoveAll();
IDTSInput100 input = ComponentMetaData.InputCollection.New();
input.Name = "Input";
IDTSRuntimeConnection100 connection = ComponentMetaData.RuntimeConnectionCollection.New();
connection.Name = "ADO.net";
}
}
}
Imports System
Imports System.Data
Imports System.Data.SqlClient
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime
Namespace Microsoft.Samples.SqlServer.Dts
<DtsPipelineComponent(DisplayName:="Destination Component", ComponentType:=ComponentType.DestinationAdapter)> _
Public Class DestinationComponent
Inherits PipelineComponent
Public Overrides Sub ProvideComponentProperties()
' Reset the component.
Me.RemoveAllInputsOutputsAndCustomProperties()
ComponentMetaData.RuntimeConnectionCollection.RemoveAll()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New()
input.Name = "Input"
Dim connection As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New()
connection.Name = "ADO.net"
End Sub
End Class
End Namespace
Connexion à une source de données externe
Une fois la connexion ajoutée à la collection RuntimeConnectionCollection, vous pouvez remplacer la méthode AcquireConnections pour établir une connexion à la source de données externe. Cette méthode est appelée au moment de la conception et au moment de l'exécution. Le composant doit établir une connexion au gestionnaire de connexions spécifié par la connexion au moment de l'exécution, et par la suite, à la source de données externe. Une fois la connexion établie, le composant doit la mettre en cache en interne et la libérer lorsque ReleaseConnections est appelé. Les développeurs substituent cette méthode et libèrent la connexion établie par le composant pendant l'exécution de la méthode AcquireConnections. Les méthodes ReleaseConnections et AcquireConnections sont appelées au moment de la conception et au moment de l'exécution.
L'exemple de code suivant montre un composant qui établit une connexion ADO.NET dans la méthode AcquireConnections, puis ferme la connexion dans ReleaseConnections.
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
private SqlConnection sqlConnection;
public override void AcquireConnections(object transaction)
{
if (ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager != null)
{
ConnectionManager cm = Microsoft.SqlServer.Dts.Runtime.DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection[0].ConnectionManager);
ConnectionManagerAdoNet cmado = cm.InnerObject as ConnectionManagerAdoNet;
if (cmado == null)
throw new Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.");
sqlConnection = cmado.AcquireConnection(transaction) as SqlConnection;
sqlConnection.Open();
}
}
public override void ReleaseConnections()
{
if (sqlConnection != null && sqlConnection.State != ConnectionState.Closed)
sqlConnection.Close();
}
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper
Private sqlConnection As SqlConnection
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
If IsNothing(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager) = False Then
Dim cm As ConnectionManager = DtsConvert.GetWrapper(ComponentMetaData.RuntimeConnectionCollection(0).ConnectionManager)
Dim cmado As ConnectionManagerAdoNet = CType(cm.InnerObject,ConnectionManagerAdoNet)
If IsNothing(cmado) Then
Throw New Exception("The ConnectionManager " + cm.Name + " is not an ADO.NET connection.")
End If
sqlConnection = CType(cmado.AcquireConnection(transaction), SqlConnection)
sqlConnection.Open()
End If
End Sub
Public Overrides Sub ReleaseConnections()
If IsNothing(sqlConnection) = False And sqlConnection.State <> ConnectionState.Closed Then
sqlConnection.Close()
End If
End Sub
Validation du composant
Les développeurs de composants de destination doivent procéder à une opération de validation décrite dans Validation de composants. De plus, ils doivent vérifier que les propriétés de type de données des colonnes définies dans la collection de colonnes d'entrée du composant correspondent aux colonnes au niveau de la source de données externe. Il est parfois impossible ou déconseillé de vérifier les colonnes d'entrée par rapport à la source de données externe, notamment lorsque le composant ou le concepteur SSIS est déconnecté, ou lorsque les allers-retours au serveur ne sont pas acceptables. Dans ce cas, il est toujours possible de valider les colonnes dans la collection de colonnes d'entrée à l'aide de la propriété ExternalMetadataColumnCollection de l'objet d'entrée.
Cette collection existe sur les objets d'entrée et de sortie et doit être remplie par le développeur de composants à partir des colonnes au niveau de la source de données externe. Cette collection permet de valider les colonnes d’entrée lorsque le concepteur SSIS est hors connexion, lorsque le composant est déconnecté ou lorsque la propriété ValidateExternalMetadata a la valeur false.
L'exemple de code suivant ajoute une colonne de métadonnées externe basée sur une colonne d'entrée existante.
private void AddExternalMetaDataColumn(IDTSInput100 input,IDTSInputColumn100 inputColumn)
{
// Set the properties of the external metadata column.
IDTSExternalMetadataColumn100 externalColumn = input.ExternalMetadataColumnCollection.New();
externalColumn.Name = inputColumn.Name;
externalColumn.Precision = inputColumn.Precision;
externalColumn.Length = inputColumn.Length;
externalColumn.DataType = inputColumn.DataType;
externalColumn.Scale = inputColumn.Scale;
// Map the external column to the input column.
inputColumn.ExternalMetadataColumnID = externalColumn.ID;
}
Private Sub AddExternalMetaDataColumn(ByVal input As IDTSInput100, ByVal inputColumn As IDTSInputColumn100)
' Set the properties of the external metadata column.
Dim externalColumn As IDTSExternalMetadataColumn100 = input.ExternalMetadataColumnCollection.New()
externalColumn.Name = inputColumn.Name
externalColumn.Precision = inputColumn.Precision
externalColumn.Length = inputColumn.Length
externalColumn.DataType = inputColumn.DataType
externalColumn.Scale = inputColumn.Scale
' Map the external column to the input column.
inputColumn.ExternalMetadataColumnID = externalColumn.ID
End Sub
Moment de l'exécution
Pendant l'exécution, le composant de destination reçoit un appel à la méthode ProcessInput chaque fois qu'un PipelineBuffer saturé est disponible à partir du composant en amont. Cette méthode est appelée à plusieurs reprises jusqu’à ce qu’il n’y ait plus de mémoire tampon disponible et que la propriété EndOfRowset prenne la valeur true. Pendant l'exécution de cette méthode, les composants de destination lisent les colonnes et les lignes dans la mémoire tampon et les ajoutent à la source de données externe.
Localisation des colonnes dans le tampon
La mémoire tampon d'entrée d'un composant contient toutes les colonnes définies dans les collections de colonnes de sortie des composants situés en amont du composant dans le flux de données. Par exemple, si un composant source fournit trois colonnes dans sa sortie, et que le composant suivant ajoute une colonne de sortie supplémentaire, la mémoire tampon fournie au composant de destination contient quatre colonnes, même si le composant de destination n'écrit que deux colonnes.
L'ordre des colonnes dans la mémoire tampon d'entrée n'est pas défini par l'index de la colonne dans la collection de colonnes d'entrée du composant de destination. Des colonnes peuvent être placées de manière fiable dans une ligne de la mémoire tampon à l'aide de la méthode FindColumnByLineageID de BufferManager. Cette méthode recherche la colonne avec l'ID de lignage spécifié dans la mémoire tampon spécifiée et retourne son emplacement dans la ligne. Les index des colonnes d'entrée sont généralement localisés lors de l'exécution de la méthode PreExecute et mis en cache afin d'être utilisés ultérieurement par le développeur lors de l'exécution de la méthode ProcessInput.
L'exemple de code suivant recherche l'emplacement des colonnes d'entrée dans la mémoire tampon pendant l'exécution de la méthode PreExecute et les stocke dans un tableau. Le tableau est ensuite utilisé au cours de l'exécution de la méthode ProcessInput pour lire les valeurs des colonnes dans la mémoire tampon.
int[] cols;
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
cols = new int[input.InputColumnCollection.Count];
for (int x = 0; x < input.InputColumnCollection.Count; x++)
{
cols[x] = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection[x].LineageID);
}
}
Private cols As Integer()
Public Overrides Sub PreExecute()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
ReDim cols(input.InputColumnCollection.Count)
For x As Integer = 0 To input.InputColumnCollection.Count
cols(x) = BufferManager.FindColumnByLineageID(input.Buffer, input.InputColumnCollection(x).LineageID)
Next x
End Sub
Traitement de lignes
Une fois que les colonnes d'entrée ont été localisées dans la mémoire tampon, elles peuvent être lues et écrites dans la source de données externe.
Pendant que le composant de destination écrit des lignes dans la source de données externe, vous pouvez mettre à jour les compteurs de performance « Lignes lues » ou « Octets BLOB lus » en appelant la méthode IncrementPipelinePerfCounter. Pour plus d’informations, consultez Compteurs de performances.
L'exemple suivant présente un composant qui lit des lignes à partir de la mémoire tampon fournie dans ProcessInput. La recherche des index des colonnes dans la mémoire tampon a été effectuée au cours de l'exécution de la méthode PreExecute dans l'exemple de code précédent.
public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
while (buffer.NextRow())
{
foreach (int col in cols)
{
if (!buffer.IsNull(col))
{
// TODO: Read the column data.
}
}
}
}
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
While (buffer.NextRow())
For Each col As Integer In cols
If buffer.IsNull(col) = False Then
' TODO: Read the column data.
End If
Next col
End While
End Sub
Exemple
L'exemple suivant montre un composant de destination simple qui utilise un gestionnaire de connexions de fichiers pour enregistrer les données binaires du flux de données dans des fichiers. Cet exemple ne contient pas toutes les méthodes et fonctionnalités présentées dans cette rubrique. Il illustre les méthodes importantes que chaque composant de destination personnalisé doit substituer, mais ne contient pas de code pour la validation au moment de la conception.
using System;
using System.IO;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
namespace BlobDst
{
[DtsPipelineComponent(DisplayName = "BLOB Extractor Destination", Description = "Writes values of BLOB columns to files")]
public class BlobDst : PipelineComponent
{
string m_DestDir;
int m_FileNameColumnIndex = -1;
int m_BlobColumnIndex = -1;
public override void ProvideComponentProperties()
{
IDTSInput100 input = ComponentMetaData.InputCollection.New();
input.Name = "BLOB Extractor Destination Input";
input.HasSideEffects = true;
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection.New();
conn.Name = "FileConnection";
}
public override void AcquireConnections(object transaction)
{
IDTSRuntimeConnection100 conn = ComponentMetaData.RuntimeConnectionCollection[0];
m_DestDir = (string)conn.ConnectionManager.AcquireConnection(null);
if (m_DestDir.Length > 0 && m_DestDir[m_DestDir.Length - 1] != '\\')
m_DestDir += "\\";
}
public override IDTSInputColumn100 SetUsageType(int inputID, IDTSVirtualInput100 virtualInput, int lineageID, DTSUsageType usageType)
{
IDTSInputColumn100 inputColumn = base.SetUsageType(inputID, virtualInput, lineageID, usageType);
IDTSCustomProperty100 custProp;
custProp = inputColumn.CustomPropertyCollection.New();
custProp.Name = "IsFileName";
custProp.Value = (object)false;
custProp = inputColumn.CustomPropertyCollection.New();
custProp.Name = "IsBLOB";
custProp.Value = (object)false;
return inputColumn;
}
public override void PreExecute()
{
IDTSInput100 input = ComponentMetaData.InputCollection[0];
IDTSInputColumnCollection100 inputColumns = input.InputColumnCollection;
IDTSCustomProperty100 custProp;
foreach (IDTSInputColumn100 column in inputColumns)
{
custProp = column.CustomPropertyCollection["IsFileName"];
if ((bool)custProp.Value == true)
{
m_FileNameColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);
}
custProp = column.CustomPropertyCollection["IsBLOB"];
if ((bool)custProp.Value == true)
{
m_BlobColumnIndex = (int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID);
}
}
}
public override void ProcessInput(int inputID, PipelineBuffer buffer)
{
while (buffer.NextRow())
{
string strFileName = buffer.GetString(m_FileNameColumnIndex);
int blobLength = (int)buffer.GetBlobLength(m_BlobColumnIndex);
byte[] blobData = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength);
strFileName = TranslateFileName(strFileName);
// Make sure directory exists before creating file.
FileInfo fi = new FileInfo(strFileName);
if (!fi.Directory.Exists)
fi.Directory.Create();
// Write the data to the file.
FileStream fs = new FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None);
fs.Write(blobData, 0, blobLength);
fs.Close();
}
}
private string TranslateFileName(string fileName)
{
if (fileName.Length > 2 && fileName[1] == ':')
return m_DestDir + fileName.Substring(3, fileName.Length - 3);
else
return m_DestDir + fileName;
}
}
}
Imports System
Imports System.IO
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Namespace BlobDst
<DtsPipelineComponent(DisplayName="BLOB Extractor Destination", Description="Writes values of BLOB columns to files")> _
Public Class BlobDst
Inherits PipelineComponent
Private m_DestDir As String
Private m_FileNameColumnIndex As Integer = -1
Private m_BlobColumnIndex As Integer = -1
Public Overrides Sub ProvideComponentProperties()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New
input.Name = "BLOB Extractor Destination Input"
input.HasSideEffects = True
Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection.New
conn.Name = "FileConnection"
End Sub
Public Overrides Sub AcquireConnections(ByVal transaction As Object)
Dim conn As IDTSRuntimeConnection100 = ComponentMetaData.RuntimeConnectionCollection(0)
m_DestDir = CType(conn.ConnectionManager.AcquireConnection(Nothing), String)
If m_DestDir.Length > 0 AndAlso Not (m_DestDir(m_DestDir.Length - 1) = "\"C) Then
m_DestDir += "\"
End If
End Sub
Public Overrides Function SetUsageType(ByVal inputID As Integer, ByVal virtualInput As IDTSVirtualInput100, ByVal lineageID As Integer, ByVal usageType As DTSUsageType) As IDTSInputColumn100
Dim inputColumn As IDTSInputColumn100 = MyBase.SetUsageType(inputID, virtualInput, lineageID, usageType)
Dim custProp As IDTSCustomProperty100
custProp = inputColumn.CustomPropertyCollection.New
custProp.Name = "IsFileName"
custProp.Value = CType(False, Object)
custProp = inputColumn.CustomPropertyCollection.New
custProp.Name = "IsBLOB"
custProp.Value = CType(False, Object)
Return inputColumn
End Function
Public Overrides Sub PreExecute()
Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
Dim inputColumns As IDTSInputColumnCollection100 = input.InputColumnCollection
Dim custProp As IDTSCustomProperty100
For Each column As IDTSInputColumn100 In inputColumns
custProp = column.CustomPropertyCollection("IsFileName")
If CType(custProp.Value, Boolean) = True Then
m_FileNameColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)
End If
custProp = column.CustomPropertyCollection("IsBLOB")
If CType(custProp.Value, Boolean) = True Then
m_BlobColumnIndex = CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)
End If
Next
End Sub
Public Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer)
While buffer.NextRow
Dim strFileName As String = buffer.GetString(m_FileNameColumnIndex)
Dim blobLength As Integer = CType(buffer.GetBlobLength(m_BlobColumnIndex), Integer)
Dim blobData As Byte() = buffer.GetBlobData(m_BlobColumnIndex, 0, blobLength)
strFileName = TranslateFileName(strFileName)
Dim fi As FileInfo = New FileInfo(strFileName)
' Make sure directory exists before creating file.
If Not fi.Directory.Exists Then
fi.Directory.Create
End If
' Write the data to the file.
Dim fs As FileStream = New FileStream(strFileName, FileMode.Create, FileAccess.Write, FileShare.None)
fs.Write(blobData, 0, blobLength)
fs.Close
End While
End Sub
Private Function TranslateFileName(ByVal fileName As String) As String
If fileName.Length > 2 AndAlso fileName(1) = ":"C Then
Return m_DestDir + fileName.Substring(3, fileName.Length - 3)
Else
Return m_DestDir + fileName
End If
End Function
End Class
End Namespace
Voir aussi
Développement d’un composant source personnalisé
Création d’une destination à l’aide du composant Script