Partilhar via


Copiar e transformar dados no Snowflake usando o Azure Data Factory ou o Azure Synapse Analytics

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Gorjeta

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!

Este artigo descreve como usar a atividade Copiar no Azure Data Factory e nos pipelines do Azure Synapse para copiar dados de e para o Snowflake e usar o Fluxo de Dados para transformar dados no Snowflake. Para obter mais informações, consulte o artigo introdutório do Data Factory ou do Azure Synapse Analytics.

Importante

O novo conector Snowflake fornece suporte nativo melhorado a Snowflake. Se você estiver usando o conector Snowflake herdado em sua solução, é recomendável atualizar seu conector Snowflake o mais rápido possível. Consulte esta seção para obter detalhes sobre a diferença entre a versão herdada e a versão mais recente.

Capacidades suportadas

Este conector Snowflake é suportado para os seguintes recursos:

Capacidades suportadas IR
Atividade de cópia (origem/coletor) (1) (2)
Mapeando o fluxo de dados (origem/coletor) (1)
Atividade de Pesquisa (1) (2)
Atividade de script (1) (2)

(1) Tempo de execução de integração do Azure (2) Tempo de execução de integração auto-hospedado

Para a atividade Copiar, este conector Snowflake suporta as seguintes funções:

  • Copie dados do Snowflake que utilizam o comando COPY do Snowflake para [location] para obter o melhor desempenho.
  • Copie dados para o Snowflake que aproveita o comando COPY do Snowflake para [table] para obter o melhor desempenho. Ele suporta Snowflake no Azure.
  • Se um proxy for necessário para se conectar ao Snowflake a partir de um Integration Runtime auto-hospedado, você deverá configurar as variáveis de ambiente para HTTP_PROXY e HTTPS_PROXY no host do Integration Runtime.

Pré-requisitos

Se seu armazenamento de dados estiver localizado dentro de uma rede local, uma rede virtual do Azure ou a Amazon Virtual Private Cloud, você precisará configurar um tempo de execução de integração auto-hospedado para se conectar a ele. Certifique-se de adicionar os endereços IP que o tempo de execução de integração auto-hospedado usa à lista permitida.

Se o seu armazenamento de dados for um serviço de dados de nuvem gerenciado, você poderá usar o Tempo de Execução de Integração do Azure. Se o acesso for restrito a IPs aprovados nas regras de firewall, você poderá adicionar IPs do Tempo de Execução de Integração do Azure à lista de permissões.

A conta Snowflake usada para Source ou Sink deve ter o acesso necessário USAGE no banco de dados e acesso de leitura/gravação no esquema e nas tabelas/exibições sob ele. Além disso, ele também deve ter CREATE STAGE no esquema para ser capaz de criar o estágio externo com SAS URI.

Os seguintes valores de propriedades de conta devem ser definidos

Property Descrição Necessário Predefinição
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION Especifica se um objeto de integração de armazenamento deve ser exigido como credenciais de nuvem ao criar um estágio externo nomeado (usando CREATE STAGE) para acessar um local de armazenamento em nuvem privada. FALSE FALSE
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION Especifica se deve ser necessário usar um estágio externo nomeado que faça referência a um objeto de integração de armazenamento como credenciais de nuvem ao carregar ou descarregar dados de um local de armazenamento em nuvem privada. FALSE FALSE

Para obter mais informações sobre os mecanismos de segurança de rede e as opções suportadas pelo Data Factory, consulte Estratégias de acesso a dados.

Começar agora

Para executar a atividade Copiar com um pipeline, você pode usar uma das seguintes ferramentas ou SDKs:

Criar um serviço vinculado ao Snowflake usando a interface do usuário

Use as etapas a seguir para criar um serviço vinculado ao Snowflake na interface do usuário do portal do Azure.

  1. Navegue até a guia Gerenciar em seu espaço de trabalho do Azure Data Factory ou Synapse e selecione Serviços Vinculados e clique em Novo:

  2. Procure por Snowflake e selecione o conector Snowflake.

    Screenshot do conector Snowflake.

  3. Configure os detalhes do serviço, teste a conexão e crie o novo serviço vinculado.

    Captura de tela da configuração do serviço vinculado para Snowflake.

Detalhes de configuração do conector

As seções a seguir fornecem detalhes sobre propriedades que definem entidades específicas para um conector Snowflake.

Propriedades do serviço vinculado

Estas propriedades genéricas são suportadas para o serviço vinculado Snowflake:

