Copiar e transformar dados no Snowflake usando o Azure Data Factory ou o Azure Synapse Analytics
APLICA-SE A: Azure Data Factory Azure Synapse Analytics
Gorjeta
Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange tudo, desde a movimentação de dados até ciência de dados, análises em tempo real, business intelligence e relatórios. Saiba como iniciar uma nova avaliação gratuitamente!
Este artigo descreve como usar a atividade Copiar no Azure Data Factory e nos pipelines do Azure Synapse para copiar dados de e para o Snowflake e usar o Fluxo de Dados para transformar dados no Snowflake. Para obter mais informações, consulte o artigo introdutório do Data Factory ou do Azure Synapse Analytics.
Importante
O novo conector Snowflake fornece suporte nativo melhorado a Snowflake. Se você estiver usando o conector Snowflake herdado em sua solução, é recomendável atualizar seu conector Snowflake o mais rápido possível. Consulte esta seção para obter detalhes sobre a diferença entre a versão herdada e a versão mais recente.
Capacidades suportadas
Este conector Snowflake é suportado para os seguintes recursos:
Capacidades suportadas | IR |
---|---|
Atividade de cópia (origem/coletor) | (1) (2) |
Mapeando o fluxo de dados (origem/coletor) | (1) |
Atividade de Pesquisa | (1) (2) |
Atividade de script | (1) (2) |
(1) Tempo de execução de integração do Azure (2) Tempo de execução de integração auto-hospedado
Para a atividade Copiar, este conector Snowflake suporta as seguintes funções:
- Copie dados do Snowflake que utilizam o comando COPY do Snowflake para [location] para obter o melhor desempenho.
- Copie dados para o Snowflake que aproveita o comando COPY do Snowflake para [table] para obter o melhor desempenho. Ele suporta Snowflake no Azure.
- Se um proxy for necessário para se conectar ao Snowflake a partir de um Integration Runtime auto-hospedado, você deverá configurar as variáveis de ambiente para HTTP_PROXY e HTTPS_PROXY no host do Integration Runtime.
Pré-requisitos
Se seu armazenamento de dados estiver localizado dentro de uma rede local, uma rede virtual do Azure ou a Amazon Virtual Private Cloud, você precisará configurar um tempo de execução de integração auto-hospedado para se conectar a ele. Certifique-se de adicionar os endereços IP que o tempo de execução de integração auto-hospedado usa à lista permitida.
Se o seu armazenamento de dados for um serviço de dados de nuvem gerenciado, você poderá usar o Tempo de Execução de Integração do Azure. Se o acesso for restrito a IPs aprovados nas regras de firewall, você poderá adicionar IPs do Tempo de Execução de Integração do Azure à lista de permissões.
A conta Snowflake usada para Source ou Sink deve ter o acesso necessário USAGE
no banco de dados e acesso de leitura/gravação no esquema e nas tabelas/exibições sob ele. Além disso, ele também deve ter CREATE STAGE
no esquema para ser capaz de criar o estágio externo com SAS URI.
Os seguintes valores de propriedades de conta devem ser definidos
Property | Descrição | Necessário | Predefinição |
---|---|---|---|
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION | Especifica se um objeto de integração de armazenamento deve ser exigido como credenciais de nuvem ao criar um estágio externo nomeado (usando CREATE STAGE) para acessar um local de armazenamento em nuvem privada. | FALSE | FALSE |
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION | Especifica se deve ser necessário usar um estágio externo nomeado que faça referência a um objeto de integração de armazenamento como credenciais de nuvem ao carregar ou descarregar dados de um local de armazenamento em nuvem privada. | FALSE | FALSE |
Para obter mais informações sobre os mecanismos de segurança de rede e as opções suportadas pelo Data Factory, consulte Estratégias de acesso a dados.
Começar agora
Para executar a atividade Copiar com um pipeline, você pode usar uma das seguintes ferramentas ou SDKs:
- A ferramenta Copiar dados
- O portal do Azure
- O SDK do .NET
- O SDK do Python
- Azure PowerShell
- A API REST
- O modelo do Azure Resource Manager
Criar um serviço vinculado ao Snowflake usando a interface do usuário
Use as etapas a seguir para criar um serviço vinculado ao Snowflake na interface do usuário do portal do Azure.
Navegue até a guia Gerenciar em seu espaço de trabalho do Azure Data Factory ou Synapse e selecione Serviços Vinculados e clique em Novo:
Procure por Snowflake e selecione o conector Snowflake.
Configure os detalhes do serviço, teste a conexão e crie o novo serviço vinculado.
Detalhes de configuração do conector
As seções a seguir fornecem detalhes sobre propriedades que definem entidades específicas para um conector Snowflake.
Propriedades do serviço vinculado
Estas propriedades genéricas são suportadas para o serviço vinculado Snowflake:
Property | Descrição | Obrigatório |
---|---|---|
tipo | A propriedade type deve ser definida como SnowflakeV2. | Sim |
accountIdentifier | O nome da conta juntamente com a sua organização. Por exemplo, myorg-account123. | Sim |
base de dados | O banco de dados padrão usado para a sessão após a conexão. | Sim |
armazém | O armazém virtual padrão usado para a sessão após a conexão. | Sim |
authenticationType | Tipo de autenticação usado para se conectar ao serviço Snowflake. Os valores permitidos são: Basic (Default) e KeyPair. Consulte as seções correspondentes abaixo sobre mais propriedades e exemplos, respectivamente. | Não |
função | A função de segurança padrão usada para a sessão após a conexão. | Não |
host | O nome do host da conta Snowflake. Por exemplo: contoso.snowflakecomputing.com . .cn também é suportado. |
Não |
ConecteVia | O tempo de execução de integração que é usado para se conectar ao armazenamento de dados. Você pode usar o tempo de execução de integração do Azure ou um tempo de execução de integração auto-hospedado (se seu armazenamento de dados estiver localizado em uma rede privada). Se não for especificado, ele usará o tempo de execução de integração padrão do Azure. | Não |
Este conector Snowflake suporta os seguintes tipos de autenticação. Consulte as seções correspondentes para obter detalhes.
Autenticação básica
Para usar a autenticação básica , além das propriedades genéricas descritas na seção anterior, especifique as seguintes propriedades:
Property | Descrição | Obrigatório |
---|---|---|
Utilizador | Nome de login para o usuário do Snowflake. | Sim |
password | A senha para o usuário do Snowflake. Marque este campo como um tipo SecureString para armazená-lo com segurança. Você também pode fazer referência a um segredo armazenado no Cofre da Chave do Azure. | Sim |
Exemplo:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "SecureString",
"value": "<password>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Palavra-passe no Cofre de Chaves do Azure:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "Basic",
"user": "<username>",
"password": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "<Azure Key Vault linked service name>",
"type": "LinkedServiceReference"
},
"secretName": "<secretName>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Nota
O mapeamento de fluxos de dados suporta apenas a autenticação básica.
Autenticação de par de chaves
Para usar a autenticação de par de chaves, você precisa configurar e criar um usuário de autenticação de par de chaves no Snowflake consultando Autenticação de par de chaves & Rotação de par de chaves. Depois, anote a chave privada e a senha (opcional), que você usa para definir o serviço vinculado.
Além das propriedades genéricas descritas na seção anterior, especifique as seguintes propriedades:
Property | Descrição | Obrigatório |
---|---|---|
Utilizador | Nome de login para o usuário do Snowflake. | Sim |
chave privada | A chave privada usada para a autenticação do par de chaves. Para garantir que a chave privada seja válida quando enviada para o Azure Data Factory, e considerando que o arquivo privateKey inclui caracteres de nova linha (\n), é essencial formatar corretamente o conteúdo privateKey em sua forma literal de cadeia de caracteres. Esse processo envolve a adição de \n explicitamente a cada nova linha. |
Sim |
privateKeyPassphrase | A frase secreta usada para desencriptar a chave privada, se estiver encriptada. | Não |
Exemplo:
{
"name": "SnowflakeV2LinkedService",
"properties": {
"type": "SnowflakeV2",
"typeProperties": {
"accountIdentifier": "<accountIdentifier>",
"database": "<database>",
"warehouse": "<warehouse>",
"authenticationType": "KeyPair",
"user": "<username>",
"privateKey": {
"type": "SecureString",
"value": "<privateKey>"
},
"privateKeyPassphrase": {
"type": "SecureString",
"value": "<privateKeyPassphrase>"
},
"role": "<role>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Propriedades do conjunto de dados
Para obter uma lista completa de seções e propriedades disponíveis para definir conjuntos de dados, consulte o artigo Conjuntos de dados.
As propriedades a seguir são suportadas para o conjunto de dados Snowflake.
Property | Descrição | Obrigatório |
---|---|---|
tipo | A propriedade type do conjunto de dados deve ser definida como SnowflakeV2Table. | Sim |
esquema | Nome do esquema. Observe que o nome do esquema diferencia maiúsculas de minúsculas. | Não para a fonte, sim para a pia |
tabela | Nome da tabela/vista. Observe que o nome da tabela diferencia maiúsculas de minúsculas. | Não para a fonte, sim para a pia |
Exemplo:
{
"name": "SnowflakeV2Dataset",
"properties": {
"type": "SnowflakeV2Table",
"typeProperties": {
"schema": "<Schema name for your Snowflake database>",
"table": "<Table name for your Snowflake database>"
},
"schema": [ < physical schema, optional, retrievable during authoring > ],
"linkedServiceName": {
"referenceName": "<name of linked service>",
"type": "LinkedServiceReference"
}
}
}
Propriedades da atividade Copy
Para obter uma lista completa de seções e propriedades disponíveis para definir atividades, consulte o artigo Pipelines . Esta seção fornece uma lista de propriedades suportadas pela fonte e pelo coletor de flocos de neve.
Floco de neve como fonte
O conector Snowflake utiliza o comando COPY into [location] do Snowflake para obter o melhor desempenho.
Se o armazenamento de dados e o formato do coletor forem suportados nativamente pelo comando Snowflake COPY, você poderá usar a atividade Copiar para copiar diretamente do Snowflake para o sink. Para obter detalhes, consulte Cópia direta do Snowflake. Caso contrário, use a cópia Staged interna do Snowflake.
Para copiar dados do Snowflake, as seguintes propriedades são suportadas na seção Copiar fonte de atividade.
Property | Descrição | Obrigatório |
---|---|---|
tipo | A propriedade type da fonte de atividade Copy deve ser definida como SnowflakeV2Source. | Sim |
query | Especifica a consulta SQL para ler dados do Snowflake. Se os nomes do esquema, da tabela e das colunas contiverem minúsculas, cite o identificador do objeto na consulta, por exemplo. select * from "schema"."myTable" Não há suporte para a execução de procedimento armazenado. |
Não |
exportSettings | Configurações avançadas usadas para recuperar dados do Snowflake. Você pode configurar os suportados pelo comando COPY into que o serviço passará quando você invocar a instrução. | Sim |
Em exportSettings : |
||
tipo | O tipo de comando export, definido como SnowflakeExportCopyCommand. | Sim |
storageIntegração | Especifique o nome da integração de armazenamento que você criou no Snowflake. Para obter as etapas de pré-requisito de uso da integração de armazenamento, consulte Configurando uma integração de armazenamento Snowflake. | Não |
additionalCopyOptions | Opções de cópia adicionais, fornecidas como um dicionário de pares chave-valor. Exemplos: MAX_FILE_SIZE, OVERWRITE. Para obter mais informações, consulte Opções de cópia do floco de neve. | Não |
additionalFormatOptions | Opções de formato de arquivo adicionais que são fornecidas ao comando COPY como um dicionário de pares chave-valor. Exemplos: DATE_FORMAT, TIME_FORMAT TIMESTAMP_FORMAT. Para obter mais informações, consulte Opções de tipo de formato Snowflake. | Não |
Nota
Verifique se você tem permissão para executar o seguinte comando e acessar o esquema INFORMATION_SCHEMA e as colunas da tabela.
COPY INTO <location>
Cópia direta de Snowflake
Se o armazenamento de dados e o formato do coletor atenderem aos critérios descritos nesta seção, você poderá usar a atividade Copiar para copiar diretamente do Snowflake para o coletor. O serviço verifica as configurações e falha na execução da atividade Copiar se os seguintes critérios não forem atendidos:
Quando você especifica
storageIntegration
na fonte:O repositório de dados do coletor é o Armazenamento de Blobs do Azure que você mencionou no estágio externo no Snowflake. Você precisa concluir as seguintes etapas antes de copiar dados:
Crie um serviço vinculado de Armazenamento de Blob do Azure para o coletor Armazenamento de Blobs do Azure com qualquer tipo de autenticação suportado.
Conceda pelo menos a função de Colaborador de Dados de Blob de Armazenamento à entidade de serviço do Snowflake no coletor do Azure Blob Storage Access Control (IAM).
Quando você não especifica
storageIntegration
na fonte:O serviço vinculado ao coletor é o armazenamento de Blob do Azure com autenticação de assinatura de acesso compartilhado. Se quiser copiar dados diretamente para o Azure Data Lake Storage Gen2 no seguinte formato com suporte, você pode criar um serviço vinculado do Armazenamento de Blobs do Azure com autenticação SAS em sua conta do Azure Data Lake Storage Gen2, para evitar o uso de cópia em estágios do Snowflake.
O formato de dados do coletor é Parquet, texto delimitado ou JSON com as seguintes configurações:
- Para o formato Parquet , o codec de compressão é None, Snappy ou Lzo.
- Para o formato de texto delimitado:
rowDelimiter
é \r\n, ou qualquer caractere único.compression
pode ser sem compressão, gzip, bzip2 ou deflate.encodingName
é deixado como padrão ou definido como utf-8.quoteChar
é aspa dupla, aspa simples ou string vazia (sem aspas char).
- Para o formato JSON, a cópia direta suporta apenas o caso de a tabela Snowflake de origem ou o resultado da consulta ter apenas uma coluna e o tipo de dados dessa coluna ser VARIANT, OBJECT ou ARRAY.
compression
pode ser sem compressão, gzip, bzip2 ou deflate.encodingName
é deixado como padrão ou definido como utf-8.filePattern
no coletor de atividade de cópia é deixado como padrão ou definido como setOfObjects.
Na fonte da atividade de cópia,
additionalColumns
não é especificado.O mapeamento de colunas não é especificado.
Exemplo:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MYTABLE",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"additionalCopyOptions": {
"MAX_FILE_SIZE": "64000000",
"OVERWRITE": true
},
"additionalFormatOptions": {
"DATE_FORMAT": "'MM/DD/YYYY'"
},
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
}
}
}
]
Cópia encenada de Snowflake
Quando o formato ou armazenamento de dados do coletor não for compatível nativamente com o comando COPY do Snowflake, conforme mencionado na última seção, habilite a cópia em estágios interna usando uma instância provisória de armazenamento de Blob do Azure. O recurso de cópia em estágios também oferece uma melhor taxa de transferência. O serviço exporta dados do Snowflake para o armazenamento de preparo, copia os dados para o coletor e, finalmente, limpa os dados temporários do armazenamento de preparação. Consulte Cópia em etapas para obter detalhes sobre como copiar dados usando preparo.
Para usar esse recurso, crie um serviço vinculado de armazenamento de Blob do Azure que se refira à conta de armazenamento do Azure como o preparo provisório. Em seguida, especifique as enableStaging
propriedades e stagingSettings
na atividade Copiar.
Quando você especifica
storageIntegration
na origem, o armazenamento de Blob do Azure de preparo provisório deve ser aquele que você mencionou no estágio externo no Snowflake. Certifique-se de criar um serviço vinculado de Armazenamento de Blobs do Azure para ele com qualquer autenticação com suporte e conceda pelo menos a função de Colaborador de Dados de Blob de Armazenamento à entidade de serviço Snowflake no Controle de Acesso de Armazenamento de Blobs (IAM) do Azure de preparação.Quando você não especifica
storageIntegration
na origem, o serviço vinculado de Armazenamento de Blob do Azure de preparo deve usar a autenticação de assinatura de acesso compartilhado, conforme exigido pelo comando Snowflake COPY. Certifique-se de conceder permissão de acesso adequada ao Snowflake no Armazenamento de Blobs do Azure de preparação. Para saber mais sobre isso, consulte este artigo.
Exemplo:
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeV2Source",
"query": "SELECT * FROM MyTable",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"sink": {
"type": "<sink type>"
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
Ao executar uma cópia em estágios do Snowflake, é crucial definir o comportamento de cópia do coletor como mesclar arquivos. Essa configuração garante que todos os arquivos particionados sejam manipulados e mesclados corretamente, evitando o problema em que apenas o último arquivo particionado é copiado.
Exemplo de configuração
{
"type": "Copy",
"source": {
"type": "SnowflakeSource",
"query": "SELECT * FROM my_table"
},
"sink": {
"type": "AzureBlobStorage",
"copyBehavior": "MergeFiles"
}
}
Nota
A falha ao definir o comportamento de cópia do coletor para mesclar arquivos pode resultar em apenas o último arquivo particionado sendo copiado.
Floco de neve como pia
O conector Snowflake utiliza o comando COPY into [table] do Snowflake para obter o melhor desempenho. Ele dá suporte à gravação de dados no Snowflake no Azure.
Se o armazenamento de dados de origem e o formato forem suportados nativamente pelo comando Snowflake COPY, você poderá usar a atividade Copiar para copiar diretamente da origem para o Snowflake. Para obter detalhes, consulte Cópia direta para Snowflake. Caso contrário, use a cópia em etapas interna para o Snowflake.
Para copiar dados para o Snowflake, as seguintes propriedades são suportadas na seção Copiar coletor de atividade.
Property | Descrição | Obrigatório |
---|---|---|
tipo | A propriedade type do coletor de atividade Copy, definida como SnowflakeV2Sink. | Sim |
pré-CopyScript | Especifique uma consulta SQL para que a atividade Copy seja executada antes de gravar dados no Snowflake em cada execução. Use essa propriedade para limpar os dados pré-carregados. | Não |
importSettings | Configurações avançadas usadas para gravar dados no Snowflake. Você pode configurar os suportados pelo comando COPY into que o serviço passará quando você invocar a instrução. | Sim |
Em importSettings : |
||
tipo | O tipo de comando import, definido como SnowflakeImportCopyCommand. | Sim |
storageIntegração | Especifique o nome da integração de armazenamento que você criou no Snowflake. Para obter as etapas de pré-requisito de uso da integração de armazenamento, consulte Configurando uma integração de armazenamento Snowflake. | Não |
additionalCopyOptions | Opções de cópia adicionais, fornecidas como um dicionário de pares chave-valor. Exemplos: ON_ERROR, FORCE, LOAD_UNCERTAIN_FILES. Para obter mais informações, consulte Opções de cópia do floco de neve. | Não |
additionalFormatOptions | Opções adicionais de formato de arquivo fornecidas ao comando COPY, fornecidas como um dicionário de pares chave-valor. Exemplos: DATE_FORMAT, TIME_FORMAT TIMESTAMP_FORMAT. Para obter mais informações, consulte Opções de tipo de formato Snowflake. | Não |
Nota
Verifique se você tem permissão para executar o seguinte comando e acessar o esquema INFORMATION_SCHEMA e as colunas da tabela.
SELECT CURRENT_REGION()
COPY INTO <table>
SHOW REGIONS
CREATE OR REPLACE STAGE
DROP STAGE
Cópia direta para Snowflake
Se o armazenamento e o formato de dados de origem atenderem aos critérios descritos nesta seção, você poderá usar a atividade Copiar para copiar diretamente da fonte para o Snowflake. O serviço verifica as configurações e falha na execução da atividade Copiar se os seguintes critérios não forem atendidos:
Quando você especifica
storageIntegration
no coletor:O armazenamento de dados de origem é o Armazenamento de Blob do Azure que você mencionou no estágio externo no Snowflake. Você precisa concluir as seguintes etapas antes de copiar dados:
Crie um serviço vinculado do Armazenamento de Blobs do Azure para o Armazenamento de Blobs do Azure de origem com qualquer tipo de autenticação suportado.
Conceda pelo menos a função de Leitor de Dados de Blob de Armazenamento à entidade de serviço Snowflake no IAM (Controle de Acesso de Armazenamento de Blob) do Azure de origem.
Quando você não especifica
storageIntegration
na pia:O serviço vinculado de origem é o armazenamento de Blob do Azure com autenticação de assinatura de acesso compartilhado. Se quiser copiar dados diretamente do Azure Data Lake Storage Gen2 no seguinte formato com suporte, você pode criar um serviço vinculado do Armazenamento de Blobs do Azure com autenticação SAS em sua conta do Azure Data Lake Storage Gen2, para evitar o uso de cópia em estágios para o Snowflake.
O formato de dados de origem é Parquet, texto delimitado ou JSON com as seguintes configurações:
Para o formato Parquet , o codec de compressão é None, ou Snappy.
Para o formato de texto delimitado:
rowDelimiter
é \r\n, ou qualquer caractere único. Se o delimitador de linha não for "\r\n",firstRowAsHeader
precisará ser false eskipLineCount
não será especificado.compression
pode ser sem compressão, gzip, bzip2 ou deflate.encodingName
é deixado como padrão ou definido como "UTF-8", "UTF-16", "UTF-16BE", "UTF-32", "UTF-32BE", "BIG5", "EUC-JP", "EUC-KR", "GB18030", "ISO-2022-JP", "ISO-2022-KR", "ISO-8859-1", "ISO-8859-2", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-9", "WINDOWS-1250", "WINDOWS-1251", "WINDOWS-1252", "WINDOWS-1253", "WINDOWS-1254", "WINDOWS-1255".quoteChar
é aspa dupla, aspa simples ou string vazia (sem aspas char).
Para o formato JSON, a cópia direta suporta apenas o caso em que a tabela Snowflake do coletor tem apenas uma coluna e o tipo de dados dessa coluna é VARIANT, OBJECT ou ARRAY.
compression
pode ser sem compressão, gzip, bzip2 ou deflate.encodingName
é deixado como padrão ou definido como utf-8.- O mapeamento de colunas não é especificado.
Na fonte da atividade Copiar:
additionalColumns
não é especificado.- Se a origem for uma pasta,
recursive
será definida como true. prefix
,modifiedDateTimeStart
,modifiedDateTimeEnd
, eenablePartitionDiscovery
não são especificados.
Exemplo:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"copyOptions": {
"FORCE": "TRUE",
"ON_ERROR": "SKIP_FILE"
},
"fileFormatOptions": {
"DATE_FORMAT": "YYYY-MM-DD"
},
"storageIntegration": "< Snowflake storage integration name >"
}
}
}
}
]
Cópia encenada para Snowflake
Quando o armazenamento ou formato de dados de origem não for nativamente compatível com o comando Snowflake COPY, conforme mencionado na última seção, habilite a cópia em estágios interna usando uma instância provisória de armazenamento de Blob do Azure. O recurso de cópia em estágios também oferece uma melhor taxa de transferência. O serviço converte automaticamente os dados para atender aos requisitos de formato de dados do Snowflake. Em seguida, ele invoca o comando COPY para carregar dados no Snowflake. Finalmente, ele limpa seus dados temporários do armazenamento de blob. Consulte Cópia em etapas para obter detalhes sobre como copiar dados usando preparo.
Para usar esse recurso, crie um serviço vinculado de armazenamento de Blob do Azure que se refira à conta de armazenamento do Azure como o preparo provisório. Em seguida, especifique as enableStaging
propriedades e stagingSettings
na atividade Copiar.
Quando você especifica
storageIntegration
no coletor, o armazenamento de Blob do Azure de preparo provisório deve ser aquele que você mencionou no estágio externo no Snowflake. Certifique-se de criar um serviço vinculado do Armazenamento de Blobs do Azure para ele com qualquer autenticação com suporte e conceda pelo menos a função de Leitor de Dados de Blob de Armazenamento à entidade de serviço Snowflake no Controle de Acesso de Armazenamento de Blob (IAM) do Azure em preparação.Quando você não especifica
storageIntegration
no coletor, o serviço vinculado de Armazenamento de Blob do Azure de preparo precisa usar a autenticação de assinatura de acesso compartilhado, conforme exigido pelo comando Snowflake COPY.
Exemplo:
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeV2Sink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"storageIntegration": "< Snowflake storage integration name >"
}
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
Mapeando propriedades de fluxo de dados
Ao transformar dados em mapeamento de fluxo de dados, você pode ler e gravar em tabelas no Snowflake. Para obter mais informações, consulte a transformação de origem e a transformação de coletor no mapeamento de fluxos de dados. Você pode optar por usar um conjunto de dados Snowflake ou um conjunto de dados embutido como fonte e tipo de coletor.
Transformação da fonte
A tabela abaixo lista as propriedades suportadas pela fonte Snowflake. Você pode editar essas propriedades na guia Opções de origem. O conector utiliza transferência de dados interna Snowflake.
Nome | Descrição | Obrigatório | Valores permitidos | Propriedade do script de fluxo de dados |
---|---|---|---|---|
Tabela | Se você selecionar Tabela como entrada, o fluxo de dados buscará todos os dados da tabela especificada no conjunto de dados Snowflake ou nas opções de origem ao usar o conjunto de dados embutido. | Não | String | (apenas para conjunto de dados embutido) tableName schemaName |
Query | Se você selecionar Consulta como entrada, insira uma consulta para buscar dados do Snowflake. Essa configuração substitui qualquer tabela escolhida no conjunto de dados. Se os nomes do esquema, da tabela e das colunas contiverem minúsculas, cite o identificador do objeto na consulta, por exemplo. select * from "schema"."myTable" |
Não | String | query |
Ativar extração incremental (Pré-visualização) | Use essa opção para informar ao ADF para processar somente linhas que foram alteradas desde a última vez que o pipeline foi executado. | Não | Boolean | habilitarCdc |
Coluna incremental | Ao usar o recurso de extração incremental, você deve escolher a data/hora/coluna numérica que deseja usar como marca d'água na tabela de origem. | Não | String | waterMarkColumn |
Ativar o rastreamento de alterações do floco de neve (visualização) | Essa opção permite que o ADF aproveite a tecnologia de captura de dados de alteração do Snowflake para processar apenas os dados delta desde a execução anterior do pipeline. Esta opção carrega automaticamente os dados delta com operações de inserção de linha, atualização e exclusão sem exigir nenhuma coluna incremental. | Não | Boolean | enableNativeCdc |
Alterações líquidas | Ao usar o rastreamento de alterações de flocos de neve, você pode usar essa opção para obter linhas alteradas ou alterações exaustivas. As linhas alteradas com deduped mostrarão apenas as versões mais recentes das linhas que foram alteradas desde um determinado ponto no tempo, enquanto as alterações exaustivas mostrarão todas as versões de cada linha que foi alterada, incluindo as que foram excluídas ou atualizadas. Por exemplo, se você atualizar uma linha, verá uma versão de exclusão e uma versão de inserção em alterações exaustivas, mas apenas a versão de inserção em linhas alteradas dedupedadas. Dependendo do seu caso de uso, você pode escolher a opção que atenda às suas necessidades. A opção padrão é false, o que significa alterações exaustivas. | Não | Boolean | netAlterações |
Incluir colunas do sistema | Ao usar o rastreamento de alterações de flocos de neve, você pode usar a opção systemColumns para controlar se as colunas de fluxo de metadados fornecidas pelo Snowflake são incluídas ou excluídas na saída de controle de alterações. Por padrão, systemColumns é definido como true, o que significa que as colunas de fluxo de metadados estão incluídas. Você pode definir systemColumns como false se quiser excluí-los. | Não | Boolean | systemColumns |
Comece a ler desde o início | Definir essa opção com extração incremental e controle de alterações instruirá o ADF a ler todas as linhas na primeira execução de um pipeline com a extração incremental ativada. | Não | Boolean | skipInitialLoad |
Exemplos de script de origem do Snowflake
Quando você usa o conjunto de dados Snowflake como tipo de origem, o script de fluxo de dados associado é:
source(allowSchemaDrift: true,
validateSchema: false,
query: 'select * from MYTABLE',
format: 'query') ~> SnowflakeSource
Se você usar o conjunto de dados embutido, o script de fluxo de dados associado será:
source(allowSchemaDrift: true,
validateSchema: false,
format: 'query',
query: 'select * from MYTABLE',
store: 'snowflake') ~> SnowflakeSource
Controlo de alterações nativo
O Azure Data Factory agora dá suporte a um recurso nativo no Snowflake conhecido como controle de alterações, que envolve o controle de alterações na forma de logs. Esse recurso do floco de neve nos permite acompanhar as mudanças nos dados ao longo do tempo, tornando-os úteis para o carregamento incremental de dados e para fins de auditoria. Para utilizar esse recurso, quando você habilita a captura de dados de alteração e seleciona o Rastreamento de alterações de floco de neve, criamos um objeto Stream para a tabela de origem que habilita o controle de alterações na tabela de flocos de neve de origem. Posteriormente, usamos a cláusula CHANGES em nossa consulta para buscar apenas os dados novos ou atualizados da tabela de origem. Além disso, é recomendável agendar o pipeline de modo que as alterações sejam consumidas dentro do intervalo de tempo de retenção de dados definido para a tabela de origem do floco de neve, caso contrário, o usuário poderá ver um comportamento inconsistente nas alterações capturadas.
Transformação do lavatório
A tabela abaixo lista as propriedades suportadas pelo Snowflake sink. Você pode editar essas propriedades na guia Configurações. Ao usar o conjunto de dados embutido, você verá configurações adicionais, que são as mesmas que as propriedades descritas na seção de propriedades do conjunto de dados. O conector utiliza transferência de dados interna Snowflake.
Nome | Descrição | Obrigatório | Valores permitidos | Propriedade do script de fluxo de dados |
---|---|---|---|---|
Método de atualização | Especifique quais operações são permitidas em seu destino Snowflake. Para atualizar, atualizar ou excluir linhas, uma transformação de linha Alter é necessária para marcar linhas para essas ações. |
Sim | true ou false |
suprimido inserível atualizável Atualizável |
Colunas-chave | Para atualizações, upserts e exclusões, uma coluna ou colunas de chave devem ser definidas para determinar qual linha alterar. | Não | Matriz | chaves |
Ação da tabela | Determina se todas as linhas da tabela de destino devem ser recriadas ou removidas antes da gravação. - Nenhuma: Nenhuma ação será feita para a mesa. - Recriar: A tabela será descartada e recriada. Necessário se criar uma nova tabela dinamicamente. - Truncate: Todas as linhas da tabela de destino serão removidas. |
Não | true ou false |
recriar truncate |
Exemplos de script de sumidouro de flocos de neve
Quando você usa o conjunto de dados Snowflake como tipo de coletor, o script de fluxo de dados associado é:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
deletable:true,
insertable:true,
updateable:true,
upsertable:false,
keys:['movieId'],
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
Se você usar o conjunto de dados embutido, o script de fluxo de dados associado será:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
format: 'table',
tableName: 'table',
schemaName: 'schema',
deletable: true,
insertable: true,
updateable: true,
upsertable: false,
store: 'snowflake',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
Otimização do Query Pushdown
Ao definir o Nível de Log do pipeline como Nenhum, excluímos a transmissão de métricas de transformação intermediárias, evitando possíveis obstáculos às otimizações do Spark e permitindo a otimização de pushdown de consulta fornecida pelo Snowflake. Essa otimização de pushdown permite melhorias substanciais de desempenho para grandes tabelas Snowflake com conjuntos de dados extensos.
Nota
Não suportamos tabelas temporárias no Snowflake, pois elas são locais para a sessão ou o usuário que as cria, tornando-as inacessíveis a outras sessões e propensas a serem substituídas como tabelas regulares pelo Snowflake. Embora o Snowflake ofereça tabelas transitórias como alternativa, que são acessíveis globalmente, elas exigem exclusão manual, contradizendo nosso objetivo principal de usar tabelas Temp, que é evitar quaisquer operações de exclusão no esquema de origem.
Propriedades da atividade de pesquisa
Para obter mais informações sobre as propriedades, consulte Atividade de pesquisa.
Atualize o conector Snowflake
Para atualizar o conector Snowflake, você pode fazer uma atualização lado a lado ou uma atualização in-loco.
Atualização lado a lado
Para executar uma atualização lado a lado, conclua as seguintes etapas:
- Crie um novo serviço vinculado do Snowflake e configure-o fazendo referência às propriedades do serviço vinculado.
- Crie um conjunto de dados com base no serviço vinculado Snowflake recém-criado.
- Substitua o novo serviço vinculado e o conjunto de dados pelos existentes nos pipelines destinados aos objetos herdados.
Atualização no local
Para executar uma atualização in-loco, você precisa editar a carga útil do serviço vinculado existente e atualizar o conjunto de dados para usar o novo serviço vinculado.
Atualize o tipo de Snowflake para SnowflakeV2.
Modifique a carga útil do serviço vinculado de seu formato herdado para o novo padrão. Você pode preencher cada campo da interface do usuário depois de alterar o tipo mencionado acima ou atualizar a carga diretamente através do Editor JSON. Consulte a seção Propriedades do serviço vinculado neste artigo para obter as propriedades de conexão suportadas. Os exemplos a seguir mostram as diferenças na carga útil para os serviços vinculados Snowflake herdados e novos:
Carga JSON do serviço vinculado Snowflake herdado:
{ "name": "Snowflake1", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "annotations": [], "type": "Snowflake", "typeProperties": { "authenticationType": "Basic", "connectionString": "jdbc:snowflake://<fake_account>.snowflakecomputing.com/?user=FAKE_USER&db=FAKE_DB&warehouse=FAKE_DW&schema=PUBLIC", "encryptedCredential": "<your_encrypted_credential_value>" }, "connectVia": { "referenceName": "AzureIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }
Nova carga útil JSON do serviço vinculado Snowflake:
{ "name": "Snowflake2", "type": "Microsoft.DataFactory/factories/linkedservices", "properties": { "parameters": { "schema": { "type": "string", "defaultValue": "PUBLIC" } }, "annotations": [], "type": "SnowflakeV2", "typeProperties": { "authenticationType": "Basic", "accountIdentifier": "<FAKE_Account>", "user": "FAKE_USER", "database": "FAKE_DB", "warehouse": "FAKE_DW", "encryptedCredential": "<placeholder>" }, "connectVia": { "referenceName": "AutoResolveIntegrationRuntime", "type": "IntegrationRuntimeReference" } } }
Atualize o conjunto de dados para usar o novo serviço vinculado. Você pode criar um novo conjunto de dados com base no serviço vinculado recém-criado ou atualizar a propriedade type de um conjunto de dados existente de SnowflakeTable para SnowflakeV2Table.
Diferenças entre Floco de Neve e Floco de Neve (legado)
O conector Snowflake oferece novas funcionalidades e é compatível com a maioria dos recursos do conector Snowflake (legado). A tabela abaixo mostra as diferenças de recursos entre Snowflake e Snowflake (legado).
Snowflake | Floco de neve (legado) |
---|---|
Suporta autenticação de par Basic e Key. | Suporte a autenticação básica. |
Atualmente, não há suporte para parâmetros de script na atividade de script. Como alternativa, utilize expressões dinâmicas para parâmetros de script. Para obter mais informações, consulte Expressões e funções no Azure Data Factory e Azure Synapse Analytics. | Suporte a parâmetros de script na atividade de script. |
Suporte BigDecimal na atividade de pesquisa. O tipo NÚMERO, conforme definido em Floco de neve, será exibido como uma cadeia de caracteres na atividade Pesquisa. Se você quiser encobri-lo para o tipo numérico, você pode usar o parâmetro pipeline com função int ou função float. Por exemplo, int(activity('lookup').output.firstRow.VALUE) , float(activity('lookup').output.firstRow.VALUE) |
BigDecimal não é suportado na atividade de pesquisa. |
As accountIdentifier propriedades , warehouse , schema database , e são role usadas para estabelecer uma conexão. |
A connectionstring propriedade é usada para estabelecer uma conexão. |
O tipo de dados de carimbo de data/hora no Snowflake é lido como o tipo de dados DateTimeOffset na atividade Pesquisa e Script. | O tipo de dados de carimbo de data/hora no Snowflake é lido como o tipo de dados DateTime na atividade Pesquisa e Script. Se você ainda precisar usar o valor Datetime como um parâmetro em seu pipeline depois de atualizar o conector, poderá converter o tipo DateTimeOffset para o tipo DateTime usando a função formatDateTime (recomendada) ou a função concat. Por exemplo: formatDateTime(activity('lookup').output.firstRow.DATETIMETYPE) , concat(substring(activity('lookup').output.firstRow.DATETIMETYPE, 0, 19), 'Z') |
Conteúdos relacionados
Para obter uma lista de armazenamentos de dados suportados como fontes e coletores por atividade de cópia, consulte Armazenamentos de dados e formatos suportados.