Поделиться через


Разработка пользовательского компонента преобразования с синхронными выходами

Компоненты преобразования с синхронными выходами получают строки из вышестоящих компонентов и считывают либо изменяют значения в столбцах этих строк по мере передачи строк нижестоящим компонентам. В них также можно определить дополнительные выходные столбцы, производные от столбцов, которые передаются вышестоящими компонентами, однако эти столбцы не добавляют строки в поток данных. Дополнительные сведения о различиях между синхронными и асинхронными выходами см. в разделе Основные сведения о синхронных и асинхронных преобразованиях.

Компоненты такого типа подходят для задач, в которых данные изменяются встроенными средствами при передаче компоненту, а компоненту необязательно просматривать все строки, прежде чем приступать к их обработке. Это наиболее простой для разработки вид компонента, поскольку преобразования с синхронными выходами обычно не соединяются с внешними источниками данных, не управляют столбцами внешних метаданных и не добавляют строки в выходные буферы.

Создание компонента преобразования с синхронными выходами предполагает добавление объекта IDTSInput100, который содержит вышестоящие столбцы, выбранные для компонента, и объекта IDTSOutput100, который может содержать производные столбцы, созданные компонентом. Необходимо также реализовать методы времени разработки и предоставить код, осуществляющий считывание или изменение столбцов во входящих строках буфера в процессе выполнения.

В этом разделе содержатся сведения о том, что требуется для реализации пользовательского компонента преобразования, а также приведены примеры кода, которые помогут лучше понять основные понятия. Образец компонента преобразования с синхронными выходами см. в образцах служб Integration Services в разделе Codeplex.

Время разработки

Код времени разработки для этого компонента включает создание входов и выходов, добавление дополнительных выходных столбцов, формируемых компонентом, и проверку конфигурации компонента.

Создание компонента

Создание входов, выходов и пользовательских свойств компонента обычно осуществляется при выполнении метода ProvideComponentProperties. Существует два способа добавления входа и выхода в компонент преобразования с синхронными выходами. Можно использовать реализацию базового класса метода и затем изменить созданные им вход и выход по умолчанию, либо можно явным образом самостоятельно добавить вход и выход.

В следующем примере кода демонстрируется реализация метода ProvideComponentProperties, явно добавляющего объекты входа и выхода. Вызов базового класса, с помощью которого можно достичь того же результата, заключен в комментарий.

using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;

namespace Microsoft.Samples.SqlServer.Dts
{
    [DtsPipelineComponent(DisplayName = "SynchronousComponent", ComponentType = ComponentType.Transform)]
    public class SyncComponent : PipelineComponent
    {

        public override void ProvideComponentProperties()
        {
            // Add the input.
            IDTSInput100 input = ComponentMetaData.InputCollection.New();
            input.Name = "Input";

            // Add the output.
            IDTSOutput100 output = ComponentMetaData.OutputCollection.New();
            output.Name = "Output";
            output.SynchronousInputID = input.ID;

            // Alternatively, you can let the base class add the input and output
            // and set the SynchronousInputID of the output to the ID of the input.
            // base.ProvideComponentProperties();
        }
    }
}
Imports Microsoft.SqlServer.Dts.Pipeline
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports Microsoft.SqlServer.Dts.Runtime

<DtsPipelineComponent(DisplayName:="SynchronousComponent", ComponentType:=ComponentType.Transform)> _
Public Class SyncComponent
    Inherits PipelineComponent

    Public Overrides Sub ProvideComponentProperties()

        ' Add the input.
        Dim input As IDTSInput100 = ComponentMetaData.InputCollection.New()
        input.Name = "Input"

        ' Add the output.
        Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection.New()
        output.Name = "Output"
        output.SynchronousInputID = Input.ID

        ' Alternatively, you can let the base class add the input and output
        ' and set the SynchronousInputID of the output to the ID of the input.
        ' base.ProvideComponentProperties();

    End Sub

End Class

Создание и настройка выходных столбцов

Хотя компоненты преобразования с синхронными выходами не добавляют строки в буферы, они могут добавлять дополнительные выходные столбцы в собственный выход. Как правило, при добавлении компонентом выходного столбца значения нового выходного столбца производятся во время выполнения от данных, содержащихся в одном или нескольких столбцах, переданных компоненту вышестоящим компонентом.

После создания выходного столбца необходимо задать для него свойства типа данных. Задание свойств типа данных выходного столбца требует специальной обработки и осуществляется путем вызова метода SetDataTypeProperties. Использование этого метода необходимо, поскольку свойства DataType, Length, Precision и CodePage по отдельности доступны только для чтения по причине зависимости каждого из них от настроек других. Этот метод гарантирует согласованность задания значений свойств, а задача потока данных осуществляет проверку правильности задания значений.

Свойство DataType столбца определяет значения, задаваемые для других свойств. В следующей таблице показаны требования к зависимым свойствам для каждого значения DataType. Для не указанных здесь типов данных зависимые свойства имеют нулевые значения.