Property Descrição Obrigatório
tipo A propriedade type deve ser definida como SnowflakeV2. Sim
accountIdentifier O nome da conta juntamente com a sua organização. Por exemplo, myorg-account123. Sim
base de dados O banco de dados padrão usado para a sessão após a conexão. Sim
armazém O armazém virtual padrão usado para a sessão após a conexão. Sim
authenticationType Tipo de autenticação usado para se conectar ao serviço Snowflake. Os valores permitidos são: Basic (Default) e KeyPair. Consulte as seções correspondentes abaixo sobre mais propriedades e exemplos, respectivamente. Não
função A função de segurança padrão usada para a sessão após a conexão. Não
host O nome do host da conta Snowflake. Por exemplo: contoso.snowflakecomputing.com. .cn também é suportado. Não
ConecteVia O tempo de execução de integração que é usado para se conectar ao armazenamento de dados. Você pode usar o tempo de execução de integração do Azure ou um tempo de execução de integração auto-hospedado (se seu armazenamento de dados estiver localizado em uma rede privada). Se não for especificado, ele usará o tempo de execução de integração padrão do Azure. Não

Este conector Snowflake suporta os seguintes tipos de autenticação. Consulte as seções correspondentes para obter detalhes.

Autenticação básica

Para usar a autenticação básica , além das propriedades genéricas descritas na seção anterior, especifique as seguintes propriedades:

Property Descrição Obrigatório
Utilizador Nome de login para o usuário do Snowflake. Sim
password A senha para o usuário do Snowflake. Marque este campo como um tipo SecureString para armazená-lo com segurança. Você também pode fazer referência a um segredo armazenado no Cofre da Chave do Azure. Sim

