Partilhar via


Tutorial: Carregar e transformar dados usando o Apache Spark DataFrames

Este tutorial mostra como carregar e transformar dados usando a API DataFrame do Apache Spark Python (PySpark), a API do Apache Spark Scala DataFrame e a API SparkR SparkDataFrame no Azure Databricks.

Ao final deste tutorial, você entenderá o que é um DataFrame e estará familiarizado com as seguintes tarefas:

Python

Consulte também Referência da API do Apache Spark PySpark.

Scala

Consulte também Referência da API do Apache Spark Scala.

R

Consulte também Referência da API do Apache SparkR.

O que é um DataFrame?

Um DataFrame é uma estrutura de dados rotulada bidimensional com columns de tipos potencialmente diferentes. Você pode pensar em um DataFrame como uma planilha, um tableSQL ou um dicionário de objetos de série. O Apache Spark DataFrames fornece uma set rica de funções (selectcolumns, filtro, join, agregação) que permitem resolver problemas comuns de análise de dados de forma eficiente.

Os Apache Spark DataFrames são uma abstração construída sobre conjuntos de dados distribuídos resilientes (RDDs). O Spark DataFrames e o Spark SQL usam um mecanismo unificado de planejamento e otimização, permitindo que você get desempenho quase idêntico em todas as linguagens com suporte no Azure Databricks (Python, SQL, Scala e R).

Requisitos

Para concluir o tutorial a seguir, você deve atender aos seguintes requisitos:

  • Para usar os exemplos neste tutorial, seu espaço de trabalho deve ter Unity Catalog habilitado.

  • Os exemplos neste tutorial usam um volume Catalogdo Unity para armazenar dados de exemplo. Para usar estes exemplos, crie um volume e utilize os catalog, schemae os nomes desse volume para set o caminho de volume usado pelos exemplos.

  • Você deve ter as seguintes permissões no Unity Catalog:

    • READ VOLUME e WRITE VOLUME, ou ALL PRIVILEGES para o volume usado para este tutorial.
    • USE SCHEMA ou ALL PRIVILEGES para o schema usado para este tutorial.
    • USE CATALOG ou ALL PRIVILEGES para o catalog usado para este tutorial.

    Para set essas permissões, consulte o administrador do Databricks ou Unity Catalog privilégios e objetos securizáveis.

Gorjeta

Para obter um bloco de anotações concluído para este artigo, consulte Blocos de anotações de tutorial do DataFrame.

Etapa 1: Definir variáveis e carregar arquivo CSV

Esta etapa define variáveis para uso neste tutorial e, em seguida, carrega um arquivo CSV contendo dados de nome de bebê de health.data.ny.gov em seu volume de Catalog Unity.

  1. Abra um novo bloco de notas clicando no Novo ícone ícone. Para saber como navegar nos blocos de anotações do Azure Databricks, consulte Personalizar a aparência do bloco de anotações.

  2. Copie e cole o código a seguir na nova célula vazia do bloco de anotações. Substitua <catalog-name>, <schema-name>e <volume-name> pelos nomes catalog, schemae nomes de volume para um volume Unity Catalog. Substitua <table_name> por um nome table de sua escolha. Você carregará os dados do nome do bebê neste table mais tarde neste tutorial.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_table = catalog + "." + schema
    print(path_table) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val fileName = "rows.csv"
    val tableName = "<table_name>"
    val pathVolume = s"/Volumes/$catalog/$schema/$volume"
    val pathTable = s"$catalog.$schema"
    print(pathVolume) // Show the complete path
    print(pathTable) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_table <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_table) # Show the complete path
    
  3. Pressione Shift+Enter para executar a célula e criar uma nova célula em branco.

  4. Copie e cole o código a seguir na nova célula vazia do bloco de anotações. Esse código copia o arquivo rows.csv do health.data.ny.gov para o volume do Unity Catalog usando o comando Databricks dbutuils.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
    

    Scala

    dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    
  5. Pressione Shift+Enter para executar a célula e, em seguida, vá para a próxima célula.

Etapa 2: Criar um DataFrame

Esta etapa cria um DataFrame nomeado df1 com dados de teste e, em seguida, exibe seu conteúdo.

  1. Copie e cole o código a seguir na nova célula vazia do bloco de anotações. Esse código cria o DataFrame com dados de teste e, em seguida, exibe o conteúdo e o schema do DataFrame.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = as.integer(c(2021)),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = as.integer(c(42))
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    
  2. Pressione Shift+Enter para executar a célula e, em seguida, vá para a próxima célula.

Etapa 3: Carregar dados em um DataFrame a partir do arquivo CSV

