Atividade ForEach no Azure Data Factory e no Azure Synapse Analytics
APLICA-SE A: Azure Data Factory Azure Synapse Analytics
Gorjeta
Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!
A Atividade ForEach define um fluxo de controle repetitivo em um pipeline do Azure Data Factory ou Sinapse. Esta atividade é utilizada para iterar uma coleção e executa atividades especificadas em ciclo. A implementação de ciclo desta atividade é semelhante à estrutura de ciclo Foreach nas linguagens de programação.
Criar uma atividade ForEach com a interface do usuário
Para usar uma atividade ForEach em um pipeline, conclua as seguintes etapas:
Você pode usar qualquer variável de tipo de matriz ou saídas de outras atividades como entrada para sua atividade ForEach. Para criar uma variável de matriz, selecione o plano de fundo da tela de pipeline e, em seguida, selecione a guia Variáveis para adicionar uma variável de tipo de matriz, conforme mostrado abaixo.
Procure ForEach no painel Atividades do pipeline e arraste uma atividade ForEach para a tela do pipeline.
Selecione a nova atividade ForEach na tela, se ainda não estiver selecionada, e sua guia Configurações , para editar seus detalhes.
Selecione o campo Itens e, em seguida, selecione o link Adicionar conteúdo dinâmico para abrir o painel do editor de conteúdo dinâmico.
Selecione sua matriz de entrada a ser filtrada no editor de conteúdo dinâmico. Neste exemplo, selecionamos a variável criada na primeira etapa.
Selecione o editor de Atividades na atividade ForEach para adicionar uma ou mais atividades a serem executadas para cada item na matriz Itens de entrada.
Em qualquer atividade que você criar dentro da atividade ForEach, você pode fazer referência ao item atual pelo qual a atividade ForEach está iterando na lista Itens . Você pode fazer referência ao item atual em qualquer lugar em que possa usar uma expressão dinâmica para especificar um valor de propriedade. No editor de conteúdo dinâmico, selecione o iterador ForEach para retornar o item atual.
Sintaxe
As propriedades são descritas mais adiante neste artigo. A propriedade items é a coleção e cada item na coleção é referido usando o @item()
conforme mostrado na sintaxe a seguir:
{
"name":"MyForEachActivityName",
"type":"ForEach",
"typeProperties":{
"isSequential":"true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPathCollection",
"type": "Expression"
},
"activities":[
{
"name":"MyCopyActivity",
"type":"Copy",
"typeProperties":{
...
},
"inputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs":[
{
"referenceName":"MyDataset",
"type":"DatasetReference",
"parameters":{
"MyFolderPath":"@item()"
}
}
]
}
]
}
}
Propriedades do tipo
Property | Description | Valores permitidos | Necessário |
---|---|---|---|
nome | Nome da atividade para cada atividade. | Cadeia (de carateres) | Sim |
tipo | Deve ser definido como ForEach | Cadeia (de carateres) | Sim |
isSequencial | Especifica se o loop deve ser executado sequencialmente ou em paralelo. Máximo de 50 iterações de loop podem ser executadas de uma só vez em paralelo). Por exemplo, se você tiver uma atividade ForEach iterando sobre uma atividade de cópia com 10 conjuntos de dados de origem e coletor diferentes com isSequential definido como False, todas as cópias serão executadas de uma só vez. O padrão é False. Se "isSequential" estiver definido como False, verifique se há uma configuração correta para executar vários executáveis. Caso contrário, essa propriedade deve ser usada com cuidado para evitar incorrer em conflitos de gravação. Para obter mais informações, consulte a seção Execução paralela. |
Boolean | N.º O padrão é False. |
batchCount | Contagem de lotes a ser usada para controlar o número de execuções paralelas (quando isSequential é definido como false). Este é o limite superior de simultaneidade, mas a atividade para cada uma nem sempre será executada nesse número | Inteiro (máximo 50) | N.º O padrão é 20. |
Items | Uma expressão que retorna uma matriz JSON a ser iterada. | Expressão (que retorna uma matriz JSON) | Sim |
Atividades | As atividades a serem executadas. | Lista de Atividades | Sim |
Execução paralela
Se isSequential estiver definido como false, a atividade itera em paralelo com um máximo de 50 iterações simultâneas. Esta definição deve ser utilizada com precaução. Se as iterações simultâneas estiverem gravando na mesma pasta, mas em arquivos diferentes, essa abordagem é boa. Se as iterações simultâneas estiverem gravando simultaneamente no mesmo arquivo, essa abordagem provavelmente causará um erro.
Linguagem de expressão de iteração
Na atividade ForEach, forneça uma matriz a ser iterada para os itens de propriedade." Use @item()
para iterar sobre uma única enumeração na atividade ForEach. Por exemplo, se items for uma matriz: [1, 2, 3], @item()
retorna 1 na primeira iteração, 2 na segunda iteração e 3 na terceira iteração. Você também pode usar @range(0,10)
a expressão like para iterar dez vezes começando com 0 terminando com 9.
Iteração em uma única atividade
Cenário: copie do mesmo arquivo de origem no Blob do Azure para vários arquivos de destino no Blob do Azure.
Definição de pipeline
{
"name": "<MyForEachPipeline>",
"properties": {
"activities": [
{
"name": "<MyForEachActivity>",
"type": "ForEach",
"typeProperties": {
"isSequential": "true",
"items": {
"value": "@pipeline().parameters.mySinkDatasetFolderPath",
"type": "Expression"
},
"activities": [
{
"name": "MyCopyActivity",
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": "false"
},
"sink": {
"type": "BlobSink",
"copyBehavior": "PreserveHierarchy"
}
},
"inputs": [
{
"referenceName": "<MyDataset>",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@pipeline().parameters.mySourceDatasetFolderPath"
}
}
],
"outputs": [
{
"referenceName": "MyDataset",
"type": "DatasetReference",
"parameters": {
"MyFolderPath": "@item()"
}
}
]
}
]
}
}
],
"parameters": {
"mySourceDatasetFolderPath": {
"type": "String"
},
"mySinkDatasetFolderPath": {
"type": "String"
}
}
}
}
Definição do conjunto de dados de Blob
{
"name":"<MyDataset>",
"properties":{
"type":"AzureBlob",
"typeProperties":{
"folderPath":{
"value":"@dataset().MyFolderPath",
"type":"Expression"
}
},
"linkedServiceName":{
"referenceName":"StorageLinkedService",
"type":"LinkedServiceReference"
},
"parameters":{
"MyFolderPath":{
"type":"String"
}
}
}
}
Executar valores de parâmetro
{
"mySourceDatasetFolderPath": "input/",
"mySinkDatasetFolderPath": [ "outputs/file1", "outputs/file2" ]
}
Iterar em várias atividades
É possível iterar em várias atividades (por exemplo: copiar e atividades da Web) em uma atividade ForEach. Nesse cenário, recomendamos que você abstraia várias atividades em um pipeline separado. Em seguida, você pode usar a atividade ExecutePipeline no pipeline com a atividade ForEach para invocar o pipeline separado com várias atividades.
Sintaxe
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "<MyForEachMultipleActivities>"
"typeProperties": {
"isSequential": true,
"items": {
...
},
"activities": [
{
"type": "ExecutePipeline",
"name": "<MyInnerPipeline>"
"typeProperties": {
"pipeline": {
"referenceName": "<copyHttpPipeline>",
"type": "PipelineReference"
},
"parameters": {
...
},
"waitOnCompletion": true
}
}
]
}
}
],
"parameters": {
...
}
}
}
Exemplo
Cenário: itere sobre um InnerPipeline dentro de uma atividade ForEach com a atividade Execute Pipeline. O pipeline interno copia com definições de esquema parametrizadas.
Definição de Master Pipeline
{
"name": "masterPipeline",
"properties": {
"activities": [
{
"type": "ForEach",
"name": "MyForEachActivity",
"typeProperties": {
"isSequential": true,
"items": {
"value": "@pipeline().parameters.inputtables",
"type": "Expression"
},
"activities": [
{
"type": "ExecutePipeline",
"typeProperties": {
"pipeline": {
"referenceName": "InnerCopyPipeline",
"type": "PipelineReference"
},
"parameters": {
"sourceTableName": {
"value": "@item().SourceTable",
"type": "Expression"
},
"sourceTableStructure": {
"value": "@item().SourceTableStructure",
"type": "Expression"
},
"sinkTableName": {
"value": "@item().DestTable",
"type": "Expression"
},
"sinkTableStructure": {
"value": "@item().DestTableStructure",
"type": "Expression"
}
},
"waitOnCompletion": true
},
"name": "ExecuteCopyPipeline"
}
]
}
}
],
"parameters": {
"inputtables": {
"type": "Array"
}
}
}
}
Definição de pipeline interno
{
"name": "InnerCopyPipeline",
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "SqlSource",
}
},
"sink": {
"type": "SqlSink"
}
},
"name": "CopyActivity",
"inputs": [
{
"referenceName": "sqlSourceDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sourceTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sourceTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "sqlSinkDataset",
"parameters": {
"SqlTableName": {
"value": "@pipeline().parameters.sinkTableName",
"type": "Expression"
},
"SqlTableStructure": {
"value": "@pipeline().parameters.sinkTableStructure",
"type": "Expression"
}
},
"type": "DatasetReference"
}
]
}
],
"parameters": {
"sourceTableName": {
"type": "String"
},
"sourceTableStructure": {
"type": "String"
},
"sinkTableName": {
"type": "String"
},
"sinkTableStructure": {
"type": "String"
}
}
}
}
Definição do conjunto de dados de origem
{
"name": "sqlSourceDataset",
"properties": {
"type": "SqlServerTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "sqlserverLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Definição do conjunto de dados do coletor
{
"name": "sqlSinkDataSet",
"properties": {
"type": "AzureSqlTable",
"typeProperties": {
"tableName": {
"value": "@dataset().SqlTableName",
"type": "Expression"
}
},
"structure": {
"value": "@dataset().SqlTableStructure",
"type": "Expression"
},
"linkedServiceName": {
"referenceName": "azureSqlLS",
"type": "LinkedServiceReference"
},
"parameters": {
"SqlTableName": {
"type": "String"
},
"SqlTableStructure": {
"type": "String"
}
}
}
}
Parâmetros do pipeline mestre
{
"inputtables": [
{
"SourceTable": "department",
"SourceTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
],
"DestTable": "department2",
"DestTableStructure": [
{
"name": "departmentid",
"type": "int"
},
{
"name": "departmentname",
"type": "string"
}
]
}
]
}
Agregando saídas
Para agregar saídas de cada atividade, utilize Variáveis e Anexe atividade de variável .
Primeiro, declare uma array
variável no pipeline. Em seguida, invoque a atividade Append Variable dentro de cada loop foreach . Posteriormente, você pode recuperar a agregação de sua matriz.
Limitações e soluções alternativas
Aqui estão algumas limitações da atividade ForEach e soluções alternativas sugeridas.
Limitação | Solução |
---|---|
Não é possível aninhar um loop ForEach dentro de outro loop ForEach (ou um loop TUntil ). | Projete um pipeline de dois níveis em que o pipeline externo com o loop ForEach externo itere sobre um pipeline interno com o loop aninhado. |
A atividade ForEach tem um máximo batchCount de 50 para processamento paralelo e um máximo de 100.000 itens. |
Projete um pipeline de dois níveis em que o pipeline externo com a atividade ForEach itere sobre um pipeline interno. |
SetVariable não pode ser usado dentro de uma atividade ForEach que é executada em paralelo, pois as variáveis são globais para todo o pipeline, elas não têm escopo para um ForEach ou qualquer outra atividade. | Considere usar ForEach sequencial ou usar Execute Pipeline dentro de ForEach (Variable/Parameter handled in child Pipeline). |
Conteúdos relacionados
Veja outras atividades de fluxo de controle suportadas: