Condividi tramite


Lavorare con DataFrame e tables in R

Importante

SparkR in Databricks è deprecato in Databricks Runtime 16.0 e versioni successive. Databricks consiglia di usare sparklyr invece.

Questo articolo descrive come usare pacchetti R come SparkR, sparklyre dplyr per usare R data.frames, i dataframe Sparke in memoria tables.

Si noti che quando si lavora con SparkR, sparklyr e dplyr, è possibile completare un'operazione specifica con tutti questi pacchetti ed è possibile usare il pacchetto più comodo. Ad esempio, per eseguire una query, è possibile chiamare funzioni come SparkR::sql, sparklyr::sdf_sqle dplyr::select. In altri casi, potrebbe essere possibile completare un'operazione con solo uno o due di questi pacchetti e l'operazione scelta dipende dallo scenario di utilizzo. Ad esempio, il modo in cui si chiama sparklyr::sdf_quantile differisce leggermente dal modo in cui si chiama dplyr::percentile_approx, anche se entrambe le funzioni calcolano i quantili.

È possibile usare SQL come bridge tra SparkR e sparklyr. Ad esempio, puoi usare SparkR::sql per interrogare tables che hai creato con sparklyr. È possibile utilizzare sparklyr::sdf_sql per eseguire query su tables che hai creato con SparkR. E il codice dplyr viene sempre tradotto in SQL in memoria prima che venga eseguito. Vedere anche interoperabilità API e traduzione SQL.

Carica SparkR, sparklyr e dplyr

I pacchetti SparkR, sparklyr e dplyr sono inclusi nel runtime di Databricks installato in Azure Databricks cluster. Pertanto, non è necessario chiamare il normale install.package prima di poter iniziare a chiamare questi pacchetti. Tuttavia, è comunque necessario caricare questi pacchetti con library prima. Ad esempio, dall'interno di un notebook R in un'area di lavoro di Azure Databricks, eseguire il codice seguente in una cella del notebook per caricare SparkR, sparklyr e dplyr:

library(SparkR)
library(sparklyr)
library(dplyr)

Connettere sparklyr a un cluster

Dopo aver caricato sparklyr, è necessario chiamare sparklyr::spark_connect per connettersi al cluster, specificando il metodo di connessione databricks. Ad esempio, eseguire il codice seguente in una cella del notebook per connettersi al cluster che ospita il notebook:

sc <- spark_connect(method = "databricks")

Al contrario, un notebook di Azure Databricks stabilisce già un SparkSession nel cluster per l'uso con SparkR, quindi non è necessario chiamare SparkR::sparkR.session prima di poter iniziare a chiamare SparkR.

Caricare un file di dati JSON nell'area di lavoro

Molti esempi di codice di questo articolo si basano su dati situati in una posizione specifica nel tuo ambiente di lavoro di Azure Databricks, con nomi specifici column e tipi di dati definiti. I dati per questo esempio di codice hanno origine in un file JSON denominato book.json dall'interno di GitHub. Per get questo file e caricarlo nell'area di lavoro:

  1. Passare al file books.json in GitHub e usare un editor di testo per copiarne il contenuto in un file denominato books.json da qualche parte nel computer locale.
  2. Nella barra laterale dell'area di lavoro di Azure Databricks fare clic su Catalog.
  3. Fare clic su Crea Table.
  4. Nella scheda Carica file, rilascia il file books.json dal computer locale nella casella Rilascia file per il caricamento. In alternativa, selectfare clic per cercaree selezionare il file books.json dal computer locale.

Per impostazione predefinita, Azure Databricks carica il file locale nel percorso DBFS nell'area di lavoro con il percorso .

Non fare clic su Crea Table con l'interfaccia utente o su Crea Table in Notebook. Gli esempi di codice in questo articolo utilizzano i dati presenti nel file books.json caricato in questa posizione DBFS.

Leggere i dati JSON in un DataFrame

Usare sparklyr::spark_read_json per leggere il file JSON caricato in un dataframe, specificando la connessione, il percorso del file JSON e un nome per la rappresentazione interna table dei dati. Per questo esempio, è necessario specificare che il file book.json contiene più righe. Specificare il columns’ schema è facoltativo. In caso contrario, sparklyr deduce il columns’ schema per impostazione predefinita. Ad esempio, eseguire il codice seguente in una cella del notebook per leggere i dati del file JSON caricato in un dataframe denominato jsonDF:

jsonDF <- spark_read_json(
  sc      = sc,
  name    = "jsonTable",
  path    = "/FileStore/tables/books.json",
  options = list("multiLine" = TRUE),
  columns = c(
    author    = "character",
    country   = "character",
    imageLink = "character",
    language  = "character",
    link      = "character",
    pages     = "integer",
    title     = "character",
    year      = "integer"
  )
)

È possibile usare SparkR::head, SparkR::showo sparklyr::collect per stampare le prime righe di un dataframe. Per impostazione predefinita, head stampa le prime sei righe. show e collect stampano le prime 10 righe. Ad esempio, eseguire il codice seguente in una cella del notebook per stampare le prime righe del dataframe denominato jsonDF:

head(jsonDF)

# Source: spark<?> [?? x 8]
#   author                  country        image…¹ langu…² link  pages title  year
#   <chr>                   <chr>          <chr>   <chr>   <chr> <int> <chr> <int>
# 1 Chinua Achebe           Nigeria        images… English "htt…   209 Thin…  1958
# 2 Hans Christian Andersen Denmark        images… Danish  "htt…   784 Fair…  1836
# 3 Dante Alighieri         Italy          images… Italian "htt…   928 The …  1315
# 4 Unknown                 Sumer and Akk… images… Akkadi… "htt…   160 The … -1700
# 5 Unknown                 Achaemenid Em… images… Hebrew  "htt…   176 The …  -600
# 6 Unknown                 India/Iran/Ir… images… Arabic  "htt…   288 One …  1200
# … with abbreviated variable names ¹​imageLink, ²​language

show(jsonDF)

# Source: spark<jsonTable> [?? x 8]
#    author                  country       image…¹ langu…² link  pages title  year
#    <chr>                   <chr>         <chr>   <chr>   <chr> <int> <chr> <int>
#  1 Chinua Achebe           Nigeria       images… English "htt…   209 Thin…  1958
#  2 Hans Christian Andersen Denmark       images… Danish  "htt…   784 Fair…  1836
#  3 Dante Alighieri         Italy         images… Italian "htt…   928 The …  1315
#  4 Unknown                 Sumer and Ak… images… Akkadi… "htt…   160 The … -1700
#  5 Unknown                 Achaemenid E… images… Hebrew  "htt…   176 The …  -600
#  6 Unknown                 India/Iran/I… images… Arabic  "htt…   288 One …  1200
#  7 Unknown                 Iceland       images… Old No… "htt…   384 Njál…  1350
#  8 Jane Austen             United Kingd… images… English "htt…   226 Prid…  1813
#  9 Honoré de Balzac        France        images… French  "htt…   443 Le P…  1835
# 10 Samuel Beckett          Republic of … images… French… "htt…   256 Moll…  1952
# … with more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

collect(jsonDF)

# A tibble: 100 × 8
#    author                  country       image…¹ langu…² link  pages title  year
#    <chr>                   <chr>         <chr>   <chr>   <chr> <int> <chr> <int>
#  1 Chinua Achebe           Nigeria       images… English "htt…   209 Thin…  1958
#  2 Hans Christian Andersen Denmark       images… Danish  "htt…   784 Fair…  1836
#  3 Dante Alighieri         Italy         images… Italian "htt…   928 The …  1315
#  4 Unknown                 Sumer and Ak… images… Akkadi… "htt…   160 The … -1700
#  5 Unknown                 Achaemenid E… images… Hebrew  "htt…   176 The …  -600
#  6 Unknown                 India/Iran/I… images… Arabic  "htt…   288 One …  1200
#  7 Unknown                 Iceland       images… Old No… "htt…   384 Njál…  1350
#  8 Jane Austen             United Kingd… images… English "htt…   226 Prid…  1813
#  9 Honoré de Balzac        France        images… French  "htt…   443 Le P…  1835
# 10 Samuel Beckett          Republic of … images… French… "htt…   256 Moll…  1952
# … with 90 more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

Esegui query SQL e scrivi e leggi da un table

È possibile usare le funzioni dplyr per eseguire query SQL in un dataframe. Ad esempio, eseguire il codice seguente in una cella del notebook per usare dplyr::group_by e dployr::count per get conteggi per autore dal dataframe denominato jsonDF. Usare dplyr::arrange e dplyr::desc per ordinare il risultato in ordine decrescente in base ai conteggi. Poi vengono stampate per impostazione predefinita le prime 10 righe.