Esta etapa cria um DataFrame chamado df_csv a partir do arquivo CSV que você carregou anteriormente no volume do Unity Catalog. Ver spark.read.csv.

  1. Copie e cole o código a seguir na nova célula vazia do bloco de anotações. Esse código carrega dados de nome de bebê em DataFrame df_csv a partir do arquivo CSV e, em seguida, exibe o conteúdo do DataFrame.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
        header=True,
        inferSchema=True,
        sep=",")
    display(df_csv)
    

    Scala

    val dfCsv = spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .csv(s"$pathVolume/$fileName")
    
    display(dfCsv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
        source="csv",
        header = TRUE,
        inferSchema = TRUE,
        delimiter = ",")
    
    display(df_csv)
    
  2. Pressione Shift+Enter para executar a célula e, em seguida, vá para a próxima célula.

Você pode carregar dados de muitos formatos de arquivo suportados.

Etapa 4: exibir e interagir com seu DataFrame

Visualize e interaja com os nomes do seu bebé DataFrames utilizando os seguintes métodos.

Saiba como exibir o schema de um DataFrame Apache Spark. O Apache Spark usa o termo schema para se referir aos nomes e tipos de dados do columns no DataFrame.

Nota

O Azure Databricks também usa o termo schema para descrever uma coleção de tables registados num catalog.

  1. Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Este código mostra o schema dos seus DataFrames com o método .printSchema() para exibir os esquemas dos dois DataFrames - para preparar a união dos dois DataFrames.

    Python

    df_csv.printSchema()
    df1.printSchema()
    

    Scala

    dfCsv.printSchema()
    df1.printSchema()
    

    R

    printSchema(df_csv)
    printSchema(df1)
    
  2. Pressione Shift+Enter para executar a célula e, em seguida, vá para a próxima célula.

Renomear column no DataFrame

Saiba como renomear um column em um DataFrame.

  1. Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código renomeia um column no df1_csv DataFrame para corresponder aos respetivos column no df1 DataFrame. Este código usa o método Apache Spark withColumnRenamed() .

    Python

    df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
    df_csv.printSchema
    

    Scala

    val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
    // when modifying a DataFrame in Scala, you must assign it to a new variable
    dfCsvRenamed.printSchema()
    

    R

    df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
    printSchema(df_csv)
    
  2. Pressione Shift+Enter para executar a célula e, em seguida, vá para a próxima célula.

Combinar DataFrames

Saiba como criar um novo DataFrame que adiciona as linhas de um DataFrame a outro.

  1. Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código usa o método Apache Spark union() para combinar o conteúdo do seu primeiro DataFrame df com DataFrame df_csv contendo os dados de nomes de bebês carregados do arquivo CSV.

    Python

    df = df1.union(df_csv)
    display(df)
    

    Scala

    val df = df1.union(dfCsvRenamed)
    display(df)
    

    R

    display(df <- union(df1, df_csv))
    
  2. Pressione Shift+Enter para executar a célula e, em seguida, vá para a próxima célula.

Filtrar linhas em um DataFrame

Descubra os nomes de bebés mais populares nos dados da sua set através da filtragem de linhas, utilizando os métodos Apache Spark .filter() ou .where(). Use a filtragem para select um subconjunto de linhas para retornar ou modificar em um DataFrame. Não há diferença no desempenho ou na sintaxe, como visto nos exemplos a seguir.

Usando o método .filter()

  1. Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código usa o método Apache Spark .filter() para exibir essas linhas no DataFrame com uma contagem de mais de 50.

    Python
    display(df.filter(df["Count"] > 50))
    
    Scala
    display(df.filter(df("Count") > 50))
    
    R
    display(filteredDF <- filter(df, df$Count > 50))
    
  2. Pressione Shift+Enter para executar a célula e, em seguida, vá para a próxima célula.

Usando o método .where()

  1. Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código usa o método Apache Spark .where() para exibir essas linhas no DataFrame com uma contagem de mais de 50.

    Python
    display(df.where(df["Count"] > 50))
    
    Scala
    display(df.where(df("Count") > 50))
    
    R
    display(filtered_df <- where(df, df$Count > 50))
    
  2. Pressione Shift+Enter para executar a célula e, em seguida, vá para a próxima célula.

Select columns a partir de um DataFrame e ordem por frequência

Saiba mais sobre a frequência dos nomes de bebés utilizando o método select() para especificar o(s) columns do DataFrame a devolver. Use o Apache Spark orderby e desc as funções para ordenar os resultados.

O módulo pyspark.sql para Apache Spark fornece suporte para funções SQL. Entre essas funções que usamos neste tutorial estão o Apache Spark orderBy(), desc()e expr() funções. Você habilita o uso dessas funções importando-as para sua sessão, conforme necessário.

  1. Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Este código importa a desc() função e, em seguida, usa o método Apache Spark select() e Apache Spark orderBy() e desc() funções para exibir os nomes mais comuns e suas contagens em ordem decrescente.

    Python

    from pyspark.sql.functions import desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    Scala

    import org.apache.spark.sql.functions.desc
    display(df.select("First_Name", "Count").orderBy(desc("Count")))
    

    R

    display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
    
  2. Pressione Shift+Enter para executar a célula e, em seguida, vá para a próxima célula.

Criar um subconjunto DataFrame