Тип данных

Длина

Масштаб

Точность

Кодовая страница

DT_DECIMAL

0

Больше 0 и меньше или равно 28.

0

0

DT_CY

0

0

0

0

DT_NUMERIC

0

Больше 0, меньше или равно 28 и меньше, чем точность.

Больше или равно 1 и меньше или равно 38.

0

DT_BYTES

Больше 0.

0

0

0

DT_STR

Больше 0 и меньше 8 000.

0

0

Не равно 0 и представляет допустимую кодовую страницу.

DT_WSTR

Больше 0 и меньше 4 000.

0

0

0

Поскольку ограничения для свойств типа данных основаны на типе данных выходного столбца, во время работы с управляемыми типами необходимо выбрать правильный тип данных служб Integration Services. Базовый класс предоставляет три вспомогательных метода, ConvertBufferDataTypeToFitManaged, BufferTypeToDataRecordType и DataRecordTypeToBufferType, которые упрощают для разработчиков управляемых компонентов выбор типа данных служб SSIS, если необходим управляемый тип. Эти методы преобразуют управляемые типы данных в типы данных служб SSIS и обратно.

Время выполнения

Обычно реализация части компонента, которая относится ко времени выполнения, заключается в решении двух задач – поиске входных и выходных столбцов компонента в буфере и считывании либо записи значений этих столбцов во входящие строки буфера.

Поиск столбцов в буфере

Количество столбцов в буферах, передаваемых компоненту во время выполнения, скорее всего, окажется больше числа столбцов во входных и выходных коллекциях компонента. Так происходит потому, что каждый буфер содержит все выходные столбцы, определенные в компонентах потока данных. Чтобы обеспечить правильное сопоставление столбцов буфера со столбцами входа и выхода, разработчикам компонентов следует использовать метод FindColumnByLineageID диспетчера буферов BufferManager. Этот метод осуществляет поиск столбца в указанном буфере по его идентификатору журнала преобразований. Обычно поиск столбцов производится в процессе выполнения метода PreExecute, поскольку это первый метод времени выполнения, в котором становится доступным свойство BufferManager.

В следующем примере кода демонстрируется компонент, осуществляющий поиск индексов столбцов в своей коллекции входных и выходных столбцов в процессе выполнения метода PreExecute. Индексы столбцов хранятся в целочисленном массиве, и доступ к ним компонент может получить в процессе выполнения метода ProcessInput.

int []inputColumns;
int []outputColumns;

public override void PreExecute()
{
    IDTSInput100 input = ComponentMetaData.InputCollection[0];
    IDTSOutput100 output = ComponentMetaData.OutputCollection[0];

    inputColumns = new int[input.InputColumnCollection.Count];
    outputColumns = new int[output.OutputColumnCollection.Count];

    for(int col=0; col < input.InputColumnCollection.Count; col++)
    {
        IDTSInputColumn100 inputColumn = input.InputColumnCollection[col];
        inputColumns[col] = BufferManager.FindColumnByLineageID(input.Buffer, inputColumn.LineageID);
    }

    for(int col=0; col < output.OutputColumnCollection.Count; col++)
    {
        IDTSOutputColumn100 outputColumn = output.OutputColumnCollection[col];
        outputColumns[col] = BufferManager.FindColumnByLineageID(input.Buffer, outputColumn.LineageID);
    }

}
Public Overrides Sub PreExecute()

    Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0)
    Dim output As IDTSOutput100 = ComponentMetaData.OutputCollection(0)


    ReDim inputColumns(input.InputColumnCollection.Count)
    ReDim outputColumns(output.OutputColumnCollection.Count)

    For col As Integer = 0 To input.InputColumnCollection.Count

        Dim inputColumn As IDTSInputColumn100 = input.InputColumnCollection(col)
        inputColumns(col) = BufferManager.FindColumnByLineageID(input.Buffer, inputColumn.LineageID)
    Next

    For col As Integer = 0 To output.OutputColumnCollection.Count

        Dim outputColumn As IDTSOutputColumn100 = output.OutputColumnCollection(col)
        outputColumns(col) = BufferManager.FindColumnByLineageID(input.Buffer, outputColumn.LineageID)
    Next

End Sub

Обработка строк

Компоненты получают объекты PipelineBuffer, содержащие строки и столбцы, в методе ProcessInput. При выполнении этого метода осуществляется итерация по строкам в буфере, а столбцы, идентифицированные во время выполнения метода PreExecute, считываются и изменяются. Метод вызывается задачей потока данных повторно до тех пор, пока не останется строк, переданных из вышестоящего компонента.

Считывание или запись отдельного столбца в буфере осуществляется с использованием индексатора массива в качестве метода доступа либо с помощью одного из методов Get или Set. Методы Get и Set более эффективны, их следует использовать в случаях, когда тип данных столбца в буфере известен.