Exemplo:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "Basic",
            "user": "<username>",
            "password": {
                "type": "SecureString",
                "value": "<password>"
            },
            "role": "<role>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Palavra-passe no Cofre de Chaves do Azure:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "Basic",
            "user": "<username>",
            "password": {
                "type": "AzureKeyVaultSecret",
                "store": { 
                    "referenceName": "<Azure Key Vault linked service name>",
                    "type": "LinkedServiceReference"
                }, 
                "secretName": "<secretName>"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Nota

O mapeamento de fluxos de dados suporta apenas a autenticação básica.

Autenticação de par de chaves

Para usar a autenticação de par de chaves, você precisa configurar e criar um usuário de autenticação de par de chaves no Snowflake consultando Autenticação de par de chaves & Rotação de par de chaves. Depois, anote a chave privada e a senha (opcional), que você usa para definir o serviço vinculado.

Além das propriedades genéricas descritas na seção anterior, especifique as seguintes propriedades:

Property Descrição Obrigatório
Utilizador Nome de login para o usuário do Snowflake. Sim
chave privada A chave privada usada para a autenticação do par de chaves.

Para garantir que a chave privada seja válida quando enviada para o Azure Data Factory, e considerando que o arquivo privateKey inclui caracteres de nova linha (\n), é essencial formatar corretamente o conteúdo privateKey em sua forma literal de cadeia de caracteres. Esse processo envolve a adição de \n explicitamente a cada nova linha.
Sim
privateKeyPassphrase A frase secreta usada para desencriptar a chave privada, se estiver encriptada. Não

Exemplo:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "KeyPair",
            "user": "<username>",
            "privateKey": {
                "type": "SecureString",
                "value": "<privateKey>"
            },
            "privateKeyPassphrase": { 
                "type": "SecureString",
                "value": "<privateKeyPassphrase>"
            },
            "role": "<role>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Propriedades do conjunto de dados

Para obter uma lista completa de seções e propriedades disponíveis para definir conjuntos de dados, consulte o artigo Conjuntos de dados.

As propriedades a seguir são suportadas para o conjunto de dados Snowflake.

Property Descrição Obrigatório
tipo A propriedade type do conjunto de dados deve ser definida como SnowflakeV2Table. Sim
esquema Nome do esquema. Observe que o nome do esquema diferencia maiúsculas de minúsculas. Não para a fonte, sim para a pia
tabela Nome da tabela/vista. Observe que o nome da tabela diferencia maiúsculas de minúsculas. Não para a fonte, sim para a pia

Exemplo:

{
    "name": "SnowflakeV2Dataset",
    "properties": {
        "type": "SnowflakeV2Table",
        "typeProperties": {
            "schema": "<Schema name for your Snowflake database>",
            "table": "<Table name for your Snowflake database>"
        },
        "schema": [ < physical schema, optional, retrievable during authoring > ],
        "linkedServiceName": {
            "referenceName": "<name of linked service>",
            "type": "LinkedServiceReference"
        }
    }
}

Propriedades da atividade Copy

Para obter uma lista completa de seções e propriedades disponíveis para definir atividades, consulte o artigo Pipelines . Esta seção fornece uma lista de propriedades suportadas pela fonte e pelo coletor de flocos de neve.

Floco de neve como fonte

O conector Snowflake utiliza o comando COPY into [location] do Snowflake para obter o melhor desempenho.

Se o armazenamento de dados e o formato do coletor forem suportados nativamente pelo comando Snowflake COPY, você poderá usar a atividade Copiar para copiar diretamente do Snowflake para o sink. Para obter detalhes, consulte Cópia direta do Snowflake. Caso contrário, use a cópia Staged interna do Snowflake.

Para copiar dados do Snowflake, as seguintes propriedades são suportadas na seção Copiar fonte de atividade.

Property Descrição Obrigatório
tipo A propriedade type da fonte de atividade Copy deve ser definida como SnowflakeV2Source. Sim
query Especifica a consulta SQL para ler dados do Snowflake. Se os nomes do esquema, da tabela e das colunas contiverem minúsculas, cite o identificador do objeto na consulta, por exemplo. select * from "schema"."myTable"
Não há suporte para a execução de procedimento armazenado.
Não
exportSettings Configurações avançadas usadas para recuperar dados do Snowflake. Você pode configurar os suportados pelo comando COPY into que o serviço passará quando você invocar a instrução. Sim
Em exportSettings:
tipo O tipo de comando export, definido como SnowflakeExportCopyCommand. Sim
storageIntegração Especifique o nome da integração de armazenamento que você criou no Snowflake. Para obter as etapas de pré-requisito de uso da integração de armazenamento, consulte Configurando uma integração de armazenamento Snowflake. Não
additionalCopyOptions Opções de cópia adicionais, fornecidas como um dicionário de pares chave-valor. Exemplos: MAX_FILE_SIZE, OVERWRITE. Para obter mais informações, consulte Opções de cópia do floco de neve. Não
additionalFormatOptions Opções de formato de arquivo adicionais que são fornecidas ao comando COPY como um dicionário de pares chave-valor. Exemplos: DATE_FORMAT, TIME_FORMAT TIMESTAMP_FORMAT. Para obter mais informações, consulte Opções de tipo de formato Snowflake. Não

Nota

Verifique se você tem permissão para executar o seguinte comando e acessar o esquema INFORMATION_SCHEMA e as colunas da tabela.

  • COPY INTO <location>

Cópia direta de Snowflake

Se o armazenamento de dados e o formato do coletor atenderem aos critérios descritos nesta seção, você poderá usar a atividade Copiar para copiar diretamente do Snowflake para o coletor. O serviço verifica as configurações e falha na execução da atividade Copiar se os seguintes critérios não forem atendidos:

  • Quando você especifica storageIntegration na fonte:

    O repositório de dados do coletor é o Armazenamento de Blobs do Azure que você mencionou no estágio externo no Snowflake. Você precisa concluir as seguintes etapas antes de copiar dados:

    1. Crie um serviço vinculado de Armazenamento de Blob do Azure para o coletor Armazenamento de Blobs do Azure com qualquer tipo de autenticação suportado.

    2. Conceda pelo menos a função de Colaborador de Dados de Blob de Armazenamento à entidade de serviço do Snowflake no coletor do Azure Blob Storage Access Control (IAM).

  • Quando você não especifica storageIntegration na fonte:

    O serviço vinculado ao coletor é o armazenamento de Blob do Azure com autenticação de assinatura de acesso compartilhado. Se quiser copiar dados diretamente para o Azure Data Lake Storage Gen2 no seguinte formato com suporte, você pode criar um serviço vinculado do Armazenamento de Blobs do Azure com autenticação SAS em sua conta do Azure Data Lake Storage Gen2, para evitar o uso de cópia em estágios do Snowflake.

  • O formato de dados do coletor é Parquet, texto delimitado ou JSON com as seguintes configurações:

    • Para o formato Parquet , o codec de compressão é None, Snappy ou Lzo.
    • Para o formato de texto delimitado:
      • rowDelimiter é \r\n, ou qualquer caractere único.
      • compression pode ser sem compressão, gzip, bzip2 ou deflate.
      • encodingName é deixado como padrão ou definido como utf-8.
      • quoteChar é aspa dupla, aspa simples ou string vazia (sem aspas char).
    • Para o formato JSON, a cópia direta suporta apenas o caso de a tabela Snowflake de origem ou o resultado da consulta ter apenas uma coluna e o tipo de dados dessa coluna ser VARIANT, OBJECT ou ARRAY.
      • compression pode ser sem compressão, gzip, bzip2 ou deflate.
      • encodingName é deixado como padrão ou definido como utf-8.
      • filePattern no coletor de atividade de cópia é deixado como padrão ou definido como setOfObjects.
  • Na fonte da atividade de cópia, additionalColumns não é especificado.

  • O mapeamento de colunas não é especificado.

Exemplo:

"activities":[
    {
        "name": "CopyFromSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Snowflake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SnowflakeV2Source",
                "query": "SELECT * FROM MYTABLE",
                "exportSettings": {
                    "type": "SnowflakeExportCopyCommand",
                    "additionalCopyOptions": {
                        "MAX_FILE_SIZE": "64000000",
                        "OVERWRITE": true
                    },
                    "additionalFormatOptions": {
                        "DATE_FORMAT": "'MM/DD/YYYY'"
                    },
                    "storageIntegration": "< Snowflake storage integration name >"
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

Cópia encenada de Snowflake

Quando o formato ou armazenamento de dados do coletor não for compatível nativamente com o comando COPY do Snowflake, conforme mencionado na última seção, habilite a cópia em estágios interna usando uma instância provisória de armazenamento de Blob do Azure. O recurso de cópia em estágios também oferece uma melhor taxa de transferência. O serviço exporta dados do Snowflake para o armazenamento de preparo, copia os dados para o coletor e, finalmente, limpa os dados temporários do armazenamento de preparação. Consulte Cópia em etapas para obter detalhes sobre como copiar dados usando preparo.

Para usar esse recurso, crie um serviço vinculado de armazenamento de Blob do Azure que se refira à conta de armazenamento do Azure como o preparo provisório. Em seguida, especifique as enableStaging propriedades e stagingSettings na atividade Copiar.

  • Quando você especifica storageIntegration na origem, o armazenamento de Blob do Azure de preparo provisório deve ser aquele que você mencionou no estágio externo no Snowflake. Certifique-se de criar um serviço vinculado de Armazenamento de Blobs do Azure para ele com qualquer autenticação com suporte e conceda pelo menos a função de Colaborador de Dados de Blob de Armazenamento à entidade de serviço Snowflake no Controle de Acesso de Armazenamento de Blobs (IAM) do Azure de preparação.

  • Quando você não especifica storageIntegration na origem, o serviço vinculado de Armazenamento de Blob do Azure de preparo deve usar a autenticação de assinatura de acesso compartilhado, conforme exigido pelo comando Snowflake COPY. Certifique-se de conceder permissão de acesso adequada ao Snowflake no Armazenamento de Blobs do Azure de preparação. Para saber mais sobre isso, consulte este artigo.

Exemplo:

"activities":[
    {
        "name": "CopyFromSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Snowflake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SnowflakeV2Source",               
                "query": "SELECT * FROM MyTable",
                "exportSettings": {
                    "type": "SnowflakeExportCopyCommand",
                    "storageIntegration": "< Snowflake storage integration name >"                    
                }
            },
            "sink": {
                "type": "<sink type>"
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

Ao executar uma cópia em estágios do Snowflake, é crucial definir o comportamento de cópia do coletor como mesclar arquivos. Essa configuração garante que todos os arquivos particionados sejam manipulados e mesclados corretamente, evitando o problema em que apenas o último arquivo particionado é copiado.

Exemplo de configuração

{
    "type": "Copy",
    "source": {
        "type": "SnowflakeSource",
        "query": "SELECT * FROM my_table"
    },
    "sink": {
        "type": "AzureBlobStorage",
        "copyBehavior": "MergeFiles"
    }
}

Nota

A falha ao definir o comportamento de cópia do coletor para mesclar arquivos pode resultar em apenas o último arquivo particionado sendo copiado.

Floco de neve como pia

O conector Snowflake utiliza o comando COPY into [table] do Snowflake para obter o melhor desempenho. Ele dá suporte à gravação de dados no Snowflake no Azure.

Se o armazenamento de dados de origem e o formato forem suportados nativamente pelo comando Snowflake COPY, você poderá usar a atividade Copiar para copiar diretamente da origem para o Snowflake. Para obter detalhes, consulte Cópia direta para Snowflake. Caso contrário, use a cópia em etapas interna para o Snowflake.

Para copiar dados para o Snowflake, as seguintes propriedades são suportadas na seção Copiar coletor de atividade.

Property Descrição Obrigatório
tipo A propriedade type do coletor de atividade Copy, definida como SnowflakeV2Sink. Sim
pré-CopyScript Especifique uma consulta SQL para que a atividade Copy seja executada antes de gravar dados no Snowflake em cada execução. Use essa propriedade para limpar os dados pré-carregados. Não
importSettings Configurações avançadas usadas para gravar dados no Snowflake. Você pode configurar os suportados pelo comando COPY into que o serviço passará quando você invocar a instrução. Sim
Em importSettings:
tipo O tipo de comando import, definido como SnowflakeImportCopyCommand. Sim
storageIntegração Especifique o nome da integração de armazenamento que você criou no Snowflake. Para obter as etapas de pré-requisito de uso da integração de armazenamento, consulte Configurando uma integração de armazenamento Snowflake. Não
additionalCopyOptions Opções de cópia adicionais, fornecidas como um dicionário de pares chave-valor. Exemplos: ON_ERROR, FORCE, LOAD_UNCERTAIN_FILES. Para obter mais informações, consulte Opções de cópia do floco de neve. Não
additionalFormatOptions Opções adicionais de formato de arquivo fornecidas ao comando COPY, fornecidas como um dicionário de pares chave-valor. Exemplos: DATE_FORMAT, TIME_FORMAT TIMESTAMP_FORMAT. Para obter mais informações, consulte Opções de tipo de formato Snowflake. Não

Nota

Verifique se você tem permissão para executar o seguinte comando e acessar o esquema INFORMATION_SCHEMA e as colunas da tabela.

  • SELECT CURRENT_REGION()
  • COPY INTO <table>
  • SHOW REGIONS
  • CREATE OR REPLACE STAGE
  • DROP STAGE

Cópia direta para Snowflake

Se o armazenamento e o formato de dados de origem atenderem aos critérios descritos nesta seção, você poderá usar a atividade Copiar para copiar diretamente da fonte para o Snowflake. O serviço verifica as configurações e falha na execução da atividade Copiar se os seguintes critérios não forem atendidos:

  • Quando você especifica storageIntegration no coletor:

    O armazenamento de dados de origem é o Armazenamento de Blob do Azure que você mencionou no estágio externo no Snowflake. Você precisa concluir as seguintes etapas antes de copiar dados:

    1. Crie um serviço vinculado do Armazenamento de Blobs do Azure para o Armazenamento de Blobs do Azure de origem com qualquer tipo de autenticação suportado.

    2. Conceda pelo menos a função de Leitor de Dados de Blob de Armazenamento à entidade de serviço Snowflake no IAM (Controle de Acesso de Armazenamento de Blob) do Azure de origem.

  • Quando você não especifica storageIntegration na pia:

    O serviço vinculado de origem é o armazenamento de Blob do Azure com autenticação de assinatura de acesso compartilhado. Se quiser copiar dados diretamente do Azure Data Lake Storage Gen2 no seguinte formato com suporte, você pode criar um serviço vinculado do Armazenamento de Blobs do Azure com autenticação SAS em sua conta do Azure Data Lake Storage Gen2, para evitar o uso de cópia em estágios para o Snowflake.

  • O formato de dados de origem é Parquet, texto delimitado ou JSON com as seguintes configurações:

    • Para o formato Parquet , o codec de compressão é None, ou Snappy.

    • Para o formato de texto delimitado:

      • rowDelimiter é \r\n, ou qualquer caractere único. Se o delimitador de linha não for "\r\n", firstRowAsHeader precisará ser false e skipLineCount não será especificado.
      • compression pode ser sem compressão, gzip, bzip2 ou deflate.
      • encodingName é deixado como padrão ou definido como "UTF-8", "UTF-16", "UTF-16BE", "UTF-32", "UTF-32BE", "BIG5", "EUC-JP", "EUC-KR", "GB18030", "ISO-2022-JP", "ISO-2022-KR", "ISO-8859-1", "ISO-8859-2", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-9", "WINDOWS-1250", "WINDOWS-1251", "WINDOWS-1252", "WINDOWS-1253", "WINDOWS-1254", "WINDOWS-1255".
      • quoteChar é aspa dupla, aspa simples ou string vazia (sem aspas char).
    • Para o formato JSON, a cópia direta suporta apenas o caso em que a tabela Snowflake do coletor tem apenas uma coluna e o tipo de dados dessa coluna é VARIANT, OBJECT ou ARRAY.

      • compression pode ser sem compressão, gzip, bzip2 ou deflate.
      • encodingName é deixado como padrão ou definido como utf-8.
      • O mapeamento de colunas não é especificado.
  • Na fonte da atividade Copiar:

    • additionalColumns não é especificado.
    • Se a origem for uma pasta, recursive será definida como true.
    • prefix, modifiedDateTimeStart, modifiedDateTimeEnd, e enablePartitionDiscovery não são especificados.

Exemplo:

"activities":[
    {
        "name": "CopyToSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Snowflake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SnowflakeV2Sink",
                "importSettings": {
                    "type": "SnowflakeImportCopyCommand",
                    "copyOptions": {
                        "FORCE": "TRUE",
                        "ON_ERROR": "SKIP_FILE"
                    },
                    "fileFormatOptions": {
                        "DATE_FORMAT": "YYYY-MM-DD"
                    },
                    "storageIntegration": "< Snowflake storage integration name >"
                }
            }
        }
    }
]

Cópia encenada para Snowflake

Quando o armazenamento ou formato de dados de origem não for nativamente compatível com o comando Snowflake COPY, conforme mencionado na última seção, habilite a cópia em estágios interna usando uma instância provisória de armazenamento de Blob do Azure. O recurso de cópia em estágios também oferece uma melhor taxa de transferência. O serviço converte automaticamente os dados para atender aos requisitos de formato de dados do Snowflake. Em seguida, ele invoca o comando COPY para carregar dados no Snowflake. Finalmente, ele limpa seus dados temporários do armazenamento de blob. Consulte Cópia em etapas para obter detalhes sobre como copiar dados usando preparo.

Para usar esse recurso, crie um serviço vinculado de armazenamento de Blob do Azure que se refira à conta de armazenamento do Azure como o preparo provisório. Em seguida, especifique as enableStaging propriedades e stagingSettings na atividade Copiar.

  • Quando você especifica storageIntegration no coletor, o armazenamento de Blob do Azure de preparo provisório deve ser aquele que você mencionou no estágio externo no Snowflake. Certifique-se de criar um serviço vinculado do Armazenamento de Blobs do Azure para ele com qualquer autenticação com suporte e conceda pelo menos a função de Leitor de Dados de Blob de Armazenamento à entidade de serviço Snowflake no Controle de Acesso de Armazenamento de Blob (IAM) do Azure em preparação.

  • Quando você não especifica storageIntegration no coletor, o serviço vinculado de Armazenamento de Blob do Azure de preparo precisa usar a autenticação de assinatura de acesso compartilhado, conforme exigido pelo comando Snowflake COPY.

Exemplo:

"activities":[
    {
        "name": "CopyToSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Snowflake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SnowflakeV2Sink",
                "importSettings": {
                    "type": "SnowflakeImportCopyCommand",
                    "storageIntegration": "< Snowflake storage integration name >"
                }
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

Mapeando propriedades de fluxo de dados

Ao transformar dados em mapeamento de fluxo de dados, você pode ler e gravar em tabelas no Snowflake. Para obter mais informações, consulte a transformação de origem e a transformação de coletor no mapeamento de fluxos de dados. Você pode optar por usar um conjunto de dados Snowflake ou um conjunto de dados embutido como fonte e tipo de coletor.

Transformação da fonte

A tabela abaixo lista as propriedades suportadas pela fonte Snowflake. Você pode editar essas propriedades na guia Opções de origem. O conector utiliza transferência de dados interna Snowflake.

Nome Descrição Obrigatório Valores permitidos Propriedade do script de fluxo de dados
Tabela Se você selecionar Tabela como entrada, o fluxo de dados buscará todos os dados da tabela especificada no conjunto de dados Snowflake ou nas opções de origem ao usar o conjunto de dados embutido. Não String (apenas para conjunto de dados embutido)
tableName
schemaName
Query Se você selecionar Consulta como entrada, insira uma consulta para buscar dados do Snowflake. Essa configuração substitui qualquer tabela escolhida no conjunto de dados.
Se os nomes do esquema, da tabela e das colunas contiverem minúsculas, cite o identificador do objeto na consulta, por exemplo. select * from "schema"."myTable"
Não String query
Ativar extração incremental (Pré-visualização) Use essa opção para informar ao ADF para processar somente linhas que foram alteradas desde a última vez que o pipeline foi executado. Não Boolean habilitarCdc
Coluna incremental Ao usar o recurso de extração incremental, você deve escolher a data/hora/coluna numérica que deseja usar como marca d'água na tabela de origem. Não String waterMarkColumn
Ativar o rastreamento de alterações do floco de neve (visualização) Essa opção permite que o ADF aproveite a tecnologia de captura de dados de alteração do Snowflake para processar apenas os dados delta desde a execução anterior do pipeline. Esta opção carrega automaticamente os dados delta com operações de inserção de linha, atualização e exclusão sem exigir nenhuma coluna incremental. Não Boolean enableNativeCdc
Alterações líquidas Ao usar o rastreamento de alterações de flocos de neve, você pode usar essa opção para obter linhas alteradas ou alterações exaustivas. As linhas alteradas com deduped mostrarão apenas as versões mais recentes das linhas que foram alteradas desde um determinado ponto no tempo, enquanto as alterações exaustivas mostrarão todas as versões de cada linha que foi alterada, incluindo as que foram excluídas ou atualizadas. Por exemplo, se você atualizar uma linha, verá uma versão de exclusão e uma versão de inserção em alterações exaustivas, mas apenas a versão de inserção em linhas alteradas dedupedadas. Dependendo do seu caso de uso, você pode escolher a opção que atenda às suas necessidades. A opção padrão é false, o que significa alterações exaustivas. Não Boolean netAlterações
Incluir colunas do sistema Ao usar o rastreamento de alterações de flocos de neve, você pode usar a opção systemColumns para controlar se as colunas de fluxo de metadados fornecidas pelo Snowflake são incluídas ou excluídas na saída de controle de alterações. Por padrão, systemColumns é definido como true, o que significa que as colunas de fluxo de metadados estão incluídas. Você pode definir systemColumns como false se quiser excluí-los. Não Boolean systemColumns
Comece a ler desde o início Definir essa opção com extração incremental e controle de alterações instruirá o ADF a ler todas as linhas na primeira execução de um pipeline com a extração incremental ativada. Não Boolean skipInitialLoad

Exemplos de script de origem do Snowflake

Quando você usa o conjunto de dados Snowflake como tipo de origem, o script de fluxo de dados associado é:

source(allowSchemaDrift: true,
	validateSchema: false,
	query: 'select * from MYTABLE',
	format: 'query') ~> SnowflakeSource

Se você usar o conjunto de dados embutido, o script de fluxo de dados associado será:

source(allowSchemaDrift: true,
	validateSchema: false,
	format: 'query',
	query: 'select * from MYTABLE',
	store: 'snowflake') ~> SnowflakeSource

Controlo de alterações nativo

O Azure Data Factory agora dá suporte a um recurso nativo no Snowflake conhecido como controle de alterações, que envolve o controle de alterações na forma de logs. Esse recurso do floco de neve nos permite acompanhar as mudanças nos dados ao longo do tempo, tornando-os úteis para o carregamento incremental de dados e para fins de auditoria. Para utilizar esse recurso, quando você habilita a captura de dados de alteração e seleciona o Rastreamento de alterações de floco de neve, criamos um objeto Stream para a tabela de origem que habilita o controle de alterações na tabela de flocos de neve de origem. Posteriormente, usamos a cláusula CHANGES em nossa consulta para buscar apenas os dados novos ou atualizados da tabela de origem. Além disso, é recomendável agendar o pipeline de modo que as alterações sejam consumidas dentro do intervalo de tempo de retenção de dados definido para a tabela de origem do floco de neve, caso contrário, o usuário poderá ver um comportamento inconsistente nas alterações capturadas.

Transformação do lavatório

A tabela abaixo lista as propriedades suportadas pelo Snowflake sink. Você pode editar essas propriedades na guia Configurações. Ao usar o conjunto de dados embutido, você verá configurações adicionais, que são as mesmas que as propriedades descritas na seção de propriedades do conjunto de dados. O conector utiliza transferência de dados interna Snowflake.

Nome Descrição Obrigatório Valores permitidos Propriedade do script de fluxo de dados
Método de atualização Especifique quais operações são permitidas em seu destino Snowflake.
Para atualizar, atualizar ou excluir linhas, uma transformação de linha Alter é necessária para marcar linhas para essas ações.
Sim true ou false suprimido
inserível
atualizável
Atualizável
Colunas-chave Para atualizações, upserts e exclusões, uma coluna ou colunas de chave devem ser definidas para determinar qual linha alterar. Não Matriz chaves
Ação da tabela Determina se todas as linhas da tabela de destino devem ser recriadas ou removidas antes da gravação.
- Nenhuma: Nenhuma ação será feita para a mesa.
- Recriar: A tabela será descartada e recriada. Necessário se criar uma nova tabela dinamicamente.
- Truncate: Todas as linhas da tabela de destino serão removidas.
Não true ou false recriar
truncate

Exemplos de script de sumidouro de flocos de neve

Quando você usa o conjunto de dados Snowflake como tipo de coletor, o script de fluxo de dados associado é:

IncomingStream sink(allowSchemaDrift: true,
	validateSchema: false,
	deletable:true,
	insertable:true,
	updateable:true,
	upsertable:false,
	keys:['movieId'],
	format: 'table',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> SnowflakeSink

Se você usar o conjunto de dados embutido, o script de fluxo de dados associado será:

IncomingStream sink(allowSchemaDrift: true,
	validateSchema: false,
	format: 'table',
	tableName: 'table',
	schemaName: 'schema',
	deletable: true,
	insertable: true,
	updateable: true,
	upsertable: false,
	store: 'snowflake',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> SnowflakeSink

Otimização do Query Pushdown

Ao definir o Nível de Log do pipeline como Nenhum, excluímos a transmissão de métricas de transformação intermediárias, evitando possíveis obstáculos às otimizações do Spark e permitindo a otimização de pushdown de consulta fornecida pelo Snowflake. Essa otimização de pushdown permite melhorias substanciais de desempenho para grandes tabelas Snowflake com conjuntos de dados extensos.

Nota

Não suportamos tabelas temporárias no Snowflake, pois elas são locais para a sessão ou o usuário que as cria, tornando-as inacessíveis a outras sessões e propensas a serem substituídas como tabelas regulares pelo Snowflake. Embora o Snowflake ofereça tabelas transitórias como alternativa, que são acessíveis globalmente, elas exigem exclusão manual, contradizendo nosso objetivo principal de usar tabelas Temp, que é evitar quaisquer operações de exclusão no esquema de origem.

Propriedades da atividade de pesquisa

Para obter mais informações sobre as propriedades, consulte Atividade de pesquisa.

Atualize o conector Snowflake

Para atualizar o conector Snowflake, você pode fazer uma atualização lado a lado ou uma atualização in-loco.

Atualização lado a lado

Para executar uma atualização lado a lado, conclua as seguintes etapas:

  1. Crie um novo serviço vinculado do Snowflake e configure-o fazendo referência às propriedades do serviço vinculado.
  2. Crie um conjunto de dados com base no serviço vinculado Snowflake recém-criado.
  3. Substitua o novo serviço vinculado e o conjunto de dados pelos existentes nos pipelines destinados aos objetos herdados.

Atualização no local

Para executar uma atualização in-loco, você precisa editar a carga útil do serviço vinculado existente e atualizar o conjunto de dados para usar o novo serviço vinculado.

  1. Atualize o tipo de Snowflake para SnowflakeV2.

  2. Modifique a carga útil do serviço vinculado de seu formato herdado para o novo padrão. Você pode preencher cada campo da interface do usuário depois de alterar o tipo mencionado acima ou atualizar a carga diretamente através do Editor JSON. Consulte a seção Propriedades do serviço vinculado neste artigo para obter as propriedades de conexão suportadas. Os exemplos a seguir mostram as diferenças na carga útil para os serviços vinculados Snowflake herdados e novos:

    Carga JSON do serviço vinculado Snowflake herdado:

      {
         "name": "Snowflake1",
         "type": "Microsoft.DataFactory/factories/linkedservices",
         "properties": {
             "annotations": [],
             "type": "Snowflake",
             "typeProperties": {
                 "authenticationType": "Basic",
                 "connectionString": "jdbc:snowflake://<fake_account>.snowflakecomputing.com/?user=FAKE_USER&db=FAKE_DB&warehouse=FAKE_DW&schema=PUBLIC",
                 "encryptedCredential": "<your_encrypted_credential_value>"
             },
             "connectVia": {
                 "referenceName": "AzureIntegrationRuntime",
                 "type": "IntegrationRuntimeReference"
             }
         }
     }
    

    Nova carga útil JSON do serviço vinculado Snowflake:

     {
         "name": "Snowflake2",
         "type": "Microsoft.DataFactory/factories/linkedservices",
         "properties": {
             "parameters": {
                 "schema": {
                     "type": "string",
                     "defaultValue": "PUBLIC"
                 }
             },
             "annotations": [],
             "type": "SnowflakeV2",
             "typeProperties": {
                 "authenticationType": "Basic",
                 "accountIdentifier": "<FAKE_Account>",
                 "user": "FAKE_USER",
                 "database": "FAKE_DB",
                 "warehouse": "FAKE_DW",
                 "encryptedCredential": "<placeholder>"
             },
             "connectVia": {
                 "referenceName": "AutoResolveIntegrationRuntime",
                 "type": "IntegrationRuntimeReference"
             }
         }
     }
    
  3. Atualize o conjunto de dados para usar o novo serviço vinculado. Você pode criar um novo conjunto de dados com base no serviço vinculado recém-criado ou atualizar a propriedade type de um conjunto de dados existente de SnowflakeTable para SnowflakeV2Table.

Diferenças entre Floco de Neve e Floco de Neve (legado)

O conector Snowflake oferece novas funcionalidades e é compatível com a maioria dos recursos do conector Snowflake (legado). A tabela abaixo mostra as diferenças de recursos entre Snowflake e Snowflake (legado).

Snowflake Floco de neve (legado)
Suporta autenticação de par Basic e Key. Suporte a autenticação básica.
Atualmente, não há suporte para parâmetros de script na atividade de script. Como alternativa, utilize expressões dinâmicas para parâmetros de script. Para obter mais informações, consulte Expressões e funções no Azure Data Factory e Azure Synapse Analytics. Suporte a parâmetros de script na atividade de script.
Suporte BigDecimal na atividade de pesquisa. O tipo NÚMERO, conforme definido em Floco de neve, será exibido como uma cadeia de caracteres na atividade Pesquisa. Se você quiser encobri-lo para o tipo numérico, você pode usar o parâmetro pipeline com função int ou função float. Por exemplo, int(activity('lookup').output.firstRow.VALUE), float(activity('lookup').output.firstRow.VALUE) BigDecimal não é suportado na atividade de pesquisa.
As accountIdentifierpropriedades , warehouse, schema database, e são role usadas para estabelecer uma conexão. A connectionstring propriedade é usada para estabelecer uma conexão.
O tipo de dados de carimbo de data/hora no Snowflake é lido como o tipo de dados DateTimeOffset na atividade Pesquisa e Script. O tipo de dados de carimbo de data/hora no Snowflake é lido como o tipo de dados DateTime na atividade Pesquisa e Script.
Se você ainda precisar usar o valor Datetime como um parâmetro em seu pipeline depois de atualizar o conector, poderá converter o tipo DateTimeOffset para o tipo DateTime usando a função formatDateTime (recomendada) ou a função concat. Por exemplo: formatDateTime(activity('lookup').output.firstRow.DATETIMETYPE), concat(substring(activity('lookup').output.firstRow.DATETIMETYPE, 0, 19), 'Z')

Para obter uma lista de armazenamentos de dados suportados como fontes e coletores por atividade de cópia, consulte Armazenamentos de dados e formatos suportados.