Saiba como criar um subconjunto DataFrame a partir de um DataFrame existente.

  1. Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código usa o método Apache Spark filter para criar um novo DataFrame restringindo os dados por ano, contagem e sexo. Ele usa o método Apache Spark select() para limit o columns. Ele também usa o Apache Spark orderBy() e desc() funções para classificar o novo DataFrame por contagem.

    Python

    subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    display(subsetDF)
    

    Scala

    val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
    
    display(subsetDF)
    

    R

    subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
    display(subsetDF)
    
  2. Pressione Shift+Enter para executar a célula e, em seguida, vá para a próxima célula.

Etapa 5: salvar o DataFrame

Saiba como salvar um DataFrame,. Você pode salvar seu DataFrame em um table ou gravar o DataFrame em um arquivo ou em vários arquivos.

Salve o DataFrame em um table

O Azure Databricks usa o formato Delta Lake para todos os tables por padrão. Para salvar seu DataFrame, você deve ter privilégios de CREATEtable no catalog e schema.

  1. Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código salva o conteúdo do DataFrame em um table usando a variável definida no início deste tutorial.

    Python

    df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
    

    Scala

    df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
    

    R

    saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
    
  2. Pressione Shift+Enter para executar a célula e, em seguida, vá para a próxima célula.

A maioria dos aplicativos Apache Spark funciona em grandes conjuntos de dados e de forma distribuída. O Apache Spark grava um diretório de arquivos em vez de um único arquivo. Delta Lake divide as pastas e arquivos do Parquet. Muitos sistemas de dados podem ler esses diretórios de arquivos. O Azure Databricks recomenda o uso de tables sobre caminhos de arquivo para a maioria dos aplicativos.

Salve o DataFrame em arquivos JSON

  1. Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código salva o DataFrame em um diretório de arquivos JSON.

    Python

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    Scala

    df.write.format("json").mode("overwrite").save("/tmp/json_data")
    

    R

    write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
    
  2. Pressione Shift+Enter para executar a célula e, em seguida, vá para a próxima célula.

Ler o DataFrame de um arquivo JSON

Saiba como usar o método Apache Spark spark.read.format() para ler dados JSON de um diretório em um DataFrame.

  1. Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código exibe os arquivos JSON salvos no exemplo anterior.

    Python

    display(spark.read.format("json").json("/tmp/json_data"))
    

    Scala

    display(spark.read.format("json").json("/tmp/json_data"))
    

    R

    display(read.json("/tmp/json_data"))
    
  2. Pressione Shift+Enter para executar a célula e, em seguida, vá para a próxima célula.

Tarefas adicionais: Executar consultas SQL no PySpark, Scala e R

O Apache Spark DataFrames fornece as seguintes opções para combinar SQL com PySpark, Scala e R. Você pode executar o código a seguir no mesmo bloco de anotações que você criou para este tutorial.

Especificar um column como uma consulta SQL

Saiba como usar o método Apache Spark selectExpr() . Esta é uma variante do select() método que aceita expressões SQL e retorna um DataFrame atualizado. Esse método permite que você use uma expressão SQL, como upper.

  1. Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código usa o método Apache Spark selectExpr() e a expressão SQL upper para converter uma cadeia de caracteres column em maiúsculas (e renomear o column).

    Python

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    Scala

    display(df.selectExpr("Count", "upper(County) as big_name"))
    

    R

    display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
    
  2. Pressione Shift+Enter para executar a célula e, em seguida, vá para a próxima célula.

Use expr() para usar a sintaxe SQL para um column

Saiba como importar e usar a função Apache Spark expr() para usar a sintaxe SQL em qualquer lugar em que um column seja especificado.

  1. Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código importa a função expr() e, em seguida, usa a função Apache Spark expr() e a expressão SQL lower para converter a cadeia de caracteres column para minúsculas (e renomear a column).

    Python

    from pyspark.sql.functions import expr
    display(df.select("Count", expr("lower(County) as little_name")))
    

    Scala

    import org.apache.spark.sql.functions.{col, expr}
    // Scala requires us to import the col() function as well as the expr() function
    
    display(df.select(col("Count"), expr("lower(County) as little_name")))
    

    R

    display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
    # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
    
  2. Pressione Shift+Enter para executar a célula e, em seguida, vá para a próxima célula.

Executar uma consulta SQL arbitrária usando a função spark.sql()

Saiba como usar a função Apache Spark spark.sql() para executar consultas SQL arbitrárias.

  1. Copie e cole o código a seguir em uma célula vazia do bloco de anotações. Esse código usa a função Apache Spark spark.sql() para consultar um table SQL usando sintaxe SQL.

    Python

    display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
    

    Scala

    display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
    

    R

    display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
    
  2. Pressione Shift+Enter para executar a célula e, em seguida, vá para a próxima célula.

Blocos de anotações de tutorial DataFrame

Os blocos de anotações a seguir incluem as consultas de exemplos deste tutorial.

Python

Tutorial DataFrames usando Python

Get caderno

Scala

Tutorial DataFrames usando Scala

Get caderno

R

Tutorial de DataFrames usando R

Get caderno

Recursos adicionais