group_by(jsonDF, author) %>%
  count() %>%
  arrange(desc(n))

# Source:     spark<?> [?? x 2]
# Ordered by: desc(n)
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Gustave Flaubert           2
#  8 Homer                      2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with more rows
# ℹ Use `print(n = ...)` to see more rows

È quindi possibile usare sparklyr::spark_write_table per scrivere il risultato in un table in Azure Databricks. Ad esempio, eseguire il codice seguente in una cella del notebook per rieseguire la query e quindi scrivere il risultato in un table denominato json_books_agg:

group_by(jsonDF, author) %>%
  count() %>%
  arrange(desc(n)) %>%
  spark_write_table(
    name = "json_books_agg",
    mode = "overwrite"
  )

Per verificare che il table sia stato creato, è quindi possibile usare sparklyr::sdf_sql insieme a SparkR::showDF per visualizzare i dati del table. Ad esempio, eseguire il codice seguente in una cella del notebook per interrogare il table in un DataFrame e quindi usare sparklyr::collect per stampare per impostazione predefinita le prime 10 righe del DataFrame.

collect(sdf_sql(sc, "SELECT * FROM json_books_agg"))

# A tibble: 82 × 2
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Homer                      2
#  8 Gustave Flaubert           2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows

È anche possibile usare sparklyr::spark_read_table per eseguire operazioni simili. Ad esempio, eseguire il codice seguente in una cella del notebook per interrogare il DataFrame precedente chiamato jsonDF in un altro DataFrame e quindi usare sparklyr::collect per stampare per impostazione predefinita le prime 10 righe del DataFrame.

fromTable <- spark_read_table(
  sc   = sc,
  name = "json_books_agg"
)

collect(fromTable)

# A tibble: 82 × 2
#    author                     n
#    <chr>                  <dbl>
#  1 Fyodor Dostoevsky          4
#  2 Unknown                    4
#  3 Leo Tolstoy                3
#  4 Franz Kafka                3
#  5 William Shakespeare        3
#  6 William Faulkner           2
#  7 Homer                      2
#  8 Gustave Flaubert           2
#  9 Gabriel García Márquez     2
# 10 Thomas Mann                2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows

Aggiungere columns e calcolare columnvalues in un DataFrame

È possibile usare le funzioni dplyr per aggiungere columns ai dataframe e per calcolare columns' values.

Ad esempio, esegui il codice seguente in una cella del notebook per get il contenuto del DataFrame denominato jsonDF. Usare dplyr::mutate per aggiungere un column denominato todaye riempire il nuovo column con il timestamp corrente. Scrivere quindi questi contenuti in un nuovo dataframe denominato withDate e usare dplyr::collect per stampare le prime 10 righe del nuovo dataframe per impostazione predefinita.

Nota

dplyr::mutate accetta solo argomenti conformi alle funzioni integrate di Hive (note anche come UDF) e funzioni di aggregazione integrate (note anche come UDAF). Per informazioni generali, vedere Funzioni Hive. Per informazioni sulle funzioni relative alla data in questa sezione, vedere Funzioni data.

withDate <- jsonDF %>%
  mutate(today = current_timestamp())

collect(withDate)

# A tibble: 100 × 9
#    author    country image…¹ langu…² link  pages title  year today
#    <chr>     <chr>   <chr>   <chr>   <chr> <int> <chr> <int> <dttm>
#  1 Chinua A… Nigeria images… English "htt…   209 Thin…  1958 2022-09-27 21:32:59
#  2 Hans Chr… Denmark images… Danish  "htt…   784 Fair…  1836 2022-09-27 21:32:59
#  3 Dante Al… Italy   images… Italian "htt…   928 The …  1315 2022-09-27 21:32:59
#  4 Unknown   Sumer … images… Akkadi… "htt…   160 The … -1700 2022-09-27 21:32:59
#  5 Unknown   Achaem… images… Hebrew  "htt…   176 The …  -600 2022-09-27 21:32:59
#  6 Unknown   India/… images… Arabic  "htt…   288 One …  1200 2022-09-27 21:32:59
#  7 Unknown   Iceland images… Old No… "htt…   384 Njál…  1350 2022-09-27 21:32:59
#  8 Jane Aus… United… images… English "htt…   226 Prid…  1813 2022-09-27 21:32:59
#  9 Honoré d… France  images… French  "htt…   443 Le P…  1835 2022-09-27 21:32:59
# 10 Samuel B… Republ… images… French… "htt…   256 Moll…  1952 2022-09-27 21:32:59
# … with 90 more rows, and abbreviated variable names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows

Usare ora dplyr::mutate per aggiungere altre due columns al contenuto del dataframe withDate. Il nuovo month e il yearcolumns contengono il mese numerico e l'anno del todaycolumn. Scrivere quindi questi contenuti in un nuovo DataFrame denominato withMMyyyye usare dplyr::select insieme a dplyr::collect per stampare author, title, month e yearcolumns delle prime dieci righe del nuovo DataFrame di default:

withMMyyyy <- withDate %>%
  mutate(month = month(today),
         year  = year(today))

collect(select(withMMyyyy, c("author", "title", "month", "year")))

# A tibble: 100 × 4
#    author                  title                                     month  year
#    <chr>                   <chr>                                     <int> <int>
#  1 Chinua Achebe           Things Fall Apart                             9  2022
#  2 Hans Christian Andersen Fairy tales                                   9  2022
#  3 Dante Alighieri         The Divine Comedy                             9  2022
#  4 Unknown                 The Epic Of Gilgamesh                         9  2022
#  5 Unknown                 The Book Of Job                               9  2022
#  6 Unknown                 One Thousand and One Nights                   9  2022
#  7 Unknown                 Njál's Saga                                   9  2022
#  8 Jane Austen             Pride and Prejudice                           9  2022
#  9 Honoré de Balzac        Le Père Goriot                                9  2022
# 10 Samuel Beckett          Molloy, Malone Dies, The Unnamable, the …     9  2022
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows

Usare ora dplyr::mutate per aggiungere altre due columns al contenuto del dataframe withMMyyyy. Il nuovo formatted_datecolumns contiene la parte yyyy-MM-dd del todaycolumn, mentre il nuovo daycolumn contiene il giorno numerico del nuovo formatted_datecolumn. Scrivere quindi questi contenuti in un nuovo DataFrame denominato withUnixTimestampe usare dplyr::select insieme a dplyr::collect per stampare il title, formatted_datee daycolumns delle prime dieci righe del nuovo DataFrame di default:

withUnixTimestamp <- withMMyyyy %>%
  mutate(formatted_date = date_format(today, "yyyy-MM-dd"),
         day            = dayofmonth(formatted_date))

collect(select(withUnixTimestamp, c("title", "formatted_date", "day")))

# A tibble: 100 × 3
#    title                                           formatted_date   day
#    <chr>                                           <chr>          <int>
#  1 Things Fall Apart                               2022-09-27        27
#  2 Fairy tales                                     2022-09-27        27
#  3 The Divine Comedy                               2022-09-27        27
#  4 The Epic Of Gilgamesh                           2022-09-27        27
#  5 The Book Of Job                                 2022-09-27        27
#  6 One Thousand and One Nights                     2022-09-27        27
#  7 Njál's Saga                                     2022-09-27        27
#  8 Pride and Prejudice                             2022-09-27        27
#  9 Le Père Goriot                                  2022-09-27        27
# 10 Molloy, Malone Dies, The Unnamable, the trilogy 2022-09-27        27
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows

Creare una visualizzazione temporanea

È possibile creare views temporanei denominati in memoria basati su dataframe esistenti. Ad esempio, eseguire il codice seguente in una cella del notebook per usare SparkR::createOrReplaceTempView per get il contenuto del dataframe precedente denominato jsonTable e impostarne una visualizzazione temporanea denominata timestampTable. Usare quindi sparklyr::spark_read_table per leggere il contenuto della visualizzazione temporanea. Utilizzare sparklyr::collect per stampare per impostazione predefinita le prime 10 righe del table temporaneo.

createOrReplaceTempView(withTimestampDF, viewName = "timestampTable")

spark_read_table(
  sc = sc,
  name = "timestampTable"
) %>% collect()