В следующем примере кода демонстрируется реализация метода ProcessInput, обрабатывающего входящие строки.

public override void ProcessInput( int InputID, PipelineBuffer buffer)
{
       while( buffer.NextRow())
       {
            for(int x=0; x < inputColumns.Length;x++)
            {
                if(!buffer.IsNull(inputColumns[x]))
                {
                    object columnData = buffer[inputColumns[x]];
                    // TODO: Modify the column data.
                    buffer[inputColumns[x]] = columnData;
                }
            }
        
      }
}
Public Overrides Sub ProcessInput(ByVal InputID As Integer, ByVal buffer As PipelineBuffer)

        While (buffer.NextRow())

            For x As Integer = 0 To inputColumns.Length

                if buffer.IsNull(inputColumns(x)) = false then

                    Dim columnData As Object = buffer(inputColumns(x))
                    ' TODO: Modify the column data.
                    buffer(inputColumns(x)) = columnData

                End If
            Next

        End While
End Sub

Образец

В следующем образце демонстрируется простой компонент преобразования с синхронными выходами, изменяющий регистр значений всех символьных столбцов на верхний. В этом образце показаны не все методы и возможности, описанные в данном разделе. В этом образце демонстрируются важные методы, которые должны быть переопределены каждым пользовательским компонентом преобразования с синхронными выходами, но отсутствует код для проверки во время разработки. Полный образец компонента преобразования с синхронными выходами см. в образце компонента Change Case, который является одним из образцов для служб Integration Services в Codeplex.

using System;
using System.Collections;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;

namespace Uppercase
{
  [DtsPipelineComponent(DisplayName = "Uppercase")]
  public class Uppercase : PipelineComponent
  {
    ArrayList m_ColumnIndexList = new ArrayList();

    public override void ProvideComponentProperties()
    {
      base.ProvideComponentProperties();
      ComponentMetaData.InputCollection[0].Name = "Uppercase Input";
      ComponentMetaData.OutputCollection[0].Name = "Uppercase Output";
    }

    public override void PreExecute()
    {
      IDTSInput100 input = ComponentMetaData.InputCollection[0];
      IDTSInputColumnCollection100 inputColumns = input.InputColumnCollection;

      foreach (IDTSInputColumn100 column in inputColumns)
      {
        if (column.DataType == DataType.DT_STR || column.DataType == DataType.DT_WSTR)
        {
          m_ColumnIndexList.Add((int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID));
        }
      }
    }

    public override void ProcessInput(int inputID, PipelineBuffer buffer)
    {
      while (buffer.NextRow())
      {
        foreach (int columnIndex in m_ColumnIndexList)
        {
          string str = buffer.GetString(columnIndex);
          buffer.SetString(columnIndex, str.ToUpper());
        }
      }
    }
  }
}
Imports System 
Imports System.Collections 
Imports Microsoft.SqlServer.Dts.Pipeline 
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper 
Imports Microsoft.SqlServer.Dts.Runtime.Wrapper 
Namespace Uppercase 

 <DtsPipelineComponent(DisplayName="Uppercase")> _ 
 Public Class Uppercase 
 Inherits PipelineComponent 
   Private m_ColumnIndexList As ArrayList = New ArrayList 

   Public  Overrides Sub ProvideComponentProperties() 
     MyBase.ProvideComponentProperties 
     ComponentMetaData.InputCollection(0).Name = "Uppercase Input" 
     ComponentMetaData.OutputCollection(0).Name = "Uppercase Output" 
   End Sub 

   Public  Overrides Sub PreExecute() 
     Dim input As IDTSInput100 = ComponentMetaData.InputCollection(0) 
     Dim inputColumns As IDTSInputColumnCollection100 = input.InputColumnCollection 
     For Each column As IDTSInputColumn100 In inputColumns 
       If column.DataType = DataType.DT_STR OrElse column.DataType = DataType.DT_WSTR Then 
         m_ColumnIndexList.Add(CType(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID), Integer)) 
       End If 
     Next 
   End Sub 

   Public  Overrides Sub ProcessInput(ByVal inputID As Integer, ByVal buffer As PipelineBuffer) 
     While buffer.NextRow 
       For Each columnIndex As Integer In m_ColumnIndexList 
         Dim str As String = buffer.GetString(columnIndex) 
         buffer.SetString(columnIndex, str.ToUpper) 
       Next 
     End While 
   End Sub 
 End Class 
End Namespace
Значок служб Integration Services (маленький) Будьте в курсе новых возможностей cлужб Integration Services

Чтобы загружать новейшую документацию, статьи, образцы и видеоматериалы от корпорации Майкрософт, а также лучшие решения от участников сообщества, посетите страницу служб Integration Services на узлах MSDN или TechNet:

Чтобы получать автоматические уведомления об этих обновлениях, подпишитесь на RSS-каналы, предлагаемые на этой странице.