# A tibble: 100 × 10
#    author    country image…¹ langu…² link  pages title  year today
#    <chr>     <chr>   <chr>   <chr>   <chr> <int> <chr> <int> <dttm>
#  1 Chinua A… Nigeria images… English "htt…   209 Thin…  1958 2022-09-27 21:11:56
#  2 Hans Chr… Denmark images… Danish  "htt…   784 Fair…  1836 2022-09-27 21:11:56
#  3 Dante Al… Italy   images… Italian "htt…   928 The …  1315 2022-09-27 21:11:56
#  4 Unknown   Sumer … images… Akkadi… "htt…   160 The … -1700 2022-09-27 21:11:56
#  5 Unknown   Achaem… images… Hebrew  "htt…   176 The …  -600 2022-09-27 21:11:56
#  6 Unknown   India/… images… Arabic  "htt…   288 One …  1200 2022-09-27 21:11:56
#  7 Unknown   Iceland images… Old No… "htt…   384 Njál…  1350 2022-09-27 21:11:56
#  8 Jane Aus… United… images… English "htt…   226 Prid…  1813 2022-09-27 21:11:56
#  9 Honoré d… France  images… French  "htt…   443 Le P…  1835 2022-09-27 21:11:56
# 10 Samuel B… Republ… images… French… "htt…   256 Moll…  1952 2022-09-27 21:11:56
# … with 90 more rows, 1 more variable: month <chr>, and abbreviated variable
#   names ¹​imageLink, ²​language
# ℹ Use `print(n = ...)` to see more rows, and `colnames()` to see all variable names

Eseguire un'analisi statistica su un dataframe

È possibile usare sparklyr insieme a dplyr per le analisi statistiche.

Ad esempio, creare un dataframe in cui eseguire le statistiche. A tale scopo, eseguire il codice seguente in una cella del notebook per usare sparklyr::sdf_copy_to per scrivere il contenuto del set di dati iris integrato in R in un dataframe denominato iris. Utilizzare sparklyr::sdf_collect per stampare per default le prime 10 righe del table temporaneo.

irisDF <- sdf_copy_to(
  sc        = sc,
  x         = iris,
  name      = "iris",
  overwrite = TRUE
)

sdf_collect(irisDF, "row-wise")

# A tibble: 150 × 5
#    Sepal_Length Sepal_Width Petal_Length Petal_Width Species
#           <dbl>       <dbl>        <dbl>       <dbl> <chr>
#  1          5.1         3.5          1.4         0.2 setosa
#  2          4.9         3            1.4         0.2 setosa
#  3          4.7         3.2          1.3         0.2 setosa
#  4          4.6         3.1          1.5         0.2 setosa
#  5          5           3.6          1.4         0.2 setosa
#  6          5.4         3.9          1.7         0.4 setosa
#  7          4.6         3.4          1.4         0.3 setosa
#  8          5           3.4          1.5         0.2 setosa
#  9          4.4         2.9          1.4         0.2 setosa
# 10          4.9         3.1          1.5         0.1 setosa
# … with 140 more rows
# ℹ Use `print(n = ...)` to see more rows

Usare ora dplyr::group_by per raggruppare le righe in base al Speciescolumn. Usare dplyr::summarize insieme a dplyr::percentile_approx per calcolare le statistiche di riepilogo in base al 25°, 50°, 75° e 100° quantile del Sepal_Lengthcolumn da Species. Usare sparklyr::collect per stampare i risultati:

Nota

dplyr::summarize accetta solo argomenti conformi alle funzioni incorporate di Hive (note anche come UDFs) e funzioni di aggregazione incorporate (note anche come UDAFs). Per informazioni generali, vedere Funzioni Hive. Per informazioni su percentile_approx, vedere funzioni di aggregazione integrate (UDAF).

quantileDF <- irisDF %>%
  group_by(Species) %>%
  summarize(
    quantile_25th = percentile_approx(
      Sepal_Length,
      0.25
    ),
    quantile_50th = percentile_approx(
      Sepal_Length,
      0.50
    ),
    quantile_75th = percentile_approx(
      Sepal_Length,
      0.75
    ),
    quantile_100th = percentile_approx(
      Sepal_Length,
      1.0
    )
  )

collect(quantileDF)

# A tibble: 3 × 5
#   Species    quantile_25th quantile_50th quantile_75th quantile_100th
#   <chr>              <dbl>         <dbl>         <dbl>          <dbl>
# 1 virginica            6.2           6.5           6.9            7.9
# 2 versicolor           5.6           5.9           6.3            7
# 3 setosa               4.8           5             5.2            5.8

È possibile calcolare risultati simili, ad esempio usando sparklyr::sdf_quantile:

print(sdf_quantile(
  x = irisDF %>%
    filter(Species == "virginica"),
  column = "Sepal_Length",
  probabilities = c(0.25, 0.5, 0.75, 1.0)
))

# 25%  50%  75% 100%
# 6.2  6.5  6.9  